nm/event.rs
1use std::marker::PhantomData;
2use std::time::{Duration, Instant};
3
4use num_traits::AsPrimitive;
5
6use crate::{EventBuilder, Magnitude, Observe, PublishModel, Pull};
7
8/// Allows you to observe the occurrences of an event in your code.
9///
10/// The typical pattern is to observe events via thread-local static variables.
11///
12/// # Publishing models
13///
14/// The ultimate goal of the metrics collected by an [`Event`] is to end up in a [`Report`][1].
15/// There are two models by which this can happen:
16///
17/// - **Pull** model - the reporting system queries each event in the process for its latest data
18/// set when generating a report. This is the default and requires no action from you.
19/// - **Push** model - data from an event only flows to a thread-local [`MetricsPusher`][2], which
20/// publishes the data into the reporting system on demand. This requires you to periodically
21/// trigger the publishing via [`MetricsPusher::push()`][3].
22///
23/// The push model has lower overhead but requires action from you to ensure that data is published.
24/// You may consider using it under controlled conditions, such as when you are certain that every
25/// thread that will be reporting data will also call the pusher at some point.
26///
27/// The choice of publishing model can be made separately for each event.
28///
29/// # Example (pull model)
30///
31/// ```
32/// use nm::Event;
33///
34/// thread_local! {
35/// static CONNECT_TIME_MS: Event = Event::builder()
36/// .name("net_http_connect_time_ms")
37/// .build();
38/// }
39///
40/// pub fn http_connect() {
41/// CONNECT_TIME_MS.with(|e| {
42/// e.observe_duration_millis(|| {
43/// do_http_connect();
44/// })
45/// });
46/// }
47/// # http_connect();
48/// # fn do_http_connect() {}
49/// ```
50///
51/// # Example (push model)
52///
53/// ```
54/// use nm::{Event, MetricsPusher, Push};
55///
56/// thread_local! {
57/// static HTTP_EVENTS_PUSHER: MetricsPusher = MetricsPusher::new();
58///
59/// static CONNECT_TIME_MS: Event<Push> = Event::builder()
60/// .name("net_http_connect_time_ms")
61/// .pusher_local(&HTTP_EVENTS_PUSHER)
62/// .build();
63/// }
64///
65/// pub fn http_connect() {
66/// CONNECT_TIME_MS.with(|e| {
67/// e.observe_duration_millis(|| {
68/// do_http_connect();
69/// })
70/// });
71/// }
72///
73/// loop {
74/// http_connect();
75///
76/// // Periodically push the data to the reporting system.
77/// if is_time_to_push() {
78/// HTTP_EVENTS_PUSHER.with(MetricsPusher::push);
79/// }
80/// # break; // Avoid infinite loop when running example.
81/// }
82/// # fn do_http_connect() {}
83/// # fn is_time_to_push() -> bool { true }
84/// ```
85///
86/// # Thread safety
87///
88/// This type is single-threaded. You would typically create instances in a
89/// `thread_local!` block, so each thread gets its own instance.
90///
91/// [1]: crate::Report
92/// [2]: crate::MetricsPusher
93/// [3]: crate::MetricsPusher::push
94#[derive(Debug)]
95pub struct Event<P = Pull>
96where
97 P: PublishModel,
98{
99 publish_model: P,
100
101 _single_threaded: PhantomData<*const ()>,
102}
103
104impl Event<Pull> {
105 /// Creates a new event builder with the default builder configuration.
106 #[must_use]
107 #[cfg_attr(test, mutants::skip)] // Gets replaced with itself by different name, bad mutation.
108 pub fn builder() -> EventBuilder<Pull> {
109 EventBuilder::new()
110 }
111}
112
113impl<P> Event<P>
114where
115 P: PublishModel,
116{
117 #[must_use]
118 pub(crate) fn new(publish_model: P) -> Self {
119 Self {
120 publish_model,
121 _single_threaded: PhantomData,
122 }
123 }
124
125 /// Observes an event that has no explicit magnitude.
126 ///
127 /// By convention, this is represented as a magnitude of 1. We expose a separate
128 /// method for this to make it clear that the magnitude has no inherent meaning.
129 #[inline]
130 pub fn observe_once(&self) {
131 self.batch(1).observe(1);
132 }
133
134 /// Observes an event with a specific magnitude.
135 #[inline]
136 pub fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
137 self.batch(1).observe(magnitude);
138 }
139
140 /// Observes an event with the magnitude being the indicated duration in milliseconds.
141 ///
142 /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
143 /// Values outside the i64 range are not guaranteed to be correctly represented.
144 #[inline]
145 pub fn observe_millis(&self, duration: Duration) {
146 self.batch(1).observe_millis(duration);
147 }
148
149 /// Observes the duration of a function call, in milliseconds.
150 #[inline]
151 pub fn observe_duration_millis<F, R>(&self, f: F) -> R
152 where
153 F: FnOnce() -> R,
154 {
155 self.batch(1).observe_duration_millis(f)
156 }
157
158 /// Prepares to observe a batch of events with the same magnitude.
159 ///
160 /// # Example
161 ///
162 /// ```
163 /// use nm::Event;
164 ///
165 /// thread_local! {
166 /// static REQUESTS_PROCESSED: Event = Event::builder()
167 /// .name("requests_processed")
168 /// .build();
169 /// static HTTP_RESPONSE_TIME_MS: Event = Event::builder()
170 /// .name("http_response_time_ms")
171 /// .build();
172 /// }
173 ///
174 /// // Record 100 HTTP responses, each taking 50ms
175 /// HTTP_RESPONSE_TIME_MS.with(|event| {
176 /// event.batch(100).observe(50);
177 /// });
178 ///
179 /// // Record 50 simple count events
180 /// REQUESTS_PROCESSED.with(|event| {
181 /// event.batch(50).observe_once();
182 /// });
183 /// ```
184 #[must_use]
185 #[inline]
186 pub fn batch(&self, count: usize) -> ObservationBatch<'_, P> {
187 ObservationBatch { event: self, count }
188 }
189
190 #[cfg(test)]
191 pub(crate) fn snapshot(&self) -> crate::ObservationBagSnapshot {
192 self.publish_model.snapshot()
193 }
194}
195
196/// A batch of pending observations for an event, waiting for the magnitude to be specified.
197#[derive(Debug)]
198pub struct ObservationBatch<'a, P>
199where
200 P: PublishModel,
201{
202 event: &'a Event<P>,
203 count: usize,
204}
205
206impl<P> ObservationBatch<'_, P>
207where
208 P: PublishModel,
209{
210 /// Observes a batch of events that have no explicit magnitude.
211 ///
212 /// By convention, this is represented as a magnitude of 1. We expose a separate
213 /// method for this to make it clear that the magnitude has no inherent meaning.
214 #[inline]
215 pub fn observe_once(&self) {
216 self.event.publish_model.insert(1, self.count);
217 }
218
219 /// Observes a batch of events with a specific magnitude.
220 #[inline]
221 pub fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
222 self.event.publish_model.insert(magnitude.as_(), self.count);
223 }
224
225 /// Observes an event with the magnitude being the indicated duration in milliseconds.
226 ///
227 /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
228 /// Values outside the i64 range are not guaranteed to be correctly represented.
229 #[inline]
230 pub fn observe_millis(&self, duration: Duration) {
231 #[expect(
232 clippy::cast_possible_truncation,
233 reason = "intentional - nothing we can do about it; typical values are in safe range"
234 )]
235 let millis = duration.as_millis() as i64;
236
237 self.event.publish_model.insert(millis, self.count);
238 }
239
240 /// Observes the duration of a function call, in milliseconds.
241 #[inline]
242 pub fn observe_duration_millis<F, R>(&self, f: F) -> R
243 where
244 F: FnOnce() -> R,
245 {
246 // TODO: Use low precision time to make this faster.
247 // TODO: Consider supporting ultra low precision time from external source.
248 let start = Instant::now();
249
250 let result = f();
251
252 self.observe_millis(start.elapsed());
253
254 result
255 }
256}
257
258#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarders.
259impl<P> Observe for Event<P>
260where
261 P: PublishModel,
262{
263 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
264 #[inline]
265 fn observe_once(&self) {
266 self.observe_once();
267 }
268
269 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
270 #[inline]
271 fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
272 self.observe(magnitude);
273 }
274
275 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
276 #[inline]
277 fn observe_millis(&self, duration: Duration) {
278 self.observe_millis(duration);
279 }
280
281 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
282 #[inline]
283 fn observe_duration_millis<F, R>(&self, f: F) -> R
284 where
285 F: FnOnce() -> R,
286 {
287 self.observe_duration_millis(f)
288 }
289}
290
291#[cfg_attr(coverage_nightly, coverage(off))] // Trivial forwarders.
292impl<P> Observe for ObservationBatch<'_, P>
293where
294 P: PublishModel,
295{
296 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
297 #[inline]
298 fn observe_once(&self) {
299 self.observe_once();
300 }
301
302 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
303 #[inline]
304 fn observe(&self, magnitude: impl AsPrimitive<Magnitude>) {
305 self.observe(magnitude);
306 }
307
308 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
309 #[inline]
310 fn observe_millis(&self, duration: Duration) {
311 self.observe_millis(duration);
312 }
313
314 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
315 #[inline]
316 fn observe_duration_millis<F, R>(&self, f: F) -> R
317 where
318 F: FnOnce() -> R,
319 {
320 self.observe_duration_millis(f)
321 }
322}
323
324#[cfg(test)]
325#[cfg_attr(coverage_nightly, coverage(off))]
326mod tests {
327 use std::rc::Rc;
328 use std::sync::Arc;
329
330 use static_assertions::assert_not_impl_any;
331
332 use super::*;
333 use crate::{ObservationBag, ObservationBagSync, Push};
334
335 #[test]
336 fn pull_event_observations_are_recorded() {
337 // Histogram logic is tested as part of ObservationBag tests, so we do not bother
338 // with it here - we assume that if data is correctly recorded, it will reach the histogram.
339 let observations = Arc::new(ObservationBagSync::new(&[]));
340
341 let event = Event {
342 publish_model: Pull { observations },
343 _single_threaded: PhantomData,
344 };
345
346 let snapshot = event.snapshot();
347
348 assert_eq!(snapshot.count, 0);
349 assert_eq!(snapshot.sum, 0);
350
351 event.observe_once();
352
353 let snapshot = event.snapshot();
354
355 assert_eq!(snapshot.count, 1);
356 assert_eq!(snapshot.sum, 1);
357
358 event.batch(3).observe_once();
359
360 let snapshot = event.snapshot();
361 assert_eq!(snapshot.count, 4);
362 assert_eq!(snapshot.sum, 4);
363
364 event.observe(5);
365
366 let snapshot = event.snapshot();
367 assert_eq!(snapshot.count, 5);
368 assert_eq!(snapshot.sum, 9);
369
370 event.observe_millis(Duration::from_millis(100));
371
372 let snapshot = event.snapshot();
373 assert_eq!(snapshot.count, 6);
374 assert_eq!(snapshot.sum, 109);
375
376 event.batch(2).observe(10);
377
378 let snapshot = event.snapshot();
379 assert_eq!(snapshot.count, 8);
380 assert_eq!(snapshot.sum, 129);
381 }
382
383 #[test]
384 fn push_event_observations_are_recorded() {
385 // Histogram logic is tested as part of ObservationBag tests, so we do not bother
386 // with it here - we assume that if data is correctly recorded, it will reach the histogram.
387 let observations = Rc::new(ObservationBag::new(&[]));
388
389 let event = Event {
390 publish_model: Push { observations },
391 _single_threaded: PhantomData,
392 };
393
394 let snapshot = event.snapshot();
395
396 assert_eq!(snapshot.count, 0);
397 assert_eq!(snapshot.sum, 0);
398
399 event.observe_once();
400
401 let snapshot = event.snapshot();
402
403 assert_eq!(snapshot.count, 1);
404 assert_eq!(snapshot.sum, 1);
405
406 event.batch(3).observe_once();
407
408 let snapshot = event.snapshot();
409 assert_eq!(snapshot.count, 4);
410 assert_eq!(snapshot.sum, 4);
411
412 event.observe(5);
413
414 let snapshot = event.snapshot();
415 assert_eq!(snapshot.count, 5);
416 assert_eq!(snapshot.sum, 9);
417
418 event.observe_millis(Duration::from_millis(100));
419
420 let snapshot = event.snapshot();
421 assert_eq!(snapshot.count, 6);
422 assert_eq!(snapshot.sum, 109);
423
424 event.batch(2).observe(10);
425
426 let snapshot = event.snapshot();
427 assert_eq!(snapshot.count, 8);
428 assert_eq!(snapshot.sum, 129);
429 }
430
431 #[test]
432 fn event_accepts_different_numeric_types_without_casting() {
433 let event = Event::builder().name("test_event").build();
434
435 event.observe(1_u8);
436 event.observe(2_u16);
437 event.observe(3_u32);
438 event.observe(4_u64);
439 event.observe(5_usize);
440 event.observe(6.66);
441 event.observe(7_i32);
442 event.observe(8_i128);
443 }
444
445 #[test]
446 fn single_threaded_type() {
447 assert_not_impl_any!(Event: Send, Sync);
448 }
449}