Skip to main content

commonware_runtime/telemetry/metrics/
mod.rs

1//! Utility functions for metrics
2
3pub mod histogram;
4mod registration;
5pub mod status;
6pub(crate) mod task;
7
/// Prefix for runtime metrics.
///
/// Reserved leading segment for runtime-owned metric names (for example
/// `runtime_tasks_running`, which `count_running_tasks` looks for).
/// `child_label` refuses user context labels whose joined name would occupy
/// this namespace.
pub(crate) const METRICS_PREFIX: &str = "runtime";
10
11pub use commonware_runtime_macros::{EncodeLabelSet, EncodeLabelValue, EncodeStruct};
12pub use prometheus_client::{
13    collector, encoding,
14    encoding::{
15        CounterValueEncoder, DescriptorEncoder, EncodeCounterValue, EncodeExemplarTime,
16        EncodeExemplarValue, EncodeGaugeValue, EncodeLabel, EncodeLabelKey,
17        EncodeLabelSet as EncodeLabelSetTrait, EncodeLabelValue as EncodeLabelValueTrait,
18        EncodeMetric, ExemplarValueEncoder, GaugeValueEncoder, LabelEncoder, LabelKeyEncoder,
19        LabelSetEncoder, LabelValueEncoder, MetricEncoder, NoLabelSet,
20    },
21    metrics::{MetricType, TypedMetric},
22    registry,
23    registry::Metric,
24};
25
/// Underlying Prometheus metric types. Used when constructing a metric
/// to pass to [`crate::Metrics::register`].
///
/// These are plain re-exports from `prometheus_client::metrics`. The
/// registered aliases at the top of this module (`Counter`, `Gauge`, ...)
/// wrap them in a `Registered` handle.
pub mod raw {
    pub use prometheus_client::metrics::{
        counter::Counter,
        family::{self, Family},
        gauge::Gauge,
        histogram::Histogram,
    };
}
36
37use commonware_utils::sync::Mutex;
38use prometheus_client::encoding::{
39    text::{encode, encode_eof},
40    MetricEncoder as PromMetricEncoder,
41};
42pub use registration::Registration;
43use std::{
44    any::Any,
45    borrow::Cow,
46    collections::{BTreeMap, HashMap},
47    ops::Deref,
48    sync::{atomic::Ordering, Arc, Weak},
49};
50
/// Native integer width used by [`raw::Gauge`] on this target.
///
/// `i64` on platforms with 64-bit atomics, `i32` otherwise. Matches
/// `prometheus_client::metrics::gauge::Gauge`'s backing type.
///
/// [`GaugeExt`] converts caller-supplied integers into this type with
/// `TryInto`, so out-of-range values surface as conversion errors instead of
/// being truncated.
#[cfg(target_has_atomic = "64")]
pub type GaugeValue = i64;
// Fallback for targets without 64-bit atomics; must stay in sync with the
// gauge's default backing integer on those targets.
#[cfg(not(target_has_atomic = "64"))]
pub type GaugeValue = i32;
59
/// A registered counter metric; derefs to [`raw::Counter`].
pub type Counter = Registered<raw::Counter>;
/// A registered gauge metric; derefs to [`raw::Gauge`].
pub type Gauge = Registered<raw::Gauge>;
/// A registered histogram metric; derefs to [`raw::Histogram`].
pub type Histogram = Registered<raw::Histogram>;
/// A registered family of counters keyed by the label set `L`.
pub type CounterFamily<L> = Registered<raw::Family<L, raw::Counter>>;
/// A registered family of gauges keyed by the label set `L`.
pub type GaugeFamily<L> = Registered<raw::Family<L, raw::Gauge>>;
70
/// Convenience methods for Prometheus gauges.
pub trait GaugeExt {
    /// Set a gauge from a lossless integer conversion.
    ///
    /// `value` is converted to [`GaugeValue`] first, and the gauge is only
    /// written when that conversion succeeds. On success this returns whatever
    /// the underlying set operation reports. For [`raw::Gauge`] that is the
    /// result of `Gauge::set`, presumably the previous value (confirm against
    /// prometheus-client).
    ///
    /// # Errors
    ///
    /// Returns the conversion error if `value` does not fit in [`GaugeValue`].
    fn try_set<T: TryInto<GaugeValue>>(&self, value: T) -> Result<GaugeValue, T::Error>;

