nm/event.rs
1use std::marker::PhantomData;
2use std::time::{Duration, Instant};
3
4use crate::{EventBuilder, Magnitude, Observe, PublishModel, Pull};
5
6/// Allows you to observe the occurrences of an event in your code.
7///
8/// The typical pattern is to observe events via thread-local static variables.
9///
10/// # Publishing models
11///
12/// The ultimate goal of the metrics collected by an [`Event`] is to end up in a [`Report`][1].
13/// There are two models by which this can happen:
14///
15/// - **Pull** model - the reporting system queries each event in the process for its latest data
16/// set when generating a report. This is the default and requires no action from you.
17/// - **Push** model - data from an event only flows to a thread-local [`MetricsPusher`][2], which
18/// publishes the data into the reporting system on demand. This requires you to periodically
19/// trigger the publishing via [`MetricsPusher::push()`][3].
20///
21/// The push model has lower overhead but requires action from you to ensure that data is published.
22/// You may consider using it under controlled conditions, such as when you are certain that every
23/// thread that will be reporting data will also call the pusher at some point.
24///
25/// The choice of publishing model can be made separately for each event.
26///
27/// # Example (pull model)
28///
29/// ```
30/// use nm::Event;
31///
32/// thread_local! {
33/// static CONNECT_TIME_MS: Event = Event::builder()
34/// .name("net_http_connect_time_ms")
35/// .build();
36/// }
37///
38/// pub fn http_connect() {
39/// CONNECT_TIME_MS.with(|e| {
40/// e.observe_duration_millis(|| {
41/// do_http_connect();
42/// })
43/// });
44/// }
45/// # http_connect();
46/// # fn do_http_connect() {}
47/// ```
48///
49/// # Example (push model)
50///
51/// ```
52/// use nm::{Event, MetricsPusher, Push};
53///
54/// thread_local! {
55/// static HTTP_EVENTS_PUSHER: MetricsPusher = MetricsPusher::new();
56///
57/// static CONNECT_TIME_MS: Event<Push> = Event::builder()
58/// .name("net_http_connect_time_ms")
59/// .pusher_local(&HTTP_EVENTS_PUSHER)
60/// .build();
61/// }
62///
63/// pub fn http_connect() {
64/// CONNECT_TIME_MS.with(|e| {
65/// e.observe_duration_millis(|| {
66/// do_http_connect();
67/// })
68/// });
69/// }
70///
71/// loop {
72/// http_connect();
73///
74/// // Periodically push the data to the reporting system.
75/// if is_time_to_push() {
76/// HTTP_EVENTS_PUSHER.with(MetricsPusher::push);
77/// }
78/// # break; // Avoid infinite loop when running example.
79/// }
80/// # fn do_http_connect() {}
81/// # fn is_time_to_push() -> bool { true }
82/// ```
83///
84/// # Thread safety
85///
86/// This type is single-threaded. You would typically create instances in a
87/// `thread_local!` block, so each thread gets its own instance.
88///
89/// [1]: crate::Report
90/// [2]: crate::MetricsPusher
91/// [3]: crate::MetricsPusher::push
92#[derive(Debug)]
93pub struct Event<P = Pull>
94where
95 P: PublishModel,
96{
97 publish_model: P,
98
99 _single_threaded: PhantomData<*const ()>,
100}
101
102impl Event<Pull> {
103 /// Creates a new event builder with the default builder configuration.
104 #[must_use]
105 #[cfg_attr(test, mutants::skip)] // Gets replaced with itself by different name, bad mutation.
106 pub fn builder() -> EventBuilder<Pull> {
107 EventBuilder::new()
108 }
109}
110
111impl<P> Event<P>
112where
113 P: PublishModel,
114{
115 #[must_use]
116 pub(crate) fn new(publish_model: P) -> Self {
117 Self {
118 publish_model,
119 _single_threaded: PhantomData,
120 }
121 }
122
123 /// Observes an event that has no explicit magnitude.
124 ///
125 /// By convention, this is represented as a magnitude of 1. We expose a separate
126 /// method for this to make it clear that the magnitude has no inherent meaning.
127 #[inline]
128 pub fn observe_once(&self) {
129 self.batch(1).observe(1);
130 }
131
132 /// Observes an event with a specific magnitude.
133 #[inline]
134 pub fn observe(&self, magnitude: Magnitude) {
135 self.batch(1).observe(magnitude);
136 }
137
138 /// Observes an event with the magnitude being the indicated duration in milliseconds.
139 ///
140 /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
141 /// Values outside the i64 range are not guaranteed to be correctly represented.
142 #[inline]
143 pub fn observe_millis(&self, duration: Duration) {
144 self.batch(1).observe_millis(duration);
145 }
146
147 /// Observes the duration of a function call, in milliseconds.
148 #[inline]
149 pub fn observe_duration_millis<F, R>(&self, f: F) -> R
150 where
151 F: FnOnce() -> R,
152 {
153 self.batch(1).observe_duration_millis(f)
154 }
155
156 /// Prepares to observe a batch of events with the same magnitude.
157 ///
158 /// # Example
159 ///
160 /// ```
161 /// use nm::Event;
162 ///
163 /// thread_local! {
164 /// static REQUESTS_PROCESSED: Event = Event::builder()
165 /// .name("requests_processed")
166 /// .build();
167 /// static HTTP_RESPONSE_TIME_MS: Event = Event::builder()
168 /// .name("http_response_time_ms")
169 /// .build();
170 /// }
171 ///
172 /// // Record 100 HTTP responses, each taking 50ms
173 /// HTTP_RESPONSE_TIME_MS.with(|event| {
174 /// event.batch(100).observe(50);
175 /// });
176 ///
177 /// // Record 50 simple count events
178 /// REQUESTS_PROCESSED.with(|event| {
179 /// event.batch(50).observe_once();
180 /// });
181 /// ```
182 #[must_use]
183 #[inline]
184 pub fn batch(&self, count: usize) -> ObservationBatch<'_, P> {
185 ObservationBatch { event: self, count }
186 }
187
188 #[cfg(test)]
189 pub(crate) fn snapshot(&self) -> crate::ObservationBagSnapshot {
190 self.publish_model.snapshot()
191 }
192}
193
194/// A batch of pending observations for an event, waiting for the magnitude to be specified.
195#[derive(Debug)]
196pub struct ObservationBatch<'a, P>
197where
198 P: PublishModel,
199{
200 event: &'a Event<P>,
201 count: usize,
202}
203
204impl<P> ObservationBatch<'_, P>
205where
206 P: PublishModel,
207{
208 /// Observes a batch of events that have no explicit magnitude.
209 ///
210 /// By convention, this is represented as a magnitude of 1. We expose a separate
211 /// method for this to make it clear that the magnitude has no inherent meaning.
212 #[inline]
213 pub fn observe_once(&self) {
214 self.event.publish_model.insert(1, self.count);
215 }
216
217 /// Observes a batch of events with a specific magnitude.
218 #[inline]
219 pub fn observe(&self, magnitude: Magnitude) {
220 self.event.publish_model.insert(magnitude, self.count);
221 }
222
223 /// Observes an event with the magnitude being the indicated duration in milliseconds.
224 ///
225 /// Only the whole number part of the duration is used - fractional milliseconds are ignored.
226 /// Values outside the i64 range are not guaranteed to be correctly represented.
227 #[inline]
228 pub fn observe_millis(&self, duration: Duration) {
229 #[expect(
230 clippy::cast_possible_truncation,
231 reason = "intentional - nothing we can do about it; typical values are in safe range"
232 )]
233 let millis = duration.as_millis() as i64;
234
235 self.event.publish_model.insert(millis, self.count);
236 }
237
238 /// Observes the duration of a function call, in milliseconds.
239 #[inline]
240 pub fn observe_duration_millis<F, R>(&self, f: F) -> R
241 where
242 F: FnOnce() -> R,
243 {
244 // TODO: Use low precision time to make this faster.
245 // TODO: Consider supporting ultra low precision time from external source.
246 let start = Instant::now();
247
248 let result = f();
249
250 self.observe_millis(start.elapsed());
251
252 result
253 }
254}
255
256impl<P> Observe for Event<P>
257where
258 P: PublishModel,
259{
260 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
261 #[inline]
262 fn observe_once(&self) {
263 self.observe_once();
264 }
265
266 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
267 #[inline]
268 fn observe(&self, magnitude: Magnitude) {
269 self.observe(magnitude);
270 }
271
272 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
273 #[inline]
274 fn observe_millis(&self, duration: Duration) {
275 self.observe_millis(duration);
276 }
277
278 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
279 #[inline]
280 fn observe_duration_millis<F, R>(&self, f: F) -> R
281 where
282 F: FnOnce() -> R,
283 {
284 self.observe_duration_millis(f)
285 }
286}
287
288impl<P> Observe for ObservationBatch<'_, P>
289where
290 P: PublishModel,
291{
292 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
293 #[inline]
294 fn observe_once(&self) {
295 self.observe_once();
296 }
297
298 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
299 #[inline]
300 fn observe(&self, magnitude: Magnitude) {
301 self.observe(magnitude);
302 }
303
304 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
305 #[inline]
306 fn observe_millis(&self, duration: Duration) {
307 self.observe_millis(duration);
308 }
309
310 #[cfg_attr(test, mutants::skip)] // Trivial forwarder.
311 #[inline]
312 fn observe_duration_millis<F, R>(&self, f: F) -> R
313 where
314 F: FnOnce() -> R,
315 {
316 self.observe_duration_millis(f)
317 }
318}
319
320#[cfg(test)]
321mod tests {
322 use std::rc::Rc;
323 use std::sync::Arc;
324
325 use static_assertions::assert_not_impl_any;
326
327 use super::*;
328 use crate::{ObservationBag, ObservationBagSync, Push};
329
330 #[test]
331 fn pull_event_observations_are_recorded() {
332 // Histogram logic is tested as part of ObservationBag tests, so we do not bother
333 // with it here - we assume that if data is correctly recorded, it will reach the histogram.
334 let observations = Arc::new(ObservationBagSync::new(&[]));
335
336 let event = Event {
337 publish_model: Pull { observations },
338 _single_threaded: PhantomData,
339 };
340
341 let snapshot = event.snapshot();
342
343 assert_eq!(snapshot.count, 0);
344 assert_eq!(snapshot.sum, 0);
345
346 event.observe_once();
347
348 let snapshot = event.snapshot();
349
350 assert_eq!(snapshot.count, 1);
351 assert_eq!(snapshot.sum, 1);
352
353 event.batch(3).observe_once();
354
355 let snapshot = event.snapshot();
356 assert_eq!(snapshot.count, 4);
357 assert_eq!(snapshot.sum, 4);
358
359 event.observe(5);
360
361 let snapshot = event.snapshot();
362 assert_eq!(snapshot.count, 5);
363 assert_eq!(snapshot.sum, 9);
364
365 event.observe_millis(Duration::from_millis(100));
366
367 let snapshot = event.snapshot();
368 assert_eq!(snapshot.count, 6);
369 assert_eq!(snapshot.sum, 109);
370
371 event.batch(2).observe(10);
372
373 let snapshot = event.snapshot();
374 assert_eq!(snapshot.count, 8);
375 assert_eq!(snapshot.sum, 129);
376 }
377
378 #[test]
379 fn push_event_observations_are_recorded() {
380 // Histogram logic is tested as part of ObservationBag tests, so we do not bother
381 // with it here - we assume that if data is correctly recorded, it will reach the histogram.
382 let observations = Rc::new(ObservationBag::new(&[]));
383
384 let event = Event {
385 publish_model: Push { observations },
386 _single_threaded: PhantomData,
387 };
388
389 let snapshot = event.snapshot();
390
391 assert_eq!(snapshot.count, 0);
392 assert_eq!(snapshot.sum, 0);
393
394 event.observe_once();
395
396 let snapshot = event.snapshot();
397
398 assert_eq!(snapshot.count, 1);
399 assert_eq!(snapshot.sum, 1);
400
401 event.batch(3).observe_once();
402
403 let snapshot = event.snapshot();
404 assert_eq!(snapshot.count, 4);
405 assert_eq!(snapshot.sum, 4);
406
407 event.observe(5);
408
409 let snapshot = event.snapshot();
410 assert_eq!(snapshot.count, 5);
411 assert_eq!(snapshot.sum, 9);
412
413 event.observe_millis(Duration::from_millis(100));
414
415 let snapshot = event.snapshot();
416 assert_eq!(snapshot.count, 6);
417 assert_eq!(snapshot.sum, 109);
418
419 event.batch(2).observe(10);
420
421 let snapshot = event.snapshot();
422 assert_eq!(snapshot.count, 8);
423 assert_eq!(snapshot.sum, 129);
424 }
425
426 #[test]
427 fn single_threaded_type() {
428 assert_not_impl_any!(Event: Send, Sync);
429 }
430}