metrix/
driver.rs

//! The telemetry driver: it owns the background thread that polls the registered processors for observations and assembles snapshots on request.
2use std::fmt;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crossbeam_channel::{
9    self, Receiver as CrossbeamReceiver, Sender as CrossbeamSender, TryRecvError,
10};
11
12use futures::{channel::oneshot, Future, TryFutureExt};
13
14use crate::instruments::switches::*;
15use crate::instruments::*;
16use crate::processor::{
17    AggregatesProcessors, ProcessesTelemetryMessages, ProcessingOutcome, ProcessingStrategy,
18};
19use crate::snapshot::{ItemKind, Snapshot};
20use crate::util;
21use crate::{Descriptive, PutsSnapshot};
22
23/// A Builder for a `TelemetryDriver`
24pub struct DriverBuilder {
25    /// An optional name that will also group the metrics under the name
26    ///
27    /// Default is `None`
28    pub name: Option<String>,
29    /// A title to be added when a `Snapshot` with descriptions is created
30    ///
31    /// Default is `None`
32    pub title: Option<String>,
33    /// A description to be added when a `Snapshot` with descriptions is created
34    ///
35    /// Default is `None`
36    pub description: Option<String>,
37    /// Sets the `ProcessingStrategy`
38    /// dropped. The default is to drop observations older
39    /// than **30 seconds** but not delta observations.
40    pub processing_strategy: ProcessingStrategy,
41    /// If true metrics for the `TelemetryDriver` will be added to the
42    /// generated `Snapshot`
43    ///
44    /// Default is `true`
45    pub with_driver_metrics: bool,
46}
47
48impl DriverBuilder {
49    pub fn new<T: Into<String>>(name: T) -> DriverBuilder {
50        let mut me = Self::default();
51        me.name = Some(name.into());
52        me
53    }
54
55    pub fn set_name<T: Into<String>>(mut self, name: T) -> Self {
56        self.name = Some(name.into());
57        self
58    }
59
60    pub fn set_title<T: Into<String>>(mut self, title: T) -> Self {
61        self.title = Some(title.into());
62        self
63    }
64
65    pub fn set_description<T: Into<String>>(mut self, description: T) -> Self {
66        self.description = Some(description.into());
67        self
68    }
69
70    pub fn set_processing_strategy(mut self, processing_strategy: ProcessingStrategy) -> Self {
71        self.processing_strategy = processing_strategy;
72        self
73    }
74
75    pub fn set_driver_metrics(mut self, enabled: bool) -> Self {
76        self.with_driver_metrics = enabled;
77        self
78    }
79
80    pub fn build(self) -> TelemetryDriver {
81        TelemetryDriver::new(
82            self.name,
83            self.title,
84            self.description,
85            self.processing_strategy,
86            self.with_driver_metrics,
87        )
88    }
89}
90
91impl Default for DriverBuilder {
92    fn default() -> Self {
93        Self {
94            name: None,
95            title: None,
96            description: None,
97            processing_strategy: ProcessingStrategy::default(),
98            with_driver_metrics: true,
99        }
100    }
101}
102
103/// Triggers registered `ProcessesTelemetryMessages` to
104/// poll for messages.
105///
106/// Runs its own background thread. The thread stops once
107/// this struct is dropped.
108///
109/// A `TelemetryDriver` can be 'mounted' into the hierarchy.
110/// If done so, it will still poll its children on its own thread
111/// independently.
112///
113/// # Optional Metrics
114///
115/// The driver can be configured to collect metrics on
116/// its own activities.
117///
118/// The metrics will be added to all snapshots
119/// under a field named `_metrix` which contains the
120/// following fields:
121///
122/// * `collections_per_second`: The number of observation collection runs
123/// done per second
124///
125/// * `collection_times_us`: A histogram of the time each observation collection
126/// took in microseconds.
127///
128/// * `observations_processed_per_second`: The number of observations processed
129/// per second.
130///
131/// * `observations_processed_per_collection`: A histogram of the
132/// number of observations processed during each run
133///
134/// * `observations_dropped_per_second`: The number of observations dropped
135/// per second. See also `max_observation_age`.
136///
137/// * `observations_dropped_per_collection`: A histogram of the
138/// number of observations dropped during each run. See also
139/// `max_observation_age`.
140///
141/// * `snapshots_per_second`: The number of snapshots taken per second.
142///
143/// * `snapshots_times_us`: A histogram of the times it took to take a snapshot
144/// in microseconds
145///
146/// * `dropped_observations_alarm`: Will be `true` if observations have been
147/// dropped. Will by default stay `true` for 60 seconds once triggered.
148///
149/// * `inactivity_alarm`: Will be `true` if no observations have been made for
150/// a certain amount of time. The default is 60 seconds.
151#[derive(Clone)]
152pub struct TelemetryDriver {
153    descriptives: Descriptives,
154    drop_guard: Arc<DropGuard>,
155    sender: CrossbeamSender<DriverMessage>,
156}
157
/// Clears the shared `is_running` flag when it is dropped.
struct DropGuard {
    /// The flag to clear on drop.
    pub is_running: Arc<AtomicBool>,
}