    /// Atomically raise a gauge to at least the provided value.
    ///
    /// A gauge that already holds a larger value is left unchanged. On success
    /// this returns the value held before the update. For [`raw::Gauge`] that
    /// is the result of an atomic `fetch_max`.
    ///
    /// # Errors
    ///
    /// Returns the conversion error if `value` does not fit in [`GaugeValue`].
    /// The gauge is not touched in that case.
    fn try_set_max<T: TryInto<GaugeValue> + Copy>(&self, value: T) -> Result<GaugeValue, T::Error>;
}
79
80impl GaugeExt for raw::Gauge {
81    fn try_set<T: TryInto<GaugeValue>>(&self, value: T) -> Result<GaugeValue, T::Error> {
82        let value = value.try_into()?;
83        Ok(self.set(value))
84    }
85
86    fn try_set_max<T: TryInto<GaugeValue> + Copy>(&self, value: T) -> Result<GaugeValue, T::Error> {
87        let value = value.try_into()?;
88        Ok(self.inner().fetch_max(value, Ordering::Relaxed))
89    }
90}
91
92pub use histogram::HistogramExt;
93
94/// One-line constructors for the common metric types.
95pub trait MetricsExt: crate::Metrics {
96    /// Register a counter with the runtime.
97    fn counter<N: Into<String>, H: Into<String>>(&self, name: N, help: H) -> Counter {
98        self.register(name, help, raw::Counter::default())
99    }
100
101    /// Register a gauge with the runtime.
102    fn gauge<N: Into<String>, H: Into<String>>(&self, name: N, help: H) -> Gauge {
103        self.register(name, help, raw::Gauge::default())
104    }
105
106    /// Register a histogram with the runtime.
107    fn histogram<N: Into<String>, H: Into<String>, I>(
108        &self,
109        name: N,
110        help: H,
111        buckets: I,
112    ) -> Histogram
113    where
114        I: IntoIterator<Item = f64>,
115    {
116        self.register(name, help, raw::Histogram::new(buckets))
117    }
118
119    /// Register a metric family with the runtime.
120    fn family<N, H, S, M>(&self, name: N, help: H) -> Registered<raw::Family<S, M>>
121    where
122        N: Into<String>,
123        H: Into<String>,
124        S: Clone + std::hash::Hash + Eq,
125        M: Default,
126        raw::Family<S, M>: Metric,
127    {
128        self.register(name, help, raw::Family::<S, M>::default())
129    }
130}
131
132impl<T: crate::Metrics> MetricsExt for T {}
133
/// Check that `label` is a valid Prometheus-style identifier: `[a-zA-Z][a-zA-Z0-9_]*`.
///
/// # Panics
///
/// Panics when `label` is empty or its first character is not an ASCII
/// letter. Also panics when any later character is outside `[a-zA-Z0-9_]`.
pub fn validate_label(label: &str) {
    let mut chars = label.chars();

    // The leading character is checked on its own: digits and `_` are not allowed there.
    let leads_with_letter = matches!(chars.next(), Some(first) if first.is_ascii_alphabetic());
    assert!(leads_with_letter, "label must start with [a-zA-Z]: {label}");

    let tail_is_valid = chars.all(|ch| ch == '_' || ch.is_ascii_alphanumeric());
    assert!(tail_is_valid, "label must only contain [a-zA-Z0-9_]: {label}");
}
151
/// Insert or overwrite `key` in an attribute list kept sorted by key.
///
/// The list is searched with a binary search, so it must already be sorted by
/// key (which this function preserves). `value` is rendered with `Display`.
///
/// Returns `true` when `key` was not present and a new pair was inserted.
/// Returns `false` when an existing value was replaced.
pub fn add_attribute(
    attributes: &mut Vec<(String, String)>,
    key: &str,
    value: impl std::fmt::Display,
) -> bool {
    let rendered = value.to_string();
    // Compare as `&str` so the key is only allocated when it is actually inserted.
    let search = attributes.binary_search_by(|(existing, _)| existing.as_str().cmp(key));
    let is_new = search.is_err();
    match search {
        Ok(index) => attributes[index].1 = rendered,
        Err(index) => attributes.insert(index, (key.to_owned(), rendered)),
    }
    is_new
}
174
/// Whether the encoded sample name `full` is `name`, or ends with `_` followed by `name`.
#[cfg(any(test, feature = "test-utils"))]
fn matches_metric_name(full: &str, name: &str) -> bool {
    full == name
        || full
            .strip_suffix(name)
            .is_some_and(|prefix| prefix.ends_with('_'))
}

/// Find the `}` that closes a label set, ignoring braces inside quoted values.
///
/// `labels` is the text immediately after the opening `{`. Backslash escapes
/// inside quoted label values (e.g. `\"`) are honored. Returns the byte offset
/// of the closing brace, or `None` if the set (or a quoted value) is unterminated.
#[cfg(any(test, feature = "test-utils"))]
fn find_label_set_end(labels: &str) -> Option<usize> {
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in labels.char_indices() {
        if in_quotes {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_quotes = false;
            }
        } else if ch == '"' {
            in_quotes = true;
        } else if ch == '}' {
            return Some(idx);
        }
    }
    None
}

