Skip to main content

commonware_runtime/utils/
mod.rs

1//! Utility functions for interacting with any runtime.
2
3use commonware_utils::sync::{Condvar, Mutex};
4use futures::task::ArcWake;
5use prometheus_client::{encoding::text::encode, registry::Registry as PrometheusRegistry};
6use std::{
7    any::Any,
8    collections::BTreeMap,
9    future::Future,
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13};
14
15commonware_macros::stability_mod!(BETA, pub mod buffer);
16pub mod signal;
17#[cfg(not(target_arch = "wasm32"))]
18pub(crate) mod thread;
19
20mod handle;
21pub use handle::Handle;
22#[commonware_macros::stability(ALPHA)]
23pub(crate) use handle::Panicked;
24pub(crate) use handle::{Aborter, MetricHandle, Panicker};
25
26mod cell;
27pub use cell::Cell as ContextCell;
28
29pub(crate) mod supervision;
30
/// Describes where a task is executed.
#[derive(Copy, Clone, Debug)]
pub enum Execution {
    /// The task is given its own dedicated thread.
    Dedicated,
    /// The task is placed on the shared executor. A `true` payload flags short blocking
    /// work that should use the runtime's blocking-friendly pool.
    Shared(bool),
}

impl Default for Execution {
    /// Defaults to the shared executor without the blocking flag.
    fn default() -> Self {
        Execution::Shared(false)
    }
}
46
/// Yield control back to the runtime.
///
/// The underlying future returns `Pending` on its first poll (after waking its own
/// waker) and completes on the following poll.
pub async fn reschedule() {
    /// One-shot future: pending exactly once, then ready.
    struct YieldOnce {
        polled: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            // Any poll after the first one: the yield has already happened.
            if self.polled {
                return Poll::Ready(());
            }

            // First poll: remember that we yielded and ask to be polled again.
            self.polled = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    YieldOnce { polled: false }.await
}
69
/// Turn a type-erased value (such as a panic payload) into a message string.
///
/// `&str` and `String` values are returned as text; any other type falls back to the
/// `Debug` rendering of the `dyn Any` reference.
fn extract_panic_message(err: &(dyn Any + Send)) -> String {
    if let Some(text) = err.downcast_ref::<&str>() {
        return text.to_string();
    }
    if let Some(text) = err.downcast_ref::<String>() {
        return text.clone();
    }
    format!("{err:?}")
}
79
80/// Synchronization primitive that enables a thread to block until a waker delivers a signal.
81pub struct Blocker {
82    /// Tracks whether a wake-up signal has been delivered (even if wait has not started yet).
83    state: Mutex<bool>,
84    /// Condvar used to park and resume the thread when the signal flips to true.
85    cv: Condvar,
86}
87
88impl Blocker {
89    /// Create a new [Blocker].
90    pub fn new() -> Arc<Self> {
91        Arc::new(Self {
92            state: Mutex::new(false),
93            cv: Condvar::new(),
94        })
95    }
96
97    /// Block the current thread until a waker delivers a signal.
98    pub fn wait(&self) {
99        // Use a loop to tolerate spurious wake-ups and only proceed once a real signal arrives.
100        let mut signaled = self.state.lock();
101        while !*signaled {
102            self.cv.wait(&mut signaled);
103        }
104
105        // Reset the flag so subsequent waits park again until the next wake signal.
106        *signaled = false;
107    }
108}
109
110impl ArcWake for Blocker {
111    fn wake_by_ref(arc_self: &Arc<Self>) {
112        // Mark as signaled (and release lock before notifying).
113        {
114            let mut signaled = arc_self.state.lock();
115            *signaled = true;
116        }
117
118        // Notify a single waiter so the blocked thread re-checks the flag.
119        arc_self.cv.notify_one();
120    }
121}
122
#[cfg(any(test, feature = "test-utils"))]
/// Count the running tasks whose name begins with `prefix`.
///
/// The metrics are encoded to text and scanned line by line. A line is counted when
/// it is a `runtime_tasks_running` sample of kind `Task`, its value is 1, and the
/// value of its `name` label starts with `prefix`.
///
/// This is useful for verifying that all child tasks under a given label hierarchy
/// have been properly shut down.
///
/// # Example
///
/// ```rust
/// use commonware_runtime::{Clock, Metrics, Runner, Spawner, deterministic};
/// use commonware_runtime::utils::count_running_tasks;
/// use std::time::Duration;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Spawn a task under a labeled context
///     let handle = context.with_label("worker").spawn(|ctx| async move {
///         ctx.sleep(Duration::from_secs(100)).await;
///     });
///
///     // Allow the task to start
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Count running tasks with "worker" prefix
///     let count = count_running_tasks(&context, "worker");
///     assert!(count > 0, "worker task should be running");
///
///     // Abort the task
///     handle.abort();
///     let _ = handle.await;
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Verify task is stopped
///     let count = count_running_tasks(&context, "worker");
///     assert_eq!(count, 0, "worker task should be stopped");
/// });
/// ```
pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
    let text = metrics.encode();
    let mut running = 0;
    for line in text.lines() {
        // Only `runtime_tasks_running` samples of kind `Task` are relevant.
        if !line.starts_with("runtime_tasks_running{") || !line.contains("kind=\"Task\"") {
            continue;
        }
        // A value of 1 marks a task that is currently running.
        if !line.trim_end().ends_with(" 1") {
            continue;
        }
        // Take the text between `name="` and the next double quote.
        let Some(after_name) = line.split("name=\"").nth(1) else {
            continue;
        };
        let task_name = after_name.split('"').next().unwrap_or("");
        if task_name.starts_with(prefix) {
            running += 1;
        }
    }
    running
}
178
/// Checks that a label follows the Prometheus metric name format `[a-zA-Z][a-zA-Z0-9_]*`.
///
/// # Panics
///
/// Panics if the label is empty, if its first character is not an ASCII letter,
/// or if any later character is outside `[a-zA-Z0-9_]`.
pub fn validate_label(label: &str) {
    let starts_with_letter = label
        .chars()
        .next()
        .is_some_and(|first| first.is_ascii_alphabetic());
    assert!(
        starts_with_letter,
        "label must start with [a-zA-Z]: {label}"
    );

    let tail_is_valid = label
        .chars()
        .skip(1)
        .all(|c| c == '_' || c.is_ascii_alphanumeric());
    assert!(
        tail_is_valid,
        "label must only contain [a-zA-Z0-9_]: {label}"
    );
}
196
/// Add an attribute to a sorted attribute list, maintaining sorted order via binary search.
///
/// `attributes` must already be sorted by key. `value` is rendered through its
/// `Display` implementation before being stored.
///
/// Returns `true` if the key was new, `false` if it was a duplicate (value overwritten).
pub fn add_attribute(
    attributes: &mut Vec<(String, String)>,
    key: &str,
    value: impl std::fmt::Display,
) -> bool {
    let value = value.to_string();

    // Compare against the borrowed key directly; an owned `String` is only needed
    // when the key is actually inserted, so the overwrite path allocates nothing
    // for the key.
    match attributes.binary_search_by(|(existing, _)| existing.as_str().cmp(key)) {
        Ok(index) => {
            attributes[index].1 = value;
            false
        }
        Err(index) => {
            attributes.insert(index, (key.to_owned(), value));
            true
        }
    }
}
219
/// A [`std::fmt::Write`] sink that regroups Prometheus text output by metric family
/// and keeps a single copy of each family's HELP/TYPE/UNIT metadata.
///
/// When the same metric is registered across scoped registries (via
/// `Registry::encode`), prometheus_client outputs each scope's metrics
/// separately, interleaving families. This writer collects every line and
/// regroups them so that all samples for a given metric name appear together
/// under one HELP/TYPE header.
///
/// `# EOF` lines are dropped so that `Registry::encode` can append exactly one at
/// the end of the combined output.
///
/// Metadata uses "first wins" semantics: the first HELP/TYPE/UNIT line seen for a
/// family is kept and later ones for the same family are discarded.
pub struct MetricEncoder {
    /// Text received since the last newline (an incomplete line).
    line_buffer: String,
    /// Collected lines keyed by family name; the map order is the output order.
    families: BTreeMap<String, MetricFamily>,
    /// Family named by the most recent HELP/TYPE/UNIT line; cleared by `# EOF`.
    active_family: Option<String>,
}