impl Drop for DropGuard {
    /// Stores `false` into `is_running`.
    fn drop(&mut self) {
        let running_flag: &AtomicBool = &self.is_running;
        running_flag.store(false, Ordering::Relaxed);
    }
}
167
168impl TelemetryDriver {
169    /// Creates a new `TelemetryDriver`.
170    ///
171    /// `max_observation_age` is the maximum age of an `Observation`
172    /// to be taken into account. This is determined by the `timestamp`
173    /// field of an `Observation`. `Observations` that are too old are simply
174    /// dropped. The default is **60 seconds**.
175    pub fn new(
176        name: Option<String>,
177        title: Option<String>,
178        description: Option<String>,
179        processing_strategy: ProcessingStrategy,
180        with_driver_metrics: bool,
181    ) -> TelemetryDriver {
182        let is_running = Arc::new(AtomicBool::new(true));
183
184        let driver_metrics = if with_driver_metrics {
185            Some(DriverMetrics {
186                instruments: DriverInstruments::default(),
187            })
188        } else {
189            None
190        };
191
192        let (sender, receiver) = crossbeam_channel::unbounded();
193
194        let mut descriptives = Descriptives::default();
195        descriptives.name = name;
196        descriptives.title = title;
197        descriptives.description = description;
198
199        let driver = TelemetryDriver {
200            descriptives: descriptives.clone(),
201            drop_guard: Arc::new(DropGuard {
202                is_running: is_running.clone(),
203            }),
204            sender,
205        };
206
207        start_telemetry_loop(
208            descriptives,
209            is_running,
210            processing_strategy,
211            driver_metrics,
212            receiver,
213        );
214
215        driver
216    }
217
218    /// Gets the name of this driver
219    pub fn name(&self) -> Option<&str> {
220        self.descriptives.name.as_deref()
221    }
222
223    /// Changes the `ProcessingStrategy`
224    pub fn change_processing_stragtegy(&self, strategy: ProcessingStrategy) {
225        let _ = self
226            .sender
227            .send(DriverMessage::SetProcessingStrategy(strategy));
228    }
229
230    /// Pauses processing of observations.
231    pub fn pause(&self) {
232        let _ = self.sender.send(DriverMessage::Pause);
233    }
234
235    /// Resumes processing of observations
236    pub fn resume(&self) {
237        let _ = self.sender.send(DriverMessage::Resume);
238    }
239
240    pub fn snapshot(&self, descriptive: bool) -> Result<Snapshot, GetSnapshotError> {
241        let snapshot = Snapshot::default();
242        let (tx, rx) = crossbeam_channel::unbounded();
243        let _ = self
244            .sender
245            .send(DriverMessage::GetSnapshotSync(snapshot, tx, descriptive));
246        rx.recv().map_err(|_err| GetSnapshotError)
247    }
248
249    pub fn snapshot_async(
250        &self,
251        descriptive: bool,
252    ) -> impl Future<Output = Result<Snapshot, GetSnapshotError>> + Send + 'static {
253        let snapshot = Snapshot::default();
254        let (tx, rx) = oneshot::channel();
255        let _ = self
256            .sender
257            .send(DriverMessage::GetSnapshotAsync(snapshot, tx, descriptive));
258        rx.map_err(|_| GetSnapshotError)
259    }
260}
261
/// The error returned when a snapshot could not be obtained from the
/// driver.
#[derive(Clone, Copy, Debug)]
pub struct GetSnapshotError;

impl GetSnapshotError {
    /// The one message used by both `Display` and `Error::description`.
    const MESSAGE: &'static str = "could not create a snapshot";
}