/// Return `true` if encoded Prometheus metrics contain a sample with `name` and `value`.
///
/// `name` may be either the full encoded metric name or its unprefixed suffix.
/// A suffix must follow a `_` separator. Labels attached to the sample are
/// ignored, and label values may contain `}` or escaped quotes. `value` is
/// compared textually against the first token after the name and labels.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn has_metric_value(metrics: &str, name: &str, value: impl std::fmt::Display) -> bool {
    let value = value.to_string();
    metrics.lines().any(|line| {
        let line = line.trim();
        if line.starts_with('#') {
            return false;
        }

        let Some(sample_end) = line.find(|c: char| c == '{' || c.is_whitespace()) else {
            return false;
        };
        let sample_name = &line[..sample_end];
        if !matches_metric_name(sample_name, name) {
            return false;
        }

        let mut rest = &line[sample_end..];
        if let Some(labeled) = rest.strip_prefix('{') {
            // A `}` inside a quoted label value must not terminate the label set.
            let Some(labels_end) = find_label_set_end(labeled) else {
                return false;
            };
            rest = &labeled[labels_end + 1..];
        }
        if !rest.chars().next().is_some_and(char::is_whitespace) {
            return false;
        }

        rest.split_whitespace().next() == Some(value.as_str())
    })
}
219
/// Sum the running-task gauge over every task whose name begins with `prefix`.
///
/// The metrics are encoded and scanned for `runtime_tasks_running` samples
/// labelled `kind="Task"`. Each matching sample whose `name` label starts with
/// `prefix` contributes its value: `1` while the task runs, `0` once stopped.
///
/// Handy for asserting that every task spawned beneath a label hierarchy has
/// shut down.
///
/// # Example
///
/// ```rust
/// use commonware_runtime::{
///     deterministic, telemetry::metrics::count_running_tasks, Clock, Metrics, Runner, Spawner,
///     Supervisor,
/// };
/// use std::time::Duration;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Spawn a task under a labeled context
///     let handle = context.child("worker").spawn(|ctx| async move {
///         ctx.sleep(Duration::from_secs(100)).await;
///     });
///
///     // Allow the task to start
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Count running tasks with "worker" prefix
///     let count = count_running_tasks(&context, "worker");
///     assert!(count > 0, "worker task should be running");
///
///     // Abort the task
///     handle.abort();
///     let _ = handle.await;
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Verify task is stopped
///     let count = count_running_tasks(&context, "worker");
///     assert_eq!(count, 0, "worker task should be stopped");
/// });
/// ```
#[cfg(any(test, feature = "test-utils"))]
pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
    let encoded = metrics.encode();
    let mut running = 0;
    for line in encoded.lines() {
        // Only task-kind samples of the running-tasks gauge are relevant.
        let is_task_sample =
            line.starts_with("runtime_tasks_running{") && line.contains("kind=\"Task\"");
        if !is_task_sample {
            continue;
        }
        let Some(task_name) = line
            .split("name=\"")
            .nth(1)
            .and_then(|after_key| after_key.split('"').next())
        else {
            continue;
        };
        if !task_name.starts_with(prefix) {
            continue;
        }
        // The sample value is the last space-separated token on the line.
        let sample_value = line
            .trim_end()
            .rsplit(' ')
            .next()
            .and_then(|raw| raw.parse::<usize>().ok());
        if let Some(value) = sample_value {
            running += value;
        }
    }
    running
}
278
279// Adaptation of client_rust's internal descriptor encoder.
280//
281// Source:
282// https://github.com/prometheus/client_rust/blob/4a6d40a55443d5b18f5be311d246c03e56f417d6/src/encoding/text.rs#L218-L275
283fn encode_descriptor<W>(
284    writer: &mut W,
285    name: &str,
286    help: &str,
287    metric_type: MetricType,
288) -> Result<(), std::fmt::Error>
289where
290    W: std::fmt::Write,
291{
292    writer.write_str("# HELP ")?;
293    writer.write_str(name)?;
294    writer.write_str(" ")?;
295    writer.write_str(help)?;
296    writer.write_str("\n# TYPE ")?;
297    writer.write_str(name)?;
298    writer.write_str(" ")?;
299    writer.write_str(metric_type.as_str())?;
300    writer.write_str("\n")?;
301    Ok(())
302}
303
/// Join `prefix` and `name` with Prometheus' `_` separator.
///
/// An empty `prefix` yields `name` unchanged (no leading `_`).
pub(crate) fn prefixed_name(prefix: &str, name: &str) -> String {
    match prefix {
        "" => name.to_owned(),
        _ => [prefix, name].join("_"),
    }
}
312
313/// Build a child context label by appending `label` to `prefix`, asserting that
314/// `label` is valid and does not shadow the reserved runtime metric prefix.
315pub(crate) fn child_label(prefix: &str, label: &str) -> String {
316    validate_label(label);
317    let name = prefixed_name(prefix, label);
318    assert!(
319        !name.starts_with(METRICS_PREFIX),
320        "using runtime label is not allowed"
321    );
322    name
323}
324
325struct RegistryGuard {
326    id: usize,
327    registry: Weak<Mutex<RegistryInner>>,
328}
329
330impl Drop for RegistryGuard {
331    fn drop(&mut self) {
332        let Some(registry) = self.registry.upgrade() else {
333            return;
334        };
335        registry.lock().release_registration(self.id);
336    }
337}
338
339/// A metric handle whose lifetime controls registry exposure and attached cleanup.
340#[must_use = "registered metrics are removed when the returned handle is dropped"]
341pub struct Registered<M> {
342    metric: Arc<M>,
343    registration: Registration,
344}
345
346impl<M> Clone for Registered<M> {
347    fn clone(&self) -> Self {
348        Self {
349            metric: self.metric.clone(),
350            registration: self.registration.clone(),
351        }
352    }
353}
354
355impl<M> Registered<M> {
356    /// Create a metric handle with an explicit lifecycle registration.
357    ///
358    /// The provided [`Registration`] controls what happens when the last clone
359    /// of this handle is dropped. Use [`Registration::from`] with `()` for a
360    /// raw handle that is not exposed by a runtime registry.
361    pub fn with_registration(metric: M, registration: Registration) -> Self {
362        Self {
363            metric: Arc::new(metric),
364            registration,
365        }
366    }
367
368    pub fn metric(&self) -> &M {
369        self.metric.as_ref()
370    }
371}
372
373impl<S, M, C> Registered<raw::Family<S, M, C>>
374where
375    S: Clone + std::hash::Hash + Eq,
376    C: raw::family::MetricConstructor<M>,
377{
378    pub fn get_by<Q>(&self, label_set: &Q) -> Option<impl Deref<Target = M> + '_>
379    where
380        for<'a> S: From<&'a Q>,
381    {
382        let label_set = S::from(label_set);
383        self.get(&label_set)
384    }
385
386    pub fn get_or_create_by<Q>(&self, label_set: &Q) -> impl Deref<Target = M> + '_
387    where
388        for<'a> S: From<&'a Q>,
389    {
390        let label_set = S::from(label_set);
391        self.get_or_create(&label_set)
392    }
393
394    pub fn remove_by<Q>(&self, label_set: &Q) -> bool
395    where
396        for<'a> S: From<&'a Q>,
397    {
398        let label_set = S::from(label_set);
399        self.remove(&label_set)
400    }
401}
402
403impl<M> Deref for Registered<M> {
404    type Target = M;
405
406    fn deref(&self) -> &Self::Target {
407        self.metric()
408    }
409}
410
411impl<M: std::fmt::Debug> std::fmt::Debug for Registered<M> {
412    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413        f.debug_struct("Registered")
414            .field("metric", self.metric())
415            .finish_non_exhaustive()
416    }
417}
418
/// Const label pairs `(key, value)` attached to a metric.
type MetricAttributes = Vec<(Cow<'static, str>, Cow<'static, str>)>;
/// Exact identity of a registered metric: family name plus its attributes.
/// Used to detect duplicate registrations.
type MetricKey = (String, MetricAttributes);
/// Closure that appends one metric's sample lines (no `#` comment lines) to a buffer.
type SampleEncoder = dyn Fn(&mut String) -> Result<(), std::fmt::Error> + Send + Sync;

