nm/
event.rs

1use std::marker::PhantomData;
2use std::time::{Duration, Instant};
3
4use num_traits::AsPrimitive;
5
6use crate::{EventBuilder, Magnitude, Observe, PublishModel, Pull};
7
8/// Allows you to observe the occurrences of an event in your code.
9///
10/// The typical pattern is to observe events via thread-local static variables.
11///
12/// # Publishing models
13///
14/// The ultimate goal of the metrics collected by an [`Event`] is to end up in a [`Report`][1].
15/// There are two models by which this can happen:
16///
17/// - **Pull** model - the reporting system queries each event in the process for its latest data
18///   set when generating a report. This is the default and requires no action from you.
19/// - **Push** model - data from an event only flows to a thread-local [`MetricsPusher`][2], which
20///   publishes the data into the reporting system on demand. This requires you to periodically
21///   trigger the publishing via [`MetricsPusher::push()`][3].
22///
23/// The push model has lower overhead but requires action from you to ensure that data is published.
24/// You may consider using it under controlled conditions, such as when you are certain that every
25/// thread that will be reporting data will also call the pusher at some point.
26///
27/// The choice of publishing model can be made separately for each event.
28///
29/// # Example (pull model)
30///
31/// ```
32/// use nm::Event;
33///
34/// thread_local! {
35///     static CONNECT_TIME_MS: Event = Event::builder()
36///         .name("net_http_connect_time_ms")
37///         .build();
38/// }
39///
40/// pub fn http_connect() {
41///     CONNECT_TIME_MS.with(|e| {
42///         e.observe_duration_millis(|| {
43///             do_http_connect();
44///         })
45///     });
46/// }
47/// # http_connect();
48/// # fn do_http_connect() {}
49/// ```
50///
51/// # Example (push model)
52///
53/// ```
54/// use nm::{Event, MetricsPusher, Push};
55///
56/// thread_local! {
57///     static HTTP_EVENTS_PUSHER: MetricsPusher = MetricsPusher::new();
58///
59///     static CONNECT_TIME_MS: Event<Push> = Event::builder()
60///         .name("net_http_connect_time_ms")
61///         .pusher_local(&HTTP_EVENTS_PUSHER)
62///         .build();
63/// }
64///
65/// pub fn http_connect() {
66///     CONNECT_TIME_MS.with(|e| {
67///         e.observe_duration_millis(|| {
68///             do_http_connect();
69///         })
70///     });
71/// }
72///
73/// loop {
74///     http_connect();
75///
76///     // Periodically push the data to the reporting system.
77///     if is_time_to_push() {
78///         HTTP_EVENTS_PUSHER.with(MetricsPusher::push);
79///     }
80///     # break; // Avoid infinite loop when running example.
81/// }
82/// # fn do_http_connect() {}
83/// # fn is_time_to_push() -> bool { true }
84/// ```
85///
86/// # Thread safety
87///
88/// This type is single-threaded. You would typically create instances in a
89/// `thread_local!` block, so each thread gets its own instance.
90///
91/// [1]: crate::Report
92/// [2]: crate::MetricsPusher
93/// [3]: crate::MetricsPusher::push
94#[derive(Debug)]
95pub struct Event<P = Pull>
96where
97    P: PublishModel,
98{
99    publish_model: P,
100
101    _single_threaded: PhantomData<*const ()>,
102}
103
104impl Event<Pull> {
105    /// Creates a new event builder with the default builder configuration.
106    #[must_use]
107    #[cfg_attr(test, mutants::skip)] // Gets replaced with itself by different name, bad mutation.
108    pub fn builder() -> EventBuilder<Pull> {
109        EventBuilder::new()
110    }
111}
112
113impl<P> Event<P>
114where
115    P: PublishModel,
116{
117    #[must_use]
118    pub(crate) fn new(publish_model: P) -> Self {
119        Self {
120            publish_model,
121            _single_threaded: PhantomData,
122        }
123    }
124
125    /// Observes an event that has no explicit magnitude.
126    ///
127    /// By convention, this is represented as a magnitude of 1. We expose a separate
128    /// method for this to make it clear that the magnitude has no inherent meaning.
129    #[inline]
130    pub fn observe_once(&self) {
131        self.batch(1).observe(1);
132    }
133
134    /// Observes an event with a specific magnitude.
135    #[inline]
136    pub fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
137        self.batch(1).observe(magnitude);
138    }
139
140    /// Observes an event with the magnitude being the indicated duration in milliseconds.
141    ///
142    /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
143    /// Values outside the i64 range are not guaranteed to be correctly represented.
144    #[inline]
145    pub fn observe_millis(&self, duration: Duration) {
146        self.batch(1).observe_millis(duration);
147    }
148
149    /// Observes the duration of a function call, in milliseconds.
150    #[inline]
151    pub fn observe_duration_millis<F, R>(&self, f: F) -> R
152    where
153        F: FnOnce() -> R,
154    {
155        self.batch(1).observe_duration_millis(f)
156    }
157
158    /// Prepares to observe a batch of events with the same magnitude.
159    ///
160    /// # Example
161    ///
162    /// ```
163    /// use nm::Event;
164    ///
165    /// thread_local! {
166    ///     static REQUESTS_PROCESSED: Event = Event::builder()
167    ///         .name("requests_processed")
168    ///         .build();
169    ///     static HTTP_RESPONSE_TIME_MS: Event = Event::builder()
170    ///         .name("http_response_time_ms")
171    ///         .build();
172    /// }
173    ///
174    /// // Record 100 HTTP responses, each taking 50ms
175    /// HTTP_RESPONSE_TIME_MS.with(|event| {
176    ///     event.batch(100).observe(50);
177    /// });
178    ///
179    /// // Record 50 simple count events
180    /// REQUESTS_PROCESSED.with(|event| {
181    ///     event.batch(50).observe_once();
182    /// });
183    /// ```
184    #[must_use]
185    #[inline]
186    pub fn batch(&self, count: usize) -> ObservationBatch<'_, P> {
187        ObservationBatch { event: self, count }
188    }
189
190    #[cfg(test)]
191    pub(crate) fn snapshot(&self) -> crate::ObservationBagSnapshot {
192        self.publish_model.snapshot()
193    }
194}
195
196/// A batch of pending observations for an event, waiting for the magnitude to be specified.
197#[derive(Debug)]
198pub struct ObservationBatch<'a, P>
199where
200    P: PublishModel,
201{
202    event: &'a Event<P>,
203    count: usize,
204}
205
206impl<P> ObservationBatch<'_, P>
207where
208    P: PublishModel,
209{
210    /// Observes a batch of events that have no explicit magnitude.
211    ///
212    /// By convention, this is represented as a magnitude of 1. We expose a separate
213    /// method for this to make it clear that the magnitude has no inherent meaning.
214    #[inline]
215    pub fn observe_once(&self) {
216        self.event.publish_model.insert(1, self.count);
217    }
218
219    /// Observes a batch of events with a specific magnitude.
220    #[inline]
221    pub fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
222        self.event.publish_model.insert(magnitude.as_(), self.count);
223    }
224
225    /// Observes an event with the magnitude being the indicated duration in milliseconds.
226    ///
227    /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
228    /// Values outside the i64 range are not guaranteed to be correctly represented.
229    #[inline]
230    pub fn observe_millis(&self, duration: Duration) {
231        #[expect(
232            clippy::cast_possible_truncation,
233            reason = "intentional - nothing we can do about it; typical values are in safe range"
234        )]
235        let millis = duration.as_millis() as i64;
236
237        self.event.publish_model.insert(millis, self.count);
238    }
239
240    /// Observes the duration of a function call, in milliseconds.
241    #[inline]
242    pub fn observe_duration_millis<F, R>(&self, f: F) -> R
243    where
244        F: FnOnce() -> R,
245    {
246        // TODO: Use low precision time to make this faster.
247        // TODO: Consider supporting ultra low precision time from external source.
248        let start = Instant::now();
249
250        let result = f();
251
252        self.observe_millis(start.elapsed());
253
254        result
255    }
256}
257
258#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarders.
259impl<P> Observe for Event<P>
260where
261    P: PublishModel,
262{
263    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
264    #[inline]
265    fn observe_once(&self) {
266        self.observe_once();
267    }
268
269    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
270    #[inline]
271    fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
272        self.observe(magnitude);
273    }
274
275    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
276    #[inline]
277    fn observe_millis(&self, duration: Duration) {
278        self.observe_millis(duration);
279    }
280
281    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
282    #[inline]
283    fn observe_duration_millis<F, R>(&self, f: F) -> R
284    where
285        F: FnOnce() -> R,
286    {
287        self.observe_duration_millis(f)
288    }
289}
290
291#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarders.
292impl<P> Observe for ObservationBatch<'_, P>
293where
294    P: PublishModel,
295{
296    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
297    #[inline]
298    fn observe_once(&self) {
299        self.observe_once();
300    }
301
302    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
303    #[inline]
304    fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
305        self.observe(magnitude);
306    }
307
308    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
309    #[inline]
310    fn observe_millis(&self, duration: Duration) {
311        self.observe_millis(duration);
312    }
313
314    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
315    #[inline]
316    fn observe_duration_millis<F, R>(&self, f: F) -> R
317    where
318        F: FnOnce() -> R,
319    {
320        self.observe_duration_millis(f)
321    }
322}
323
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::rc::Rc;
    use std::sync::Arc;

    use static_assertions::assert_not_impl_any;

    use super::*;
    use crate::{ObservationBag, ObservationBagSync, Push};

    /// Runs the observation scenario shared by the pull and push tests, verifying the
    /// running count and sum after every step.
    ///
    /// Histogram logic is tested as part of ObservationBag tests, so we do not bother
    /// with it here - we assume that if data is correctly recorded, it will reach the histogram.
    fn assert_observations_are_recorded<P: PublishModel>(event: &Event<P>) {
        // Nothing has been observed yet.
        let initial = event.snapshot();
        assert_eq!(initial.count, 0);
        assert_eq!(initial.sum, 0);

        // One event of magnitude 1.
        event.observe_once();
        let after_once = event.snapshot();
        assert_eq!(after_once.count, 1);
        assert_eq!(after_once.sum, 1);

        // Three more events of magnitude 1.
        event.batch(3).observe_once();
        let after_batch_once = event.snapshot();
        assert_eq!(after_batch_once.count, 4);
        assert_eq!(after_batch_once.sum, 4);

        // One event of magnitude 5.
        event.observe(5);
        let after_observe = event.snapshot();
        assert_eq!(after_observe.count, 5);
        assert_eq!(after_observe.sum, 9);

        // One event of magnitude 100 (milliseconds).
        event.observe_millis(Duration::from_millis(100));
        let after_millis = event.snapshot();
        assert_eq!(after_millis.count, 6);
        assert_eq!(after_millis.sum, 109);

        // Two events of magnitude 10 each.
        event.batch(2).observe(10);
        let after_batch = event.snapshot();
        assert_eq!(after_batch.count, 8);
        assert_eq!(after_batch.sum, 129);
    }

    #[test]
    fn pull_event_observations_are_recorded() {
        let observations = Arc::new(ObservationBagSync::new(&[]));
        let event = Event::new(Pull { observations });

        assert_observations_are_recorded(&event);
    }

    #[test]
    fn push_event_observations_are_recorded() {
        let observations = Rc::new(ObservationBag::new(&[]));
        let event = Event::new(Push { observations });

        assert_observations_are_recorded(&event);
    }

    #[test]
    fn event_accepts_different_numeric_types_without_casting() {
        let event = Event::builder().name("test_event").build();

        // Each call only needs to compile and run; no values are asserted.
        event.observe(1_u8);
        event.observe(2_u16);
        event.observe(3_u32);
        event.observe(4_u64);
        event.observe(5_usize);
        event.observe(6.66);
        event.observe(7_i32);
        event.observe(8_i128);
    }

    #[test]
    fn single_threaded_type() {
        assert_not_impl_any!(Event: Send, Sync);
    }
}
447        assert_not_impl_any!(Event: Send, Sync);
448    }
449}