/// Lines collected for a single metric family.
#[derive(Default)]
struct MetricFamily {
    /// Full `# HELP` line, if one was seen.
    help: Option<String>,
    /// Full `# TYPE` line, if one was seen.
    type_line: Option<String>,
    /// Full `# UNIT` line, if one was seen.
    unit: Option<String>,
    /// Type token taken from the stored `# TYPE` line (e.g. `counter`).
    metric_type: Option<String>,
    /// Sample lines in arrival order.
    data: Vec<String>,
}

impl MetricFamily {
    /// Every stored line in output order: HELP, TYPE, UNIT, then the samples.
    fn lines(&self) -> impl Iterator<Item = &String> {
        self.help
            .iter()
            .chain(self.type_line.iter())
            .chain(self.unit.iter())
            .chain(self.data.iter())
    }
}

/// Suffixes that OpenMetrics appends to sample names, paired with the metric
/// types for which each suffix is valid. Sample names can therefore differ from
/// the base name used in HELP/TYPE headers (e.g., a counter named `foo` emits
/// data as `foo_total`).
///
/// See: <https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes>
const TYPED_SUFFIXES: &[(&str, &[&str])] = &[
    ("_total", &["counter"]),
    ("_bucket", &["histogram", "gaugehistogram"]),
    ("_count", &["histogram", "summary"]),
    ("_sum", &["histogram", "summary"]),
    ("_gcount", &["gaugehistogram"]),
    ("_gsum", &["gaugehistogram"]),
    ("_created", &["counter", "histogram", "summary"]),
    ("_info", &["info"]),
];