impl ::std::error::Error for GetSnapshotError {
    // `description` is deprecated in favour of `Display`. The override is
    // kept because the default implementation returns a different text.
    // The former `cause` override is gone: the default already returns
    // `None` for an error without a source.
    fn description(&self) -> &str {
        Self::MESSAGE
    }
}

impl fmt::Display for GetSnapshotError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(Self::MESSAGE)
    }
}
280
281impl ProcessesTelemetryMessages for TelemetryDriver {
282    /// Receive and handle pending operations
283    fn process(&mut self, _max: usize, _strategy: ProcessingStrategy) -> ProcessingOutcome {
284        ProcessingOutcome::default()
285    }
286}
287
288impl PutsSnapshot for TelemetryDriver {
289    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
290        if let Ok(snapshot) = self.snapshot(descriptive) {
291            snapshot
292                .items
293                .into_iter()
294                .for_each(|(k, v)| into.push(k, v));
295        }
296    }
297}
298
299impl Default for TelemetryDriver {
300    fn default() -> TelemetryDriver {
301        TelemetryDriver::new(None, None, None, ProcessingStrategy::default(), true)
302    }
303}
304
305impl AggregatesProcessors for TelemetryDriver {
306    fn add_processor<P: ProcessesTelemetryMessages>(&mut self, processor: P) {
307        let _ = self
308            .sender
309            .send(DriverMessage::AddProcessor(Box::new(processor)));
310    }
311
312    fn add_snapshooter<S: PutsSnapshot>(&mut self, snapshooter: S) {
313        let _ = self
314            .sender
315            .send(DriverMessage::AddSnapshooter(Box::new(snapshooter)));
316    }
317}
318
319impl Descriptive for TelemetryDriver {
320    fn title(&self) -> Option<&str> {
321        self.descriptives.title.as_deref()
322    }
323
324    fn description(&self) -> Option<&str> {
325        self.descriptives.description.as_deref()
326    }
327}
328
329fn start_telemetry_loop(
330    descriptives: Descriptives,
331    is_running: Arc<AtomicBool>,
332    processing_strategy: ProcessingStrategy,
333    driver_metrics: Option<DriverMetrics>,
334    receiver: CrossbeamReceiver<DriverMessage>,
335) {
336    let builder = thread::Builder::new().name("metrix".to_string());
337    builder
338        .spawn(move || {
339            telemetry_loop(
340                descriptives,
341                &is_running,
342                processing_strategy,
343                driver_metrics,
344                receiver,
345            )
346        })
347        .unwrap();
348}
349
350enum DriverMessage {
351    AddProcessor(Box<dyn ProcessesTelemetryMessages>),
352    AddSnapshooter(Box<dyn PutsSnapshot>),
353    GetSnapshotSync(Snapshot, CrossbeamSender<Snapshot>, bool),
354    GetSnapshotAsync(Snapshot, oneshot::Sender<Snapshot>, bool),
355    SetProcessingStrategy(ProcessingStrategy),
356    Pause,
357    Resume,
358}
359
360fn telemetry_loop(
361    descriptives: Descriptives,
362    is_running: &AtomicBool,
363    processing_strategy: ProcessingStrategy,
364    mut driver_metrics: Option<DriverMetrics>,
365    receiver: CrossbeamReceiver<DriverMessage>,
366) {
367    let mut last_outcome_logged = Instant::now() - Duration::from_secs(60);
368    let mut dropped_since_last_logged = 0usize;
369
370    let mut processors: Vec<Box<dyn ProcessesTelemetryMessages>> = Vec::new();
371    let mut snapshooters: Vec<Box<dyn PutsSnapshot>> = Vec::new();
372
373    let mut processing_stragtegy = processing_strategy;
374
375    let mut paused = false;
376
377    loop {
378        if !is_running.load(Ordering::Relaxed) {
379            break;
380        }
381
382        let iteration_started = Instant::now();
383
384        match receiver.try_recv() {
385            Ok(message) => match message {
386                DriverMessage::AddProcessor(processor) => processors.push(processor),
387                DriverMessage::AddSnapshooter(snapshooter) => snapshooters.push(snapshooter),
388                DriverMessage::GetSnapshotSync(mut snapshot, back_channel, descriptive) => {
389                    put_values_into_snapshot(
390                        &mut snapshot,
391                        &processors,
392                        &snapshooters,
393                        driver_metrics.as_mut(),
394                        &descriptives,
395                        descriptive,
396                    );
397                    let _ = back_channel.send(snapshot);
398                }
399                DriverMessage::GetSnapshotAsync(mut snapshot, back_channel, descriptive) => {
400                    put_values_into_snapshot(
401                        &mut snapshot,
402                        &processors,
403                        &snapshooters,
404                        driver_metrics.as_mut(),
405                        &descriptives,
406                        descriptive,
407                    );
408                    let _ = back_channel.send(snapshot);
409                }
410                DriverMessage::SetProcessingStrategy(strategy) => {
411                    util::log_info(&format!("Processing strategy changed to {:?}", strategy));
412                    processing_stragtegy = strategy
413                }
414                DriverMessage::Pause => {
415                    util::log_info("pausing");
416                    paused = true
417                }
418                DriverMessage::Resume => {
419                    paused = {
420                        util::log_info("resuming");
421                        false
422                    }
423                }
424            },
425            Err(TryRecvError::Empty) => {}
426            Err(TryRecvError::Disconnected) => {
427                util::log_warning(
428                    "Driver failed to receive message. Channel disconnected. Exiting",
429                );
430                break;
431            }
432        }
433
434        if paused {
435            thread::sleep(Duration::from_millis(50));
436            continue;
437        }
438
439        let run_started = Instant::now();
440        let outcome = do_a_run(&mut processors, 1_000, processing_stragtegy);
441        let run_time = run_started.elapsed();
442
443        dropped_since_last_logged += outcome.dropped;
444
445        if dropped_since_last_logged > 0 && last_outcome_logged.elapsed() > Duration::from_secs(5) {
446            log_outcome(dropped_since_last_logged);
447            last_outcome_logged = Instant::now();
448            dropped_since_last_logged = 0;
449        }
450
451        if let Some(ref mut driver_metrics) = driver_metrics {
452            driver_metrics.update_post_collection(&outcome, run_started);
453        }
454
455        if outcome.dropped > 0 || outcome.processed > 100 {
456            continue;
457        }
458
459        let finished = Instant::now();
460        let elapsed = finished - run_started;
461        if outcome.dropped == 0 && elapsed < Duration::from_millis(10) {
462            thread::sleep(Duration::from_millis(10) - elapsed);
463        }
464        report_elapsed_stats(iteration_started, run_time, driver_metrics.as_mut());
465    }
466
467    util::log_info("Metrix driver stopped");
468}
469
470fn do_a_run(
471    processors: &mut [Box<dyn ProcessesTelemetryMessages>],
472    max: usize,
473    strategy: ProcessingStrategy,
474) -> ProcessingOutcome {
475    let mut outcome = ProcessingOutcome::default();
476
477    for processor in processors.iter_mut() {
478        outcome.combine_with(&processor.process(max, strategy));
479    }
480
481    outcome
482}
483
484fn report_elapsed_stats(
485    iteration_started: Instant,
486    run_time: Duration,
487    metrics: Option<&mut DriverMetrics>,
488) {
489    if let Some(metrics) = metrics {
490        let iteration_time = iteration_started.elapsed().as_secs_f64();
491        let run_time = run_time.as_secs_f64();
492
493        if iteration_time > 0.0 {
494            let ratio = run_time / iteration_time;
495            metrics.update_run_ratio(ratio);
496        }
497    }
498}
499
500fn put_values_into_snapshot(
501    into: &mut Snapshot,
502    processors: &[Box<dyn ProcessesTelemetryMessages>],
503    snapshooters: &[Box<dyn PutsSnapshot>],
504    driver_metrics: Option<&mut DriverMetrics>,
505    descriptives: &Descriptives,
506    descriptive: bool,
507) {
508    let started = Instant::now();
509
510    if let Some(ref name) = descriptives.name {
511        let mut new_level = Snapshot::default();
512        add_snapshot_values(
513            &mut new_level,
514            &processors,
515            &snapshooters,
516            driver_metrics,
517            &descriptives,
518            descriptive,
519            started,
520        );
521        into.items
522            .push((name.clone(), ItemKind::Snapshot(new_level)));
523    } else {
524        add_snapshot_values(
525            into,
526            &processors,
527            &snapshooters,
528            driver_metrics,
529            &descriptives,
530            descriptive,
531            started,
532        );
533    }
534}
535
536fn add_snapshot_values(
537    into: &mut Snapshot,
538    processors: &[Box<dyn ProcessesTelemetryMessages>],
539    snapshooters: &[Box<dyn PutsSnapshot>],
540    driver_metrics: Option<&mut DriverMetrics>,
541    descriptives: &Descriptives,
542    descriptive: bool,
543    started: Instant,
544) {
545    util::put_default_descriptives(descriptives, into, descriptive);
546    processors
547        .iter()
548        .for_each(|p| p.put_snapshot(into, descriptive));
549
550    snapshooters
551        .iter()
552        .for_each(|s| s.put_snapshot(into, descriptive));
553
554    if let Some(driver_metrics) = driver_metrics {
555        driver_metrics.update_post_snapshot(started);
556        driver_metrics.put_snapshot(into, descriptive);
557    }
558}
559
/// The name, title and description of a driver.
///
/// All fields default to `None`.
#[derive(Clone, Default)]
struct Descriptives {
    /// The name under which the driver's values are grouped in a snapshot.
    pub name: Option<String>,
    /// The title of the driver.
    pub title: Option<String>,
    /// The description of the driver.
    pub description: Option<String>,
}
576
577impl Descriptive for Descriptives {
578    fn title(&self) -> Option<&str> {
579        self.title.as_deref()
580    }
581
582    fn description(&self) -> Option<&str> {
583        self.description.as_deref()
584    }
585}
586
/// Emits a warning with the number of dropped observations.
///
/// Does nothing unless the `log` feature is enabled.
#[inline]
fn log_outcome(dropped: usize) {
    #[cfg(feature = "log")]
    warn!("{} observations have been dropped.", dropped);

    // Without the `log` feature the argument is intentionally unused.
    #[cfg(not(feature = "log"))]
    let _ = dropped;
}
596
597struct DriverMetrics {
598    instruments: DriverInstruments,
599}
600
601impl DriverMetrics {
602    pub fn update_post_collection(
603        &mut self,
604        outcome: &ProcessingOutcome,
605        collection_started: Instant,
606    ) {
607        self.instruments
608            .update_post_collection(outcome, collection_started);
609    }
610
611    pub fn update_post_snapshot(&mut self, snapshot_started: Instant) {
612        self.instruments.update_post_snapshot(snapshot_started);
613    }
614
615    pub fn put_snapshot(&mut self, into: &mut Snapshot, descriptive: bool) {
616        self.instruments.put_snapshot(into, descriptive);
617    }
618    pub fn update_run_ratio(&mut self, ratio: f64) {
619        let now = Instant::now();
620        let per_mille = (ratio * 1_000f64) as u64;
621
622        self.instruments
623            .iteration_update_per_mille_ratio_histo
624            .update(&Update::ObservationWithValue(per_mille.into(), now));
625        self.instruments
626            .iteration_update_per_mille_ratio
627            .update(&Update::ObservationWithValue(per_mille.into(), now));
628    }
629}
630
631struct DriverInstruments {
632    collections_per_second: Meter,
633    collection_times_us: Histogram,
634    observations_processed_per_second: Meter,
635    observations_processed_per_collection: Histogram,
636    observations_dropped_per_second: Meter,
637    observations_dropped_per_collection: Histogram,
638    observations_enqueued: Gauge,
639    instruments_updated_per_second: Meter,
640    snapshots_per_second: Meter,
641    snapshots_times_us: Histogram,
642    dropped_observations_alarm: StaircaseTimer,
643    inactivity_alarm: NonOccurrenceIndicator,
644    iteration_update_per_mille_ratio_histo: Histogram,
645    iteration_update_per_mille_ratio: Gauge,
646}
647
648impl Default for DriverInstruments {
649    fn default() -> Self {
650        DriverInstruments {
651            collections_per_second: Meter::new_with_defaults("collections_per_second"),
652            collection_times_us: Histogram::new_with_defaults("collection_times_us"),
653            observations_processed_per_second: Meter::new_with_defaults(
654                "observations_processed_per_second",
655            ),
656            observations_processed_per_collection: Histogram::new_with_defaults(
657                "observations_processed_per_collection",
658            ),
659            observations_dropped_per_second: Meter::new_with_defaults(
660                "observations_dropped_per_second",
661            ),
662            observations_dropped_per_collection: Histogram::new_with_defaults(
663                "observations_dropped_per_collection",
664            ),
665            observations_enqueued: Gauge::new_with_defaults("observations_enqueued")
666                .tracking(60)
667                .group_values(true),
668            instruments_updated_per_second: Meter::new_with_defaults(
669                "instruments_updated_per_second",
670            ),
671            snapshots_per_second: Meter::new_with_defaults("snapshots_per_second"),
672            snapshots_times_us: Histogram::new_with_defaults("snapshots_times_us"),
673            dropped_observations_alarm: StaircaseTimer::new_with_defaults(
674                "dropped_observations_alarm",
675            ),
676            inactivity_alarm: NonOccurrenceIndicator::new_with_defaults("inactivity_alarm"),
677            iteration_update_per_mille_ratio_histo: Histogram::new_with_defaults(
678                "iteration_update_ratio_per_mille_histo",
679            ),
680            iteration_update_per_mille_ratio: Gauge::new_with_defaults(
681                "iteration_update_ratio_per_mille",
682            )
683            .tracking(60)
684            .group_values(true),
685        }
686    }
687}
688
689impl DriverInstruments {
690    pub fn update_post_collection(
691        &mut self,
692        outcome: &ProcessingOutcome,
693        collection_started: Instant,
694    ) {
695        let now = Instant::now();
696        self.collections_per_second
697            .update(&Update::Observation(now));
698        self.collection_times_us
699            .update(&Update::ObservationWithValue(
700                (now - collection_started).into(),
701                now,
702            ));
703        if outcome.processed > 0 {
704            self.observations_processed_per_second
705                .update(&Update::Observations(outcome.processed as u64, now));
706            self.observations_processed_per_collection
707                .update(&Update::ObservationWithValue(outcome.processed.into(), now));
708        }
709        self.observations_enqueued
710            .update(&Update::ObservationWithValue(
711                outcome.observations_enqueued.into(),
712                now,
713            ));
714        if outcome.dropped > 0 {
715            self.observations_dropped_per_second
716                .update(&Update::Observations(outcome.dropped as u64, now));
717            self.observations_dropped_per_collection
718                .update(&Update::ObservationWithValue(outcome.dropped.into(), now));
719            self.dropped_observations_alarm
720                .update(&Update::Observation(now));
721        }
722        if outcome.instruments_updated > 0 {
723            self.instruments_updated_per_second
724                .update(&Update::Observations(
725                    outcome.instruments_updated as u64,
726                    now,
727                ));
728        }
729        self.inactivity_alarm.update(&Update::Observation(now));
730    }
731
732    pub fn update_post_snapshot(&mut self, snapshot_started: Instant) {
733        let now = Instant::now();
734        self.snapshots_per_second.update(&Update::Observation(now));
735        self.snapshots_times_us
736            .update(&Update::ObservationWithValue(
737                (now - snapshot_started).into(),
738                now,
739            ));
740    }
741
742    pub fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
743        let mut container = Snapshot::default();
744        self.collections_per_second
745            .put_snapshot(&mut container, descriptive);
746        self.collection_times_us
747            .put_snapshot(&mut container, descriptive);
748        self.observations_processed_per_second
749            .put_snapshot(&mut container, descriptive);
750        self.observations_processed_per_collection
751            .put_snapshot(&mut container, descriptive);
752        self.observations_dropped_per_second
753            .put_snapshot(&mut container, descriptive);
754        self.observations_dropped_per_collection
755            .put_snapshot(&mut container, descriptive);
756        self.observations_enqueued
757            .put_snapshot(&mut container, descriptive);
758        self.instruments_updated_per_second
759            .put_snapshot(&mut container, descriptive);
760        self.snapshots_per_second
761            .put_snapshot(&mut container, descriptive);
762        self.snapshots_times_us
763            .put_snapshot(&mut container, descriptive);
764        self.dropped_observations_alarm
765            .put_snapshot(&mut container, descriptive);
766        self.inactivity_alarm
767            .put_snapshot(&mut container, descriptive);
768        self.iteration_update_per_mille_ratio
769            .put_snapshot(&mut container, descriptive);
770        self.iteration_update_per_mille_ratio_histo
771            .put_snapshot(&mut container, descriptive);
772
773        into.items
774            .push(("_metrix".into(), ItemKind::Snapshot(container)));
775    }
776}
777
778/*
779#[inline]
780fn duration_to_micros(d: Duration) -> u64 {
781    let nanos = (d.as_secs() * 1_000_000_000) + (d.subsec_nanos() as u64);
782    nanos / 1000
783}
784*/