nm/
event.rs

1use std::marker::PhantomData;
2use std::time::{Duration, Instant};
3
4use crate::{EventBuilder, Magnitude, Observe, PublishModel, Pull};
5
6/// Allows you to observe the occurrences of an event in your code.
7///
8/// The typical pattern is to observe events via thread-local static variables.
9///
10/// # Publishing models
11///
12/// The ultimate goal of the metrics collected by an [`Event`] is to end up in a [`Report`][1].
13/// There are two models by which this can happen:
14///
15/// - **Pull** model - the reporting system queries each event in the process for its latest data
16///   set when generating a report. This is the default and requires no action from you.
17/// - **Push** model - data from an event only flows to a thread-local [`MetricsPusher`][2], which
18///   publishes the data into the reporting system on demand. This requires you to periodically
19///   trigger the publishing via [`MetricsPusher::push()`][3].
20///
21/// The push model has lower overhead but requires action from you to ensure that data is published.
22/// You may consider using it under controlled conditions, such as when you are certain that every
23/// thread that will be reporting data will also call the pusher at some point.
24///
25/// The choice of publishing model can be made separately for each event.
26///
27/// # Example (pull model)
28///
29/// ```
30/// use nm::Event;
31///
32/// thread_local! {
33///     static CONNECT_TIME_MS: Event = Event::builder()
34///         .name("net_http_connect_time_ms")
35///         .build();
36/// }
37///
38/// pub fn http_connect() {
39///     CONNECT_TIME_MS.with(|e| {
40///         e.observe_duration_millis(|| {
41///             do_http_connect();
42///         })
43///     });
44/// }
45/// # http_connect();
46/// # fn do_http_connect() {}
47/// ```
48///
49/// # Example (push model)
50///
51/// ```
52/// use nm::{Event, MetricsPusher, Push};
53///
54/// thread_local! {
55///     static HTTP_EVENTS_PUSHER: MetricsPusher = MetricsPusher::new();
56///
57///     static CONNECT_TIME_MS: Event<Push> = Event::builder()
58///         .name("net_http_connect_time_ms")
59///         .pusher_local(&HTTP_EVENTS_PUSHER)
60///         .build();
61/// }
62///
63/// pub fn http_connect() {
64///     CONNECT_TIME_MS.with(|e| {
65///         e.observe_duration_millis(|| {
66///             do_http_connect();
67///         })
68///     });
69/// }
70///
71/// loop {
72///     http_connect();
73///
74///     // Periodically push the data to the reporting system.
75///     if is_time_to_push() {
76///         HTTP_EVENTS_PUSHER.with(MetricsPusher::push);
77///     }
78///     # break; // Avoid infinite loop when running example.
79/// }
80/// # fn do_http_connect() {}
81/// # fn is_time_to_push() -> bool { true }
82/// ```
83///
84/// # Thread safety
85///
86/// This type is single-threaded. You would typically create instances in a
87/// `thread_local!` block, so each thread gets its own instance.
88///
89/// [1]: crate::Report
90/// [2]: crate::MetricsPusher
91/// [3]: crate::MetricsPusher::push
92#[derive(Debug)]
93pub struct Event<P = Pull>
94where
95    P: PublishModel,
96{
97    publish_model: P,
98
99    _single_threaded: PhantomData<*const ()>,
100}
101
102impl Event<Pull> {
103    /// Creates a new event builder with the default builder configuration.
104    #[must_use]
105    #[cfg_attr(test, mutants::skip)] // Gets replaced with itself by different name, bad mutation.
106    pub fn builder() -> EventBuilder<Pull> {
107        EventBuilder::new()
108    }
109}
110
111impl<P> Event<P>
112where
113    P: PublishModel,
114{
115    #[must_use]
116    pub(crate) fn new(publish_model: P) -> Self {
117        Self {
118            publish_model,
119            _single_threaded: PhantomData,
120        }
121    }
122
123    /// Observes an event that has no explicit magnitude.
124    ///
125    /// By convention, this is represented as a magnitude of 1. We expose a separate
126    /// method for this to make it clear that the magnitude has no inherent meaning.
127    #[inline]
128    pub fn observe_once(&self) {
129        self.batch(1).observe(1);
130    }
131
132    /// Observes an event with a specific magnitude.
133    #[inline]
134    pub fn observe(&self, magnitude: Magnitude) {
135        self.batch(1).observe(magnitude);
136    }
137
138    /// Observes an event with the magnitude being the indicated duration in milliseconds.
139    ///
140    /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
141    /// Values outside the i64 range are not guaranteed to be correctly represented.
142    #[inline]
143    pub fn observe_millis(&self, duration: Duration) {
144        self.batch(1).observe_millis(duration);
145    }
146
147    /// Observes the duration of a function call, in milliseconds.
148    #[inline]
149    pub fn observe_duration_millis<F, R>(&self, f: F) -> R
150    where
151        F: FnOnce() -> R,
152    {
153        self.batch(1).observe_duration_millis(f)
154    }
155
156    /// Prepares to observe a batch of events with the same magnitude.
157    ///
158    /// # Example
159    ///
160    /// ```
161    /// use nm::Event;
162    ///
163    /// thread_local! {
164    ///     static REQUESTS_PROCESSED: Event = Event::builder()
165    ///         .name("requests_processed")
166    ///         .build();
167    ///     static HTTP_RESPONSE_TIME_MS: Event = Event::builder()
168    ///         .name("http_response_time_ms")
169    ///         .build();
170    /// }
171    ///
172    /// // Record 100 HTTP responses, each taking 50ms
173    /// HTTP_RESPONSE_TIME_MS.with(|event| {
174    ///     event.batch(100).observe(50);
175    /// });
176    ///
177    /// // Record 50 simple count events
178    /// REQUESTS_PROCESSED.with(|event| {
179    ///     event.batch(50).observe_once();
180    /// });
181    /// ```
182    #[must_use]
183    #[inline]
184    pub fn batch(&self, count: usize) -> ObservationBatch<'_, P> {
185        ObservationBatch { event: self, count }
186    }
187
188    #[cfg(test)]
189    pub(crate) fn snapshot(&self) -> crate::ObservationBagSnapshot {
190        self.publish_model.snapshot()
191    }
192}
193
194/// A batch of pending observations for an event, waiting for the magnitude to be specified.
195#[derive(Debug)]
196pub struct ObservationBatch<'a, P>
197where
198    P: PublishModel,
199{
200    event: &'a Event<P>,
201    count: usize,
202}
203
204impl<P> ObservationBatch<'_, P>
205where
206    P: PublishModel,
207{
208    /// Observes a batch of events that have no explicit magnitude.
209    ///
210    /// By convention, this is represented as a magnitude of 1. We expose a separate
211    /// method for this to make it clear that the magnitude has no inherent meaning.
212    #[inline]
213    pub fn observe_once(&self) {
214        self.event.publish_model.insert(1, self.count);
215    }
216
217    /// Observes a batch of events with a specific magnitude.
218    #[inline]
219    pub fn observe(&self, magnitude: Magnitude) {
220        self.event.publish_model.insert(magnitude, self.count);
221    }
222
223    /// Observes an event with the magnitude being the indicated duration in milliseconds.
224    ///
225    /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
226    /// Values outside the i64 range are not guaranteed to be correctly represented.
227    #[inline]
228    pub fn observe_millis(&self, duration: Duration) {
229        #[expect(
230            clippy::cast_possible_truncation,
231            reason = "intentional - nothing we can do about it; typical values are in safe range"
232        )]
233        let millis = duration.as_millis() as i64;
234
235        self.event.publish_model.insert(millis, self.count);
236    }
237
238    /// Observes the duration of a function call, in milliseconds.
239    #[inline]
240    pub fn observe_duration_millis<F, R>(&self, f: F) -> R
241    where
242        F: FnOnce() -> R,
243    {
244        // TODO: Use low precision time to make this faster.
245        // TODO: Consider supporting ultra low precision time from external source.
246        let start = Instant::now();
247
248        let result = f();
249
250        self.observe_millis(start.elapsed());
251
252        result
253    }
254}
255
256impl<P> Observe for Event<P>
257where
258    P: PublishModel,
259{
260    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
261    #[inline]
262    fn observe_once(&self) {
263        self.observe_once();
264    }
265
266    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
267    #[inline]
268    fn observe(&self, magnitude: Magnitude) {
269        self.observe(magnitude);
270    }
271
272    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
273    #[inline]
274    fn observe_millis(&self, duration: Duration) {
275        self.observe_millis(duration);
276    }
277
278    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
279    #[inline]
280    fn observe_duration_millis<F, R>(&self, f: F) -> R
281    where
282        F: FnOnce() -> R,
283    {
284        self.observe_duration_millis(f)
285    }
286}
287
288impl<P> Observe for ObservationBatch<'_, P>
289where
290    P: PublishModel,
291{
292    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
293    #[inline]
294    fn observe_once(&self) {
295        self.observe_once();
296    }
297
298    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
299    #[inline]
300    fn observe(&self, magnitude: Magnitude) {
301        self.observe(magnitude);
302    }
303
304    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
305    #[inline]
306    fn observe_millis(&self, duration: Duration) {
307        self.observe_millis(duration);
308    }
309
310    #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
311    #[inline]
312    fn observe_duration_millis<F, R>(&self, f: F) -> R
313    where
314        F: FnOnce() -> R,
315    {
316        self.observe_duration_millis(f)
317    }
318}
319
#[cfg(test)]
mod tests {
    use std::rc::Rc;
    use std::sync::Arc;