/// Returns true if `sample_name` can belong to `family_name`: either the names
/// are equal, or the sample is the family name plus a suffix that is valid for
/// the family's declared type.
///
/// See: <https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes>
fn family_accepts_sample(
    families: &BTreeMap<String, MetricFamily>,
    family_name: &str,
    sample_name: &str,
) -> bool {
    if sample_name == family_name {
        return true;
    }

    let declared_type = families
        .get(family_name)
        .and_then(|family| family.metric_type.as_deref());
    let trailing = sample_name.strip_prefix(family_name);

    // A suffix match needs both a declared type and a name extending the family name.
    match (declared_type, trailing) {
        (Some(metric_type), Some(suffix)) => {
            TYPED_SUFFIXES.iter().any(|(known_suffix, valid_types)| {
                *known_suffix == suffix && valid_types.contains(&metric_type)
            })
        }
        _ => false,
    }
}

/// Extract the metric name from a sample line: `sample = metricname [labels] SP number ...`
///
/// The name is everything before the first `{` or space; a line containing
/// neither is returned whole.
///
/// See: <https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#abnf>
fn extract_metric_name(line: &str) -> &str {
    match line.find(['{', ' ']) {
        Some(end) => &line[..end],
        None => line,
    }
}

impl MetricEncoder {
    /// Create an empty encoder.
    pub const fn new() -> Self {
        Self {
            line_buffer: String::new(),
            families: BTreeMap::new(),
            active_family: None,
        }
    }

    /// Consume the encoder and return the regrouped output.
    ///
    /// Families are emitted in key order; each family contributes its HELP, TYPE
    /// and UNIT lines (when present) followed by its samples, one line each.
    pub fn into_string(mut self) -> String {
        // A final line without a trailing newline is still sitting in the buffer.
        if !self.line_buffer.is_empty() {
            self.flush_line();
        }

        // Every stored line is written followed by a single '\n'.
        let capacity: usize = self
            .families
            .values()
            .flat_map(|family| family.lines())
            .map(|line| line.len() + 1)
            .sum();

        let mut output = String::with_capacity(capacity);
        for line in self.families.values().flat_map(|family| family.lines()) {
            output.push_str(line);
            output.push('\n');
        }
        output
    }

    /// Map a sample name to its family, creating the family when it does not
    /// exist yet, and return a mutable reference to it.
    ///
    /// OpenMetrics sample names may carry a type-specific suffix that the
    /// HELP/TYPE headers lack (e.g., a counter named "votes" emits data as
    /// "votes_total"). The TYPE declaration is consulted first, so a suffixed
    /// sample is routed to its base family even when another family with the
    /// suffixed name exists (e.g., a gauge named "votes_total").
    fn resolve_data_family(&mut self, name: &str) -> &mut MetricFamily {
        let key = match self.find_typed_family(name) {
            Some(base) => base,
            None => name,
        };
        self.families.entry(key.to_string()).or_default()
    }