/// Fields of a new metric prepared by `RegistryInner::register`.
/// `insert_metric_entry` turns them into a stored `MetricEntry`.
struct PendingMetricEntry {
    /// Family (full metric name) the metric belongs to.
    family_name: String,
    /// Const labels that, with `family_name`, form the metric's key.
    attributes: MetricAttributes,
    /// Renders this metric's samples during encoding.
    encode_samples: Box<SampleEncoder>,
    /// Type-erased metric, downcast later to hand the same `Arc<M>` to duplicate registrations.
    metric_any: Arc<dyn Any + Send + Sync>,
}
429
430pub(crate) struct SharedMetric<M>(pub(crate) Arc<M>);
431
432impl<M: std::fmt::Debug> std::fmt::Debug for SharedMetric<M> {
433    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434        self.0.fmt(f)
435    }
436}
437
438impl<M: EncodeMetric> EncodeMetric for SharedMetric<M> {
439    fn encode(&self, encoder: PromMetricEncoder<'_>) -> Result<(), std::fmt::Error> {
440        self.0.encode(encoder)
441    }
442
443    fn metric_type(&self) -> MetricType {
444        self.0.metric_type()
445    }
446}
447
448fn create_sample_encoder<M>(
449    name: String,
450    labels: MetricAttributes,
451    metric: Arc<M>,
452) -> Box<SampleEncoder>
453where
454    M: Metric,
455{
456    // TODO (#3659): Avoid allocating an upstream registry per metric once
457    // `prometheus-client` exposes a public sample-only `MetricEncoder` path
458    // for encoding one metric with const labels.
459    let mut registry = registry::Registry::with_labels(labels.into_iter());
460    registry.register(name, "", SharedMetric(metric));
461
462    Box::new(move |samples| {
463        let mut encoded = String::new();
464        encode(&mut encoded, &registry).expect("encoding temporary metric registry failed");
465        for line in encoded.lines() {
466            if line.starts_with('#') {
467                continue;
468            }
469            samples.push_str(line);
470            samples.push('\n');
471        }
472        Ok(())
473    })
474}
475
476fn owned_attributes(attributes: Vec<(String, String)>) -> MetricAttributes {
477    attributes
478        .into_iter()
479        .map(|(k, v)| (Cow::Owned(k), Cow::Owned(v)))
480        .collect()
481}
482
// Append a trailing `.` to help text, mirroring upstream prometheus-client's
// `Descriptor::new`, which does so unconditionally.
//
// Source:
// https://github.com/prometheus/client_rust/blob/4a6d40a55443d5b18f5be311d246c03e56f417d6/src/registry.rs#L340-L348
fn normalize_help(help: String) -> String {
    let mut normalized = help;
    normalized.push('.');
    normalized
}
490
/// A live metric stored in a `RegistryInner::metrics` slot.
struct MetricEntry {
    /// Family (full metric name) this entry is listed under.
    family_name: String,
    /// Const labels that, together with `family_name`, identify this metric.
    attributes: MetricAttributes,
    /// Renders this metric's sample lines during `encode`.
    encode_samples: Box<SampleEncoder>,
    /// Type-erased metric, downcast to return the existing `Arc<M>` on duplicate registration.
    metric_any: Arc<dyn Any + Send + Sync>,
    /// Number of `register` calls currently holding this entry; it is removed at zero.
    claims: usize,
    /// Position of this entry's id inside its family's `metric_ids`.
    family_index: usize,
}