    use static_assertions::assert_not_impl_any;

    use super::*;
    use crate::{ObservationBag, ObservationBagSync, Push};

    /// Runs the observation scenario shared by both publishing models against `event` and
    /// verifies the running count and sum after every step.
    ///
    /// The event is expected to start out without any recorded observations.
    fn assert_observations_are_recorded<P>(event: &Event<P>)
    where
        P: PublishModel,
    {
        // Takes a fresh snapshot of `event` and compares its totals with the expected ones.
        macro_rules! assert_totals {
            ($count:expr, $sum:expr) => {{
                let snapshot = event.snapshot();

                assert_eq!(snapshot.count, $count);
                assert_eq!(snapshot.sum, $sum);
            }};
        }

        assert_totals!(0, 0);

        event.observe_once();
        assert_totals!(1, 1);

        event.batch(3).observe_once();
        assert_totals!(4, 4);

        event.observe(5);
        assert_totals!(5, 9);

        event.observe_millis(Duration::from_millis(100));
        assert_totals!(6, 109);

        event.batch(2).observe(10);
        assert_totals!(8, 129);
    }

    #[test]
    fn pull_event_observations_are_recorded() {
        // Histogram logic is tested as part of ObservationBag tests, so we do not bother
        // with it here - we assume that if data is correctly recorded, it will reach the histogram.
        let observations = Arc::new(ObservationBagSync::new(&[]));

        let event = Event {
            publish_model: Pull { observations },
            _single_threaded: PhantomData,
        };

        assert_observations_are_recorded(&event);
    }

    #[test]
    fn push_event_observations_are_recorded() {
        // Histogram logic is tested as part of ObservationBag tests, so we do not bother
        // with it here - we assume that if data is correctly recorded, it will reach the histogram.
        let observations = Rc::new(ObservationBag::new(&[]));

        let event = Event {
            publish_model: Push { observations },
            _single_threaded: PhantomData,
        };

        assert_observations_are_recorded(&event);
    }

    #[test]
    fn single_threaded_type() {
        // The type must stay single-threaded under both publishing models.
        assert_not_impl_any!(Event: Send, Sync);
        assert_not_impl_any!(Event<Push>: Send, Sync);
    }
}