    /// Look for an existing family whose declared type expects the suffix that
    /// `name` ends with, returning that family's base name.
    fn find_typed_family<'a>(&self, name: &'a str) -> Option<&'a str> {
        for (suffix, valid_types) in TYPED_SUFFIXES {
            let Some(base) = name.strip_suffix(*suffix) else {
                continue;
            };
            let Some(declared_type) = self
                .families
                .get(base)
                .and_then(|family| family.metric_type.as_deref())
            else {
                continue;
            };
            if valid_types.contains(&declared_type) {
                return Some(base);
            }
        }
        None
    }

    /// Take the buffered line and file it under the right family.
    fn flush_line(&mut self) {
        let line = std::mem::take(&mut self.line_buffer);

        // Terminators are dropped; they also end the current descriptor group.
        if line == "# EOF" {
            self.active_family = None;
            return;
        }

        if let Some(rest) = line.strip_prefix("# HELP ") {
            let name = rest.split_whitespace().next().unwrap_or("").to_string();
            let family = self.families.entry(name.clone()).or_default();
            if family.help.is_none() {
                family.help = Some(line);
            }
            self.active_family = Some(name);
            return;
        }

        if let Some(rest) = line.strip_prefix("# TYPE ") {
            let mut tokens = rest.split_whitespace();
            let name = tokens.next().unwrap_or("").to_string();
            let metric_type = tokens.next().map(String::from);
            let family = self.families.entry(name.clone()).or_default();
            // The type token is only recorded together with the first TYPE line.
            if family.type_line.is_none() {
                family.type_line = Some(line);
                family.metric_type = metric_type;
            }
            self.active_family = Some(name);
            return;
        }

        if let Some(rest) = line.strip_prefix("# UNIT ") {
            let name = rest.split_whitespace().next().unwrap_or("").to_string();
            let family = self.families.entry(name.clone()).or_default();
            if family.unit.is_none() {
                family.unit = Some(line);
            }
            self.active_family = Some(name);
            return;
        }

        // Sample line: prefer the family named by the latest descriptor when it
        // accepts this sample, otherwise resolve the family from the name alone.
        let sample_name = extract_metric_name(&line);
        let family = match self.active_family.as_deref() {
            Some(active) if family_accepts_sample(&self.families, active, sample_name) => {
                self.families.get_mut(active).unwrap()
            }
            _ => self.resolve_data_family(sample_name),
        };
        family.data.push(line);
    }
}

impl Default for MetricEncoder {
    fn default() -> Self {
        MetricEncoder::new()
    }
}

impl std::fmt::Write for MetricEncoder {
    /// Buffer `s`, processing each line as soon as its newline arrives. Text after
    /// the last newline stays buffered until more input or `into_string`.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let mut rest = s;
        while let Some((head, tail)) = rest.split_once('\n') {
            self.line_buffer.push_str(head);
            self.flush_line();
            rest = tail;
        }
        self.line_buffer.push_str(rest);
        Ok(())
    }
}
434
/// Internal handle that runs a scope's cleanup callback when dropped.
///
/// Stored inside contexts via `Arc<ScopeGuard>`. When the last context clone
/// holding this handle is dropped, the scope's metrics are automatically removed.
pub(crate) struct ScopeGuard {
    /// Identifier passed to the cleanup callback.
    scope_id: u64,
    /// Callback invoked at most once, on drop.
    cleanup: Option<Box<dyn FnOnce(u64) + Send + Sync>>,
}

impl ScopeGuard {
    /// Create a guard that calls `cleanup(scope_id)` when it is dropped.
    pub(crate) fn new(scope_id: u64, cleanup: impl FnOnce(u64) + Send + Sync + 'static) -> Self {
        let cleanup: Box<dyn FnOnce(u64) + Send + Sync> = Box::new(cleanup);
        Self {
            scope_id,
            cleanup: Some(cleanup),
        }
    }

    /// The identifier of the guarded scope.
    pub(crate) const fn scope_id(&self) -> u64 {
        self.scope_id
    }
}