/// All metrics sharing one name, plus the descriptor emitted once per family.
#[derive(Debug)]
struct MetricFamily {
    /// Normalized help text (with trailing `.`) every member must agree on.
    help: String,
    /// Metric type every member must agree on.
    metric_type: MetricType,
    /// Pre-rendered `# HELP` / `# TYPE` lines.
    descriptor: String,
    /// Member metric ids; order is not stable because removal uses `swap_remove`.
    metric_ids: Vec<usize>,
}
507
/// Manages metrics with explicit lifetimes.
///
/// Cloning is cheap and yields another handle to the same shared, mutex-guarded
/// state. A metric remains in [`Registry::encode`] output until the
/// registrations handed out for it are released (see `Registered`).
#[derive(Clone)]
pub struct Registry {
    inner: Arc<Mutex<RegistryInner>>,
}
513
/// Mutable state behind a [`Registry`], always accessed through its mutex.
struct RegistryInner {
    /// Dense metric storage indexed by stable metric id.
    metrics: Vec<Option<MetricEntry>>,
    /// Metric ids that can be reused after a metric is fully unregistered.
    free_metric_ids: Vec<usize>,
    /// Metric families keyed by family name, kept sorted for deterministic encoding.
    families: BTreeMap<String, MetricFamily>,
    /// Exact metric keys for duplicate registration detection.
    keys: HashMap<MetricKey, usize>,
    /// Monotonic id source used when there is no reusable metric slot.
    next_metric_id: usize,
}
526
527impl Default for Registry {
528    fn default() -> Self {
529        Self::new()
530    }
531}
532
533impl Registry {
534    pub fn new() -> Self {
535        Self {
536            inner: Arc::new(Mutex::new(RegistryInner::new())),
537        }
538    }
539
540    pub(crate) fn register<M>(
541        &self,
542        name: String,
543        help: String,
544        attributes: Vec<(String, String)>,
545        metric: Arc<M>,
546    ) -> Registered<M>
547    where
548        M: Metric,
549    {
550        let mut inner = self.inner.lock();
551        inner.register(Arc::downgrade(&self.inner), name, help, attributes, metric)
552    }
553
554    pub fn encode(&self) -> String {
555        self.inner.lock().encode()
556    }
557}
558
559impl RegistryInner {
560    fn new() -> Self {
561        Self {
562            metrics: Vec::new(),
563            free_metric_ids: Vec::new(),
564            families: BTreeMap::new(),
565            keys: HashMap::new(),
566            next_metric_id: 0,
567        }
568    }
569
570    fn register<M>(
571        &mut self,
572        registry: Weak<Mutex<Self>>,
573        name: String,
574        help: String,
575        attributes: Vec<(String, String)>,
576        metric: Arc<M>,
577    ) -> Registered<M>
578    where
579        M: Metric,
580    {
581        let attributes = owned_attributes(attributes);
582        let help = normalize_help(help);
583        let metric_type = metric.metric_type();
584        let encode_samples =
585            create_sample_encoder(name.clone(), attributes.clone(), metric.clone());
586        let key = (name.clone(), attributes.clone());
587        if let Some(existing_id) = self.keys.get(&key).copied() {
588            let entry = self.metric_ref(existing_id);
589            if let Some(family) = self.families.get(&name) {
590                assert_eq!(
591                    family.help, help,
592                    "metric family `{}` registered with inconsistent help text",
593                    name
594                );
595            }
596            let existing_metric = Arc::clone(&entry.metric_any)
597                .downcast::<M>()
598                .unwrap_or_else(|_| {
599                    panic!(
600                        "duplicate metric `{}` with attributes {:?} registered with different type",
601                        key.0, key.1
602                    )
603                });
604            self.claim_registration(existing_id);
605            return Registered {
606                metric: existing_metric,
607                registration: Registration::from(RegistryGuard {
608                    id: existing_id,
609                    registry,
610                }),
611            };
612        }
613        self.assert_family_matches(&name, &help, metric_type);
614
615        let id = self.allocate_metric_id();
616        let registration = Registration::from(RegistryGuard { id, registry });
617        let metric_any: Arc<dyn Any + Send + Sync> = metric.clone();
618        self.insert_metric_entry(
619            id,
620            help,
621            metric_type,
622            PendingMetricEntry {
623                family_name: name,
624                attributes,
625                encode_samples,
626                metric_any,
627            },
628        );
629        Registered {
630            metric,
631            registration,
632        }
633    }
634
635    fn metric_slot_mut(&mut self, id: usize) -> &mut Option<MetricEntry> {
636        if id == self.metrics.len() {
637            self.metrics.push(None);
638        }
639        &mut self.metrics[id]
640    }
641
642    fn metric_ref(&self, id: usize) -> &MetricEntry {
643        self.metrics
644            .get(id)
645            .and_then(Option::as_ref)
646            .expect("metric id missing from registry")
647    }
648
649    fn metric_mut(&mut self, id: usize) -> &mut MetricEntry {
650        self.metrics
651            .get_mut(id)
652            .and_then(Option::as_mut)
653            .expect("metric id missing from registry")
654    }
655
656    fn allocate_metric_id(&mut self) -> usize {
657        if let Some(id) = self.free_metric_ids.pop() {
658            return id;
659        }
660        let id = self.next_metric_id;
661        self.next_metric_id = self
662            .next_metric_id
663            .checked_add(1)
664            .expect("metric id overflow");
665        id
666    }
667
668    fn assert_family_matches(&self, name: &str, help: &str, metric_type: MetricType) {
669        if let Some(family) = self.families.get(name) {
670            assert_eq!(
671                family.help, help,
672                "metric family `{}` registered with inconsistent help text",
673                name
674            );
675            assert_eq!(
676                family.metric_type.as_str(),
677                metric_type.as_str(),
678                "metric family `{}` registered with inconsistent metric type",
679                name
680            );
681        }
682    }
683
684    fn insert_metric_entry(
685        &mut self,
686        id: usize,
687        help: String,
688        metric_type: MetricType,
689        entry: PendingMetricEntry,
690    ) {
691        let PendingMetricEntry {
692            family_name,
693            attributes,
694            encode_samples,
695            metric_any,
696        } = entry;
697        self.keys
698            .insert((family_name.clone(), attributes.clone()), id);
699        let family = match self.families.entry(family_name.clone()) {
700            std::collections::btree_map::Entry::Occupied(entry) => entry.into_mut(),
701            std::collections::btree_map::Entry::Vacant(entry) => {
702                let mut descriptor = String::new();
703                encode_descriptor(&mut descriptor, &family_name, &help, metric_type)
704                    .expect("encoding cached descriptor failed");
705                entry.insert(MetricFamily {
706                    help,
707                    metric_type,
708                    descriptor,
709                    metric_ids: Vec::new(),
710                })
711            }
712        };
713        let family_index = family.metric_ids.len();
714        family.metric_ids.push(id);
715        self.metric_slot_mut(id).replace(MetricEntry {
716            family_name,
717            attributes,
718            encode_samples,
719            metric_any,
720            claims: 1,
721            family_index,
722        });
723    }
724
725    fn claim_registration(&mut self, id: usize) {
726        let entry = self.metric_mut(id);
727        entry.claims = entry
728            .claims
729            .checked_add(1)
730            .expect("registration claims overflow");
731    }
732
733    fn release_registration(&mut self, id: usize) {
734        let entry = self.metric_mut(id);
735        entry.claims = entry
736            .claims
737            .checked_sub(1)
738            .expect("registration claim count underflow");
739        if entry.claims > 0 {
740            return;
741        }
742        self.drop_metric_entry(id);
743    }
744
745    fn drop_metric_entry(&mut self, id: usize) {
746        let metric = self
747            .metrics
748            .get_mut(id)
749            .and_then(Option::take)
750            .expect("metric id missing from registry");
751        let MetricEntry {
752            family_name,
753            attributes,
754            family_index,
755            ..
756        } = metric;
757        let key = (family_name, attributes);
758        if self.keys.get(&key).copied() == Some(id) {
759            self.keys.remove(&key);
760        }
761        let (family_name, _) = key;
762        let (swapped_metric_id, remove_family) = {
763            let family = self
764                .families
765                .get_mut(&family_name)
766                .expect("family missing during unregister");
767            let removed = family.metric_ids.swap_remove(family_index);
768            assert_eq!(removed, id, "family index mismatch during unregister");
769            let swapped = family.metric_ids.get(family_index).copied();
770            (swapped, family.metric_ids.is_empty())
771        };
772        if let Some(swapped_metric_id) = swapped_metric_id {
773            self.metric_mut(swapped_metric_id).family_index = family_index;
774        }
775        if remove_family {
776            self.families.remove(&family_name);
777        }
778        self.free_metric_ids.push(id);
779    }
780
781    pub fn encode(&self) -> String {
782        let mut output = String::new();
783        let mut samples = String::new();
784        for family in self.families.values() {
785            samples.clear();
786            for metric_id in &family.metric_ids {
787                let metric = self.metric_ref(*metric_id);
788                (metric.encode_samples)(&mut samples).expect("encoding live metric samples failed");
789            }
790            // Suppress the HELP/TYPE descriptor when the family produced no
791            // samples (e.g. a `Family<S, M>` with no child entries). Matches
792            // upstream prometheus-client's empty-metric filtering.
793            if samples.is_empty() {
794                continue;
795            }
796            output.push_str(&family.descriptor);
797            output.push_str(&samples);
798        }
799
800        encode_eof(&mut output).expect("encoding EOF failed");
801        output
802    }
803}
804
/// A view of a [`Registry`] that prepends a prefix (joined with `_`) to every
/// metric name registered through it.
pub(crate) struct Scope {
    /// Shared registry that actually stores the metrics.
    registry: Registry,
    /// Prefix applied to metric names; nested scopes extend it with `_`.
    prefix: String,
}
809
/// Registration surface shared by [`Registry`] (no prefix) and [`Scope`]
/// (prefixed names).
pub(crate) trait Register {
    /// Register a metric under this scope's prefix.
    ///
    /// The metric stays exposed while the returned handle (or a clone) is alive.
    fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M>;