impl Drop for ScopeGuard {
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the callback cannot run a second time.
        let Some(cleanup) = self.cleanup.take() else {
            return;
        };
        cleanup(self.scope_id);
    }
}
464
465/// Manages multiple prometheus registries with lifecycle-based scoping.
466///
467/// Holds a permanent root registry for long-lived metrics (runtime internals)
468/// and a collection of scoped registries that can be removed when the associated
469/// work (e.g., an epoch's consensus engine) is done.
470pub(crate) struct Registry {
471    root: PrometheusRegistry,
472    scopes: BTreeMap<u64, PrometheusRegistry>,
473    next_scope_id: u64,
474}
475
476impl Registry {
477    pub fn new() -> Self {
478        Self {
479            root: PrometheusRegistry::default(),
480            scopes: BTreeMap::new(),
481            next_scope_id: 0,
482        }
483    }
484
485    pub const fn root_mut(&mut self) -> &mut PrometheusRegistry {
486        &mut self.root
487    }
488
489    pub fn create_scope(&mut self) -> u64 {
490        let id = self.next_scope_id;
491        self.next_scope_id = self.next_scope_id.checked_add(1).expect("scope overflow");
492        self.scopes.insert(id, PrometheusRegistry::default());
493        id
494    }
495
496    pub fn get_scope(&mut self, scope: Option<u64>) -> &mut PrometheusRegistry {
497        match scope {
498            None => &mut self.root,
499            Some(id) => self
500                .scopes
501                .get_mut(&id)
502                .unwrap_or_else(|| panic!("scope {id} not found (already deregistered?)")),
503        }
504    }
505
506    pub fn remove_scope(&mut self, id: u64) {
507        self.scopes.remove(&id);
508    }
509
510    pub fn encode(&self) -> String {
511        let mut encoder = MetricEncoder::new();
512        encode(&mut encoder, &self.root).expect("encoding root failed");
513        for registry in self.scopes.values() {
514            encode(&mut encoder, registry).expect("encoding scope failed");
515        }
516        let mut output = encoder.into_string();
517        output.push_str("# EOF\n");
518        output
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use crate::{deterministic, Metrics, Runner};
526    use commonware_macros::test_traced;
527    use futures::task::waker;
528    use prometheus_client::metrics::counter::Counter;
529    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
530
531    fn encode_dedup(input: &str) -> String {
532        use std::fmt::Write;
533        let mut encoder = MetricEncoder::new();
534        encoder.write_str(input).unwrap();
535        encoder.into_string()
536    }
537
538    #[test]
539    fn test_metric_encoder_empty() {
540        assert_eq!(encode_dedup(""), "");
541        assert_eq!(encode_dedup("# EOF\n"), "");
542    }
543
544    #[test]
545    fn test_metric_encoder_no_duplicates() {
546        let input = r#"# HELP foo_total A counter.
547# TYPE foo_total counter
548foo_total 1
549# HELP bar_gauge A gauge.
550# TYPE bar_gauge gauge
551bar_gauge 42
552# EOF
553"#;
554        let expected = r#"# HELP bar_gauge A gauge.
555# TYPE bar_gauge gauge
556bar_gauge 42
557# HELP foo_total A counter.
558# TYPE foo_total counter
559foo_total 1
560"#;
561        assert_eq!(encode_dedup(input), expected);
562    }
563
564    #[test]
565    fn test_metric_encoder_with_duplicates() {
566        let input = r#"# HELP votes_total vote count.
567# TYPE votes_total counter
568votes_total{epoch="e5"} 1
569# HELP votes_total vote count.
570# TYPE votes_total counter
571votes_total{epoch="e6"} 2
572# EOF
573"#;
574        let expected = r#"# HELP votes_total vote count.
575# TYPE votes_total counter
576votes_total{epoch="e5"} 1
577votes_total{epoch="e6"} 2
578"#;
579        assert_eq!(encode_dedup(input), expected);
580    }
581
582    #[test]
583    fn test_metric_encoder_multiple_metrics() {
584        let input = r#"# HELP a_total First.
585# TYPE a_total counter
586a_total{tag="x"} 1
587# HELP b_total Second.
588# TYPE b_total counter
589b_total 5
590# HELP a_total First.
591# TYPE a_total counter
592a_total{tag="y"} 2
593# EOF
594"#;
595        let expected = r#"# HELP a_total First.
596# TYPE a_total counter
597a_total{tag="x"} 1
598a_total{tag="y"} 2
599# HELP b_total Second.
600# TYPE b_total counter
601b_total 5
602"#;
603        assert_eq!(encode_dedup(input), expected);
604    }
605
606    #[test]
607    fn test_metric_encoder_groups_by_name() {
608        let input = r#"# HELP a_total First.
609# TYPE a_total counter
610a_total{tag="x"} 1
611# HELP b_total Second.
612# TYPE b_total counter
613b_total 5
614# HELP a_total First.
615# TYPE a_total counter
616a_total{tag="y"} 2
617# EOF
618"#;
619        let expected = r#"# HELP a_total First.
620# TYPE a_total counter
621a_total{tag="x"} 1
622a_total{tag="y"} 2
623# HELP b_total Second.
624# TYPE b_total counter
625b_total 5
626"#;
627        assert_eq!(encode_dedup(input), expected);
628    }
629
630    #[test]
631    fn test_metric_encoder_deterministic_order() {
632        let input = r#"# HELP z First alphabetically last.
633# TYPE z counter
634z_total 1
635# HELP a Last alphabetically first.
636# TYPE a counter
637a_total 2
638# EOF
639"#;
640        let expected = r#"# HELP a Last alphabetically first.
641# TYPE a counter
642a_total 2
643# HELP z First alphabetically last.
644# TYPE z counter
645z_total 1
646"#;
647        assert_eq!(encode_dedup(input), expected);
648    }
649
650    #[test]
651    fn test_metric_encoder_counter_suffix_grouping() {
652        // prometheus_client uses the base name for HELP/TYPE (e.g., "ab_votes")
653        // but appends "_total" to the data line (e.g., "ab_votes_total").
654        // A metric whose name sorts between these (e.g., "ab_votes_size")
655        // must not split the family.
656        let input = r#"# HELP ab_votes vote count.
657# TYPE ab_votes counter
658ab_votes_total{epoch="1"} 1
659# HELP ab_votes_size size gauge.
660# TYPE ab_votes_size gauge
661ab_votes_size 99
662# HELP ab_votes vote count.
663# TYPE ab_votes counter
664ab_votes_total{epoch="2"} 2
665# EOF
666"#;
667        let expected = r#"# HELP ab_votes vote count.
668# TYPE ab_votes counter
669ab_votes_total{epoch="1"} 1
670ab_votes_total{epoch="2"} 2
671# HELP ab_votes_size size gauge.
672# TYPE ab_votes_size gauge
673ab_votes_size 99
674"#;
675        assert_eq!(encode_dedup(input), expected);
676    }
677
678    #[test]
679    fn test_metric_encoder_type_aware_suffix() {
680        // A gauge named "foo_total" and a counter named "foo" both produce
681        // data lines called "foo_total". The encoder must use the TYPE info
682        // to route each data line to the correct family.
683        let input = r#"# HELP foo_total A gauge.
684# TYPE foo_total gauge
685foo_total 42
686# HELP foo A counter.
687# TYPE foo counter
688foo_total 1
689# EOF
690"#;
691        let expected = r#"# HELP foo A counter.
692# TYPE foo counter
693foo_total 1
694# HELP foo_total A gauge.
695# TYPE foo_total gauge
696foo_total 42
697"#;
698        assert_eq!(encode_dedup(input), expected);
699    }
700
701    #[test]
702    fn test_metric_encoder_literal_suffix_family_not_hijacked() {
703        // A family may legitimately end with a reserved OpenMetrics suffix.
704        // The encoder must not always remap "foo_created" to base family "foo".
705        let input = r#"# HELP foo A counter.
706# TYPE foo counter
707foo_total 1
708# HELP foo_created A gauge.
709# TYPE foo_created gauge
710foo_created 42
711# EOF
712"#;
713        let expected = r#"# HELP foo A counter.
714# TYPE foo counter
715foo_total 1
716# HELP foo_created A gauge.
717# TYPE foo_created gauge
718foo_created 42
719"#;
720        assert_eq!(encode_dedup(input), expected);
721    }
722
723    #[test]
724    fn test_metric_encoder_type_aware_suffix_interleaved_segments() {
725        // Two families may emit lines with the same sample name (`foo_total`):
726        // a counter named `foo` and a gauge named `foo_total`.
727        //
728        // Repeated counter descriptors (as emitted by separate scoped registries)
729        // must keep all counter samples in family `foo` and not leak them into
730        // family `foo_total`.
731        let input = r#"# HELP foo Counter.
732# TYPE foo counter
733foo_total{scope="a"} 1
734# HELP foo_total Gauge.
735# TYPE foo_total gauge
736foo_total 42
737# HELP foo Counter.
738# TYPE foo counter
739foo_total{scope="b"} 2
740# EOF
741"#;
742        let expected = r#"# HELP foo Counter.
743# TYPE foo counter
744foo_total{scope="a"} 1
745foo_total{scope="b"} 2
746# HELP foo_total Gauge.
747# TYPE foo_total gauge
748foo_total 42
749"#;
750        assert_eq!(encode_dedup(input), expected);
751    }
752
753    #[test]
754    fn test_metric_encoder_unit_metadata_is_grouped() {
755        let input = r#"# HELP latency Latency histogram.
756# TYPE latency histogram
757# UNIT latency seconds
758latency_sum 1.2
759latency_count 3
760# HELP requests Requests.
761# TYPE requests counter
762requests_total 9
763# EOF
764"#;
765        let expected = r#"# HELP latency Latency histogram.
766# TYPE latency histogram
767# UNIT latency seconds
768latency_sum 1.2
769latency_count 3
770# HELP requests Requests.
771# TYPE requests counter
772requests_total 9
773"#;
774        assert_eq!(encode_dedup(input), expected);
775    }
776
777    #[test]
778    fn test_metric_encoder_unit_metadata_deduped_across_segments() {
779        let input = r#"# HELP req Requests.
780# TYPE req counter
781# UNIT req requests
782req_total{scope="a"} 1
783# HELP req Requests.
784# TYPE req counter
785# UNIT req requests
786req_total{scope="b"} 2
787# EOF
788"#;
789        let expected = r#"# HELP req Requests.
790# TYPE req counter
791# UNIT req requests
792req_total{scope="a"} 1
793req_total{scope="b"} 2
794"#;
795        assert_eq!(encode_dedup(input), expected);
796    }
797
798    #[test]
799    fn test_metric_encoder_fallback_uses_typed_suffix_even_if_literal_exists() {
800        // Regression test for a buggy fallback that preferred `contains_key(name)`
801        // before typed-suffix mapping.
802        //
803        // Here `foo_total` exists as its own counter family, but a sample named
804        // `foo_total` can only come from counter family `foo` (because
805        // `foo_total` counter samples are `foo_total_total`).
806        //
807        // We force fallback mode by ending descriptor groups with `# EOF`, which
808        // clears `active_family`.
809        let input = r#"# HELP foo_total Counter with literal suffix.
810# TYPE foo_total counter
811foo_total_total 9
812# EOF
813# HELP foo Base counter.
814# TYPE foo counter
815# EOF
816foo_total{scope="x"} 1
817# EOF
818"#;
819        let expected = r#"# HELP foo Base counter.
820# TYPE foo counter
821foo_total{scope="x"} 1
822# HELP foo_total Counter with literal suffix.
823# TYPE foo_total counter
824foo_total_total 9
825"#;
826        assert_eq!(encode_dedup(input), expected);
827    }
828
829    #[test]
830    fn test_metric_encoder_strips_intermediate_eof() {
831        let input = r#"# HELP a_total Root.
832# TYPE a_total counter
833a_total 1
834# EOF
835# HELP b_total Scoped.
836# TYPE b_total counter
837b_total 2
838# EOF
839"#;
840        let expected = r#"# HELP a_total Root.
841# TYPE a_total counter
842a_total 1
843# HELP b_total Scoped.
844# TYPE b_total counter
845b_total 2
846"#;
847        assert_eq!(encode_dedup(input), expected);
848    }
849
850    #[test]
851    fn test_blocker_waits_until_wake() {
852        let blocker = Blocker::new();
853        let started = Arc::new(AtomicBool::new(false));
854        let completed = Arc::new(AtomicBool::new(false));
855
856        let thread_blocker = blocker.clone();
857        let thread_started = started.clone();
858        let thread_completed = completed.clone();
859        let handle = std::thread::spawn(move || {
860            thread_started.store(true, Ordering::SeqCst);
861            thread_blocker.wait();
862            thread_completed.store(true, Ordering::SeqCst);
863        });
864
865        while !started.load(Ordering::SeqCst) {
866            std::thread::yield_now();
867        }
868
869        assert!(!completed.load(Ordering::SeqCst));
870        waker(blocker).wake();
871        handle.join().unwrap();
872        assert!(completed.load(Ordering::SeqCst));
873    }
874
875    #[test]
876    fn test_blocker_handles_pre_wake() {
877        let blocker = Blocker::new();
878        waker(blocker.clone()).wake();
879
880        let completed = Arc::new(AtomicBool::new(false));
881        let thread_blocker = blocker;
882        let thread_completed = completed.clone();
883        std::thread::spawn(move || {
884            thread_blocker.wait();
885            thread_completed.store(true, Ordering::SeqCst);
886        })
887        .join()
888        .unwrap();
889
890        assert!(completed.load(Ordering::SeqCst));
891    }
892
893    #[test]
894    fn test_blocker_reusable_across_signals() {
895        let blocker = Blocker::new();
896        let completed = Arc::new(AtomicUsize::new(0));
897
898        let thread_blocker = blocker.clone();
899        let thread_completed = completed.clone();
900        let handle = std::thread::spawn(move || {
901            for _ in 0..2 {
902                thread_blocker.wait();
903                thread_completed.fetch_add(1, Ordering::SeqCst);
904            }
905        });
906
907        for expected in 1..=2 {
908            waker(blocker.clone()).wake();
909            while completed.load(Ordering::SeqCst) < expected {
910                std::thread::yield_now();
911            }
912        }
913
914        handle.join().unwrap();
915        assert_eq!(completed.load(Ordering::SeqCst), 2);
916    }
917
918    #[test_traced]
919    fn test_count_running_tasks() {
920        use crate::{Metrics, Runner, Spawner};
921        use futures::future;
922
923        let executor = deterministic::Runner::default();
924        executor.start(|context| async move {
925            // Initially no tasks with "worker" prefix
926            assert_eq!(
927                count_running_tasks(&context, "worker"),
928                0,
929                "no worker tasks initially"
930            );
931
932            // Spawn a task under a labeled context that stays running
933            let worker_ctx = context.with_label("worker");
934            let handle1 = worker_ctx.clone().spawn(|_| async move {
935                future::pending::<()>().await;
936            });
937
938            // Count running tasks with "worker" prefix
939            let count = count_running_tasks(&context, "worker");
940            assert_eq!(count, 1, "worker task should be running");
941
942            // Non-matching prefix should return 0
943            assert_eq!(
944                count_running_tasks(&context, "other"),
945                0,
946                "no tasks with 'other' prefix"
947            );
948
949            // Spawn a nested task (worker_child)
950            let handle2 = worker_ctx.with_label("child").spawn(|_| async move {
951                future::pending::<()>().await;
952            });
953
954            // Count should include both parent and nested tasks
955            let count = count_running_tasks(&context, "worker");
956            assert_eq!(count, 2, "both worker and worker_child should be counted");
957
958            // Abort parent task
959            handle1.abort();
960            let _ = handle1.await;
961
962            // Only nested task remains
963            let count = count_running_tasks(&context, "worker");
964            assert_eq!(count, 1, "only worker_child should remain");
965
966            // Abort nested task
967            handle2.abort();
968            let _ = handle2.await;
969
970            // All tasks stopped
971            assert_eq!(
972                count_running_tasks(&context, "worker"),
973                0,
974                "all worker tasks should be stopped"
975            );
976        });
977    }
978
979    #[test_traced]
980    fn test_no_duplicate_metrics() {
981        let executor = deterministic::Runner::default();
982        executor.start(|context| async move {
983            // Register metrics under different labels (no duplicates)
984            let c1 = Counter::<u64>::default();
985            context.with_label("a").register("test", "help", c1);
986            let c2 = Counter::<u64>::default();
987            context.with_label("b").register("test", "help", c2);
988        });
989        // Test passes if runtime doesn't panic on shutdown
990    }
991
992    #[test]
993    #[should_panic(expected = "duplicate metric:")]
994    fn test_duplicate_metrics_panics() {
995        let executor = deterministic::Runner::default();
996        executor.start(|context| async move {
997            // Register metrics with the same label, causing duplicates
998            let c1 = Counter::<u64>::default();
999            context.with_label("a").register("test", "help", c1);
1000            let c2 = Counter::<u64>::default();
1001            context.with_label("a").register("test", "help", c2);
1002        });
1003    }
1004}