    /// Create a child scope by appending `prefix` to the current prefix.
    fn sub_registry(&mut self, prefix: &str) -> Scope;
}
817
818impl Register for Registry {
819    fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M> {
820        validate_label(name);
821        Self::register(
822            self,
823            name.to_string(),
824            help.to_string(),
825            Vec::new(),
826            Arc::new(metric),
827        )
828    }
829
830    fn sub_registry(&mut self, prefix: &str) -> Scope {
831        validate_label(prefix);
832        Scope {
833            registry: self.clone(),
834            prefix: prefix.to_string(),
835        }
836    }
837}
838
839impl Register for Scope {
840    fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M> {
841        validate_label(name);
842        let name = prefixed_name(&self.prefix, name);
843        let help = help.to_string();
844        let metric = Arc::new(metric);
845        Registry::register(&self.registry, name, help, Vec::new(), metric)
846    }
847
848    fn sub_registry(&mut self, prefix: &str) -> Scope {
849        validate_label(prefix);
850        Self {
851            registry: self.registry.clone(),
852            prefix: prefixed_name(&self.prefix, prefix),
853        }
854    }
855}
856
857#[cfg(test)]
858mod tests {
859    use super::*;
860    use crate::{deterministic, Metrics as _, Runner, Spawner, Supervisor as _};
861    use commonware_macros::test_traced;
862    use futures::future;
863    use std::sync::mpsc::{self, TryRecvError};
864
865    #[test]
866    fn test_has_metric_value_unlabeled() {
867        let metrics = "# HELP storage_items_tracked items\nstorage_items_tracked 2\n";
868        assert!(has_metric_value(metrics, "items_tracked", 2));
869        assert!(has_metric_value(metrics, "storage_items_tracked", 2));
870        assert!(!has_metric_value(metrics, "items_tracked_extra", 2));
871        assert!(!has_metric_value(metrics, "items_tracked", 3));
872    }
873
874    #[test]
875    fn test_has_metric_value_labeled() {
876        let metrics = r#"storage_init_items_tracked{index="2"} 2"#;
877        assert!(has_metric_value(metrics, "items_tracked", 2));
878        assert!(has_metric_value(metrics, "storage_init_items_tracked", 2));
879    }
880
881    #[test_traced]
882    fn test_count_running_tasks() {
883        let executor = deterministic::Runner::default();
884        executor.start(|context| async move {
885            // Initially no tasks with "worker" prefix
886            assert_eq!(
887                count_running_tasks(&context, "worker"),
888                0,
889                "no worker tasks initially"
890            );
891
892            // Spawn a task under a labeled context that stays running
893            let handle1 = context.child("worker").spawn(|_| async move {
894                future::pending::<()>().await;
895            });
896
897            // Count running tasks with "worker" prefix
898            let count = count_running_tasks(&context, "worker");
899            assert_eq!(count, 1, "worker task should be running");
900
901            // Non-matching prefix should return 0
902            assert_eq!(
903                count_running_tasks(&context, "other"),
904                0,
905                "no tasks with 'other' prefix"
906            );
907
908            // Spawn a nested task (worker_child)
909            let handle2 = context
910                .child("worker")
911                .child("child")
912                .spawn(|_| async move {
913                    future::pending::<()>().await;
914                });
915
916            // Count should include both parent and nested tasks
917            let count = count_running_tasks(&context, "worker");
918            assert_eq!(count, 2, "both worker and worker_child should be counted");
919
920            // Abort parent task
921            handle1.abort();
922            let _ = handle1.await;
923
924            // Only nested task remains
925            let count = count_running_tasks(&context, "worker");
926            assert_eq!(count, 1, "only worker_child should remain");
927
928            // Abort nested task
929            handle2.abort();
930            let _ = handle2.await;
931
932            // All tasks stopped
933            assert_eq!(
934                count_running_tasks(&context, "worker"),
935                0,
936                "all worker tasks should be stopped"
937            );
938        });
939    }
940
941    #[test_traced]
942    fn test_no_duplicate_metrics() {
943        let executor = deterministic::Runner::default();
944        executor.start(|context| async move {
945            // Register metrics under different labels (no duplicates)
946            let c1 = raw::Counter::<u64>::default();
947            let _metric_a = context.child("a").register("test", "help", c1);
948            let c2 = raw::Counter::<u64>::default();
949            let _metric_b = context.child("b").register("test", "help", c2);
950        });
951        // Test passes if runtime doesn't panic on shutdown
952    }
953
954    #[test_traced]
955    fn test_duplicate_metrics_reuse_existing_handle() {
956        let executor = deterministic::Runner::default();
957        executor.start(|context| async move {
958            let c1 = raw::Counter::<u64>::default();
959            let metric_a = context.child("a").register("test", "help", c1);
960            let c2 = raw::Counter::<u64>::default();
961            let metric_b = context.child("a").register("test", "help", c2);
962
963            assert!(std::ptr::eq(metric_a.metric(), metric_b.metric()));
964
965            metric_a.inc();
966            metric_b.inc_by(2);
967            let encoded = context.encode();
968            assert!(encoded.contains("a_test_total 3"));
969        });
970    }
971
972    #[test]
973    fn test_claims_track_register_calls_not_handle_clones() {
974        let registry = Registry::new();
975        let key: MetricKey = ("votes".to_string(), Vec::new());
976
977        let first = registry.register(
978            key.0.clone(),
979            "vote count".to_string(),
980            Vec::new(),
981            Arc::new(raw::Counter::<u64>::default()),
982        );
983        let first_clone = first.clone();
984        let id = {
985            let registry = registry.inner.lock();
986            let id = *registry.keys.get(&key).expect("metric key missing");
987            assert_eq!(registry.metric_ref(id).claims, 1);
988            id
989        };
990
991        let second = registry.register(
992            key.0,
993            "vote count".to_string(),
994            Vec::new(),
995            Arc::new(raw::Counter::<u64>::default()),
996        );
997        let second_clone = second.clone();
998        {
999            let registry = registry.inner.lock();
1000            assert_eq!(registry.metric_ref(id).claims, 2);
1001        }
1002
1003        drop(first);
1004        drop(second);
1005        {
1006            let registry = registry.inner.lock();
1007            assert_eq!(registry.metric_ref(id).claims, 2);
1008        }
1009
1010        drop(second_clone);
1011        {
1012            let registry = registry.inner.lock();
1013            assert_eq!(registry.metric_ref(id).claims, 1);
1014        }
1015
1016        drop(first_clone);
1017        let registry = registry.inner.lock();
1018        assert!(
1019            registry.keys.is_empty(),
1020            "keys left behind: {:?}",
1021            registry.keys
1022        );
1023        assert!(
1024            registry.families.is_empty(),
1025            "families left behind: {:?}",
1026            registry.families
1027        );
1028    }
1029
1030    #[test]
1031    #[should_panic(expected = "registered with different type")]
1032    fn test_duplicate_metrics_different_type_panics() {
1033        let executor = deterministic::Runner::default();
1034        executor.start(|context| async move {
1035            let counter = raw::Counter::<u64>::default();
1036            let _metric_a = context.child("a").register("test", "help", counter);
1037            let gauge = raw::Gauge::<i64>::default();
1038            let _metric_b = context.child("a").register("test", "help", gauge);
1039        });
1040    }
1041
    #[test]
    fn test_duplicate_register_acquires_during_last_drop_window() {
        // Checks the window between "the last handle's drop has decided to clean
        // up" and "that drop has taken the registry lock". A duplicate `register`
        // arriving in that window must reuse the live entry, and the drop's
        // deferred `release_registration` must not remove the duplicate's
        // registration.
        let registry = Registry::new();
        let key: MetricKey = ("votes".to_string(), Vec::new());

        let original = registry.register(
            key.0.clone(),
            "vote count".to_string(),
            Vec::new(),
            Arc::new(raw::Counter::<u64>::default()),
        );
        // Keep a reference to the underlying metric for the pointer comparison below.
        let original_metric = Arc::clone(&original.metric);
        // Suppress `original`'s destructor; its cleanup is performed by hand below
        // via `release_registration`, so the test controls exactly when it runs.
        let _original = std::mem::ManuallyDrop::new(original);
        let original_id = {
            let registry = registry.inner.lock();
            *registry.keys.get(&key).expect("metric key missing")
        };
        // Simulate the final drop after it has decided to clean up but before
        // it obtains the registry lock. The dropped claim is still counted in
        // this window.
        let duplicate = registry.register(
            key.0,
            "vote count".to_string(),
            Vec::new(),
            Arc::new(raw::Counter::<u64>::default()),
        );
        // The duplicate must attach to the existing metric, not install its own.
        assert!(Arc::ptr_eq(&original_metric, &duplicate.metric));

        // Finish the simulated drop: release `original`'s registration now that
        // the duplicate has already registered.
        registry.inner.lock().release_registration(original_id);

        // The duplicate's registration must still be encoded after that release.
        duplicate.inc_by(7);
        let encoded = registry.encode();
        assert!(
            encoded.contains("votes_total 7"),
            "last drop removed duplicate registration: {encoded}"
        );

        // Dropping the only remaining real handle must clear the key and family.
        drop(duplicate);
        let registry = registry.inner.lock();
        assert!(
            registry.keys.is_empty(),
            "keys left behind: {:?}",
            registry.keys
        );
        assert!(
            registry.families.is_empty(),
            "families left behind: {:?}",
            registry.families
        );
    }
1092
1093    #[test]
1094    fn test_registered_with_registration_notifies_on_last_drop() {
1095        struct NotifyOnDrop(mpsc::Sender<&'static str>);
1096
1097        impl Drop for NotifyOnDrop {
1098            fn drop(&mut self) {
1099                let _ = self.0.send("dropped");
1100            }
1101        }
1102
1103        let (tx, rx) = mpsc::channel();
1104        let registered = Registered::with_registration(
1105            raw::Counter::<u64>::default(),
1106            Registration::from(NotifyOnDrop(tx)),
1107        );
1108        let clone = registered.clone();
1109
1110        drop(registered);
1111        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1112
1113        drop(clone);
1114        assert_eq!(rx.recv().unwrap(), "dropped");
1115        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1116    }
1117
1118    fn register_counter(registry: &Registry, name: &str, help: &str, value: u64) -> Counter {
1119        let counter = raw::Counter::<u64>::default();
1120        counter.inc_by(value);
1121        registry.register(
1122            name.to_string(),
1123            help.to_string(),
1124            Vec::new(),
1125            Arc::new(counter),
1126        )
1127    }
1128
1129    #[test]
1130    fn test_encode_is_deterministic() {
1131        let registry = Registry::default();
1132        let _beta = register_counter(&registry, "beta", "beta counter", 2);
1133        let _alpha = register_counter(&registry, "alpha", "alpha counter", 1);
1134        let first = registry.encode();
1135        let second = registry.encode();
1136        assert_eq!(first, second);
1137        let alpha = first
1138            .find("# TYPE alpha")
1139            .expect("alpha family header present");
1140        let beta = first
1141            .find("# TYPE beta")
1142            .expect("beta family header present");
1143        assert!(alpha < beta, "families emitted in sorted order: {first}");
1144    }
1145
1146    #[test]
1147    fn test_encode_emits_single_eof() {
1148        let registry = Registry::default();
1149        let _a = register_counter(&registry, "a", "help", 1);
1150        let _b = register_counter(&registry, "b", "help", 2);
1151        let encoded = registry.encode();
1152        assert_eq!(encoded.matches("# EOF").count(), 1);
1153        assert!(
1154            encoded.ends_with("# EOF\n"),
1155            "must terminate with EOF: {encoded}"
1156        );
1157    }
1158
1159    #[test]
1160    fn test_encode_type_aware_suffixes() {
1161        let registry = Registry::default();
1162        let _requests = register_counter(&registry, "requests", "request count", 3);
1163        let histogram = raw::Histogram::new([0.1, 1.0, 10.0]);
1164        histogram.observe(0.5);
1165        let _histogram = registry.register(
1166            "latency".to_string(),
1167            "latency seconds".to_string(),
1168            Vec::new(),
1169            Arc::new(histogram),
1170        );
1171        let encoded = registry.encode();
1172        assert!(
1173            encoded.contains("requests_total 3"),
1174            "counter _total suffix: {encoded}"
1175        );
1176        assert!(
1177            encoded.contains("latency_bucket"),
1178            "histogram _bucket suffix: {encoded}"
1179        );
1180        assert!(
1181            encoded.contains("latency_sum"),
1182            "histogram _sum suffix: {encoded}"
1183        );
1184        assert!(
1185            encoded.contains("latency_count"),
1186            "histogram _count suffix: {encoded}"
1187        );
1188    }
1189
1190    #[test]
1191    fn test_encode_shares_family_header_across_attributes() {
1192        let registry = Registry::default();
1193        let c1 = raw::Counter::<u64>::default();
1194        c1.inc();
1195        let _c1 = registry.register(
1196            "votes".to_string(),
1197            "vote count".to_string(),
1198            vec![("epoch".to_string(), "1".to_string())],
1199            Arc::new(c1),
1200        );
1201        let c2 = raw::Counter::<u64>::default();
1202        c2.inc_by(2);
1203        let _c2 = registry.register(
1204            "votes".to_string(),
1205            "vote count".to_string(),
1206            vec![("epoch".to_string(), "2".to_string())],
1207            Arc::new(c2),
1208        );
1209        let encoded = registry.encode();
1210        assert_eq!(
1211            encoded.matches("# HELP votes").count(),
1212            1,
1213            "single HELP: {encoded}"
1214        );
1215        assert_eq!(
1216            encoded.matches("# TYPE votes").count(),
1217            1,
1218            "single TYPE: {encoded}"
1219        );
1220        assert!(encoded.contains("votes_total{epoch=\"1\"} 1"));
1221        assert!(encoded.contains("votes_total{epoch=\"2\"} 2"));
1222    }
1223
1224    #[test]
1225    fn test_encode_registers_without_prefix() {
1226        let registry = Registry::default();
1227        let _registered = register_counter(&registry, "votes", "vote count", 1);
1228        let encoded = registry.encode();
1229        assert!(
1230            encoded.contains("votes_total 1"),
1231            "no prefix applied: {encoded}"
1232        );
1233        assert!(
1234            encoded.starts_with("# HELP votes"),
1235            "family header at start: {encoded}"
1236        );
1237    }
1238
1239    #[test]
1240    fn test_encode_suppresses_empty_family() {
1241        // A Family registered with no child entries should not emit its HELP/TYPE
1242        // descriptor on scrape. This matches upstream prometheus-client's
1243        // `encode_omit_empty` behavior.
1244        let registry = Registry::default();
1245        let empty_family = raw::Family::<Vec<(String, String)>, raw::Counter>::default();
1246        let _empty_family = registry.register(
1247            "votes".to_string(),
1248            "vote count".to_string(),
1249            Vec::new(),
1250            Arc::new(empty_family),
1251        );
1252        let _ticks = register_counter(&registry, "ticks", "tick count", 1);
1253        let encoded = registry.encode();
1254        assert!(!encoded.contains("votes"), "empty family leaked: {encoded}");
1255        assert!(
1256            encoded.contains("ticks_total 1"),
1257            "populated metric missing: {encoded}"
1258        );
1259        assert_eq!(encoded.matches("# EOF").count(), 1);
1260    }
1261
1262    #[test]
1263    fn test_encode_matches_upstream_registry() {
1264        // Byte-for-byte parity between our `Registry::encode` and upstream
1265        // prometheus-client's `registry::Registry::encode` on an equivalent
1266        // metric set. Covers HELP normalization (trailing `.`), TYPE lines,
1267        // counter `_total` suffix, histogram `_bucket`/`_sum`/`_count`, and
1268        // the single final `# EOF`. Our registry emits families in sorted
1269        // order (see `test_encode_is_deterministic`); upstream preserves
1270        // registration order. Register here in sorted order so the parity
1271        // assertion only flags real format divergences.
1272        let counter = raw::Counter::<u64>::default();
1273        counter.inc_by(7);
1274        let gauge = raw::Gauge::<i64>::default();
1275        gauge.set(-3);
1276        let histogram = raw::Histogram::new([0.1, 1.0]);
1277        histogram.observe(0.5);
1278
1279        let ours = Registry::default();
1280        let _latency = ours.register(
1281            "latency".to_string(),
1282            "request latency seconds".to_string(),
1283            Vec::new(),
1284            Arc::new(histogram.clone()),
1285        );
1286        let _level = ours.register(
1287            "level".to_string(),
1288            "current level".to_string(),
1289            Vec::new(),
1290            Arc::new(gauge.clone()),
1291        );
1292        let _votes = ours.register(
1293            "votes".to_string(),
1294            "number of votes".to_string(),
1295            Vec::new(),
1296            Arc::new(counter.clone()),
1297        );
1298        let ours_encoded = ours.encode();
1299
1300        let mut theirs = registry::Registry::default();
1301        theirs.register("latency", "request latency seconds", histogram);
1302        theirs.register("level", "current level", gauge);
1303        theirs.register("votes", "number of votes", counter);
1304        let mut theirs_encoded = String::new();
1305        encode(&mut theirs_encoded, &theirs).expect("upstream encode failed");
1306
1307        assert_eq!(
1308            ours_encoded, theirs_encoded,
1309            "output diverged from upstream prometheus-client registry"
1310        );
1311    }
1312
1313    #[test]
1314    fn test_shuffled_duplicate_drops_do_not_leave_registry_entries() {
1315        let registry = Registry::new();
1316        let mut handles = Vec::new();
1317
1318        for _ in 0..8 {
1319            handles.push(registry.register(
1320                "votes".to_string(),
1321                "vote count".to_string(),
1322                Vec::new(),
1323                Arc::new(raw::Counter::<u64>::default()),
1324            ));
1325        }
1326
1327        for index in [3, 0, 6, 1] {
1328            let _ = handles.swap_remove(index);
1329            handles.push(registry.register(
1330                "votes".to_string(),
1331                "vote count".to_string(),
1332                Vec::new(),
1333                Arc::new(raw::Counter::<u64>::default()),
1334            ));
1335        }
1336
1337        drop(handles);
1338        let registry = registry.inner.lock();
1339        assert!(
1340            registry.keys.is_empty(),
1341            "keys left behind: {:?}",
1342            registry.keys
1343        );
1344        assert!(
1345            registry.families.is_empty(),
1346            "families left behind: {:?}",
1347            registry.families
1348        );
1349    }
1350}