Skip to main content

commonware_runtime/utils/
mod.rs

1//! Utility functions for interacting with any runtime.
2
3use commonware_utils::sync::{Condvar, Mutex};
4use futures::task::ArcWake;
5use prometheus_client::{encoding::text::encode, registry::Registry as PrometheusRegistry};
6use std::{
7    any::Any,
8    collections::BTreeMap,
9    future::Future,
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13};
14
15commonware_macros::stability_mod!(BETA, pub mod buffer);
16pub mod signal;
17
18mod handle;
19pub use handle::Handle;
20#[commonware_macros::stability(ALPHA)]
21pub(crate) use handle::Panicked;
22pub(crate) use handle::{Aborter, MetricHandle, Panicker};
23
24mod cell;
25pub use cell::Cell as ContextCell;
26
27pub(crate) mod supervision;
28
/// How a spawned task is executed.
#[derive(Copy, Clone, Debug)]
pub enum Execution {
    /// The task gets a thread of its own.
    Dedicated,
    /// The task is scheduled on the shared executor. A payload of `true` flags short
    /// blocking work that should use the runtime's blocking-friendly pool.
    Shared(bool),
}

impl Default for Execution {
    /// Defaults to the shared executor with the blocking flag cleared.
    fn default() -> Self {
        let blocking = false;
        Self::Shared(blocking)
    }
}
44
/// Yield control back to the runtime.
///
/// The returned future is pending exactly once: on its first poll it wakes its own
/// waker (so it is polled again) and returns [Poll::Pending]; on the next poll it
/// completes.
pub async fn reschedule() {
    /// Future that reports `Pending` once; the flag records whether that already happened.
    struct YieldOnce(bool);

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Set the flag and inspect its previous value in one step.
            if std::mem::replace(&mut self.0, true) {
                return Poll::Ready(());
            }

            // First poll: request another poll before handing control back.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    YieldOnce(false).await
}
67
/// Turn a panic payload into a printable message.
///
/// `&str` and `String` payloads are returned as-is; any other payload falls back to
/// its `Debug` rendering.
fn extract_panic_message(err: &(dyn Any + Send)) -> String {
    if let Some(text) = err.downcast_ref::<&str>() {
        return (*text).to_string();
    }
    if let Some(text) = err.downcast_ref::<String>() {
        return text.clone();
    }
    format!("{err:?}")
}
77
78/// Synchronization primitive that enables a thread to block until a waker delivers a signal.
79pub struct Blocker {
80    /// Tracks whether a wake-up signal has been delivered (even if wait has not started yet).
81    state: Mutex<bool>,
82    /// Condvar used to park and resume the thread when the signal flips to true.
83    cv: Condvar,
84}
85
86impl Blocker {
87    /// Create a new [Blocker].
88    pub fn new() -> Arc<Self> {
89        Arc::new(Self {
90            state: Mutex::new(false),
91            cv: Condvar::new(),
92        })
93    }
94
95    /// Block the current thread until a waker delivers a signal.
96    pub fn wait(&self) {
97        // Use a loop to tolerate spurious wake-ups and only proceed once a real signal arrives.
98        let mut signaled = self.state.lock();
99        while !*signaled {
100            self.cv.wait(&mut signaled);
101        }
102
103        // Reset the flag so subsequent waits park again until the next wake signal.
104        *signaled = false;
105    }
106}
107
108impl ArcWake for Blocker {
109    fn wake_by_ref(arc_self: &Arc<Self>) {
110        // Mark as signaled (and release lock before notifying).
111        {
112            let mut signaled = arc_self.state.lock();
113            *signaled = true;
114        }
115
116        // Notify a single waiter so the blocked thread re-checks the flag.
117        arc_self.cv.notify_one();
118    }
119}
120
#[cfg(any(test, feature = "test-utils"))]
/// Count the running tasks whose name begins with `prefix`.
///
/// The metrics are encoded to text and scanned line by line. A line is counted when
/// it is a `runtime_tasks_running` sample labeled `kind="Task"`, its value is 1, and
/// the task name in its `name="..."` label starts with `prefix`.
///
/// Handy for asserting that every child task under a given label hierarchy has been
/// shut down.
///
/// # Example
///
/// ```rust
/// use commonware_runtime::{Clock, Metrics, Runner, Spawner, deterministic};
/// use commonware_runtime::utils::count_running_tasks;
/// use std::time::Duration;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Spawn a task under a labeled context
///     let handle = context.with_label("worker").spawn(|ctx| async move {
///         ctx.sleep(Duration::from_secs(100)).await;
///     });
///
///     // Allow the task to start
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Count running tasks with "worker" prefix
///     let count = count_running_tasks(&context, "worker");
///     assert!(count > 0, "worker task should be running");
///
///     // Abort the task
///     handle.abort();
///     let _ = handle.await;
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Verify task is stopped
///     let count = count_running_tasks(&context, "worker");
///     assert_eq!(count, 0, "worker task should be stopped");
/// });
/// ```
pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
    let encoded = metrics.encode();
    let mut running = 0;
    for line in encoded.lines() {
        // Only running-task samples of kind `Task` with a value of 1 are of interest.
        let is_running_task = line.starts_with("runtime_tasks_running{")
            && line.contains("kind=\"Task\"")
            && line.trim_end().ends_with(" 1");
        if !is_running_task {
            continue;
        }

        // The task name is whatever sits between `name="` and the next quote.
        let Some(after_label) = line.split("name=\"").nth(1) else {
            continue;
        };
        let task_name = after_label.split('"').next().unwrap_or("");
        if task_name.starts_with(prefix) {
            running += 1;
        }
    }
    running
}
176
/// Checks that a label follows the Prometheus metric name format `[a-zA-Z][a-zA-Z0-9_]*`.
///
/// # Panics
///
/// Panics if the label is empty, if its first character is not an ASCII letter, or
/// if any later character is outside `[a-zA-Z0-9_]`.
pub fn validate_label(label: &str) {
    let mut rest = label.chars();

    // An empty label has no first character and is rejected here as well.
    let leading_ok = matches!(rest.next(), Some(first) if first.is_ascii_alphabetic());
    assert!(leading_ok, "label must start with [a-zA-Z]: {label}");

    let has_invalid = rest.any(|c| c != '_' && !c.is_ascii_alphanumeric());
    assert!(!has_invalid, "label must only contain [a-zA-Z0-9_]: {label}");
}
194
/// Insert or overwrite an attribute in a list that is kept sorted by key.
///
/// The position is located with a binary search, so `attributes` must already be
/// sorted by key. A new key is inserted at its sorted position; an existing key has
/// its value overwritten.
///
/// Returns `true` if the key was new, `false` if it was a duplicate.
pub fn add_attribute(
    attributes: &mut Vec<(String, String)>,
    key: &str,
    value: impl std::fmt::Display,
) -> bool {
    let rendered = value.to_string();
    let lookup = attributes.binary_search_by(|(existing, _)| existing.as_str().cmp(key));
    let is_new = lookup.is_err();
    match lookup {
        Ok(index) => attributes[index].1 = rendered,
        Err(index) => attributes.insert(index, (key.to_owned(), rendered)),
    }
    is_new
}
217
/// A writer that groups metrics by family name and deduplicates HELP/TYPE/UNIT
/// metadata during Prometheus encoding.
///
/// When the same metric is registered across scoped registries (via
/// `Registry::encode`), prometheus_client outputs each scope's metrics
/// separately, interleaving families. This writer collects all lines and
/// regroups them so that every sample for a given metric name appears
/// together with a single HELP/TYPE header.
///
/// Also strips `# EOF` lines so that `Registry::encode` can append exactly one at
/// the end of the combined output, and ignores blank lines.
///
/// Uses "first wins" semantics: keeps the first HELP/TYPE/UNIT line encountered
/// for each metric name and discards subsequent duplicates.
pub struct MetricEncoder {
    /// Text of the line currently being assembled (no newline seen yet).
    line_buffer: String,
    /// Collected families keyed by name; the ordered map fixes the output order.
    families: BTreeMap<String, MetricFamily>,
    /// Family named by the most recent HELP/TYPE/UNIT line; cleared by `# EOF`.
    active_family: Option<String>,
}

/// Everything collected for one metric family.
#[derive(Default)]
struct MetricFamily {
    /// First `# HELP` line seen (stored verbatim).
    help: Option<String>,
    /// First `# TYPE` line seen (stored verbatim).
    type_line: Option<String>,
    /// First `# UNIT` line seen (stored verbatim).
    unit: Option<String>,
    /// Type token taken from the stored `# TYPE` line (e.g. `counter`).
    metric_type: Option<String>,
    /// Sample lines in arrival order.
    data: Vec<String>,
}

impl MetricFamily {
    /// Every line of this family in output order: HELP, TYPE, UNIT, then samples.
    fn lines(&self) -> impl Iterator<Item = &String> {
        self.help
            .iter()
            .chain(&self.type_line)
            .chain(&self.unit)
            .chain(&self.data)
    }
}

/// OpenMetrics data lines use type-specific suffixes that differ from the
/// base name in HELP/TYPE headers (e.g., a counter named `foo` emits data
/// as `foo_total`). Each suffix is only valid for specific metric types.
///
/// See: <https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes>
const TYPED_SUFFIXES: &[(&str, &[&str])] = &[
    ("_total", &["counter"]),
    ("_bucket", &["histogram", "gaugehistogram"]),
    ("_count", &["histogram", "summary"]),
    ("_sum", &["histogram", "summary"]),
    ("_gcount", &["gaugehistogram"]),
    ("_gsum", &["gaugehistogram"]),
    ("_created", &["counter", "histogram", "summary"]),
    ("_info", &["info"]),
];

/// Returns true if `sample_name` can belong to `family_name` (either an
/// exact match or a valid type-specific suffix per the OpenMetrics spec).
///
/// A suffixed name is only accepted when the family has a recorded TYPE and
/// that type permits the suffix.
///
/// See: <https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes>
fn family_accepts_sample(
    families: &BTreeMap<String, MetricFamily>,
    family_name: &str,
    sample_name: &str,
) -> bool {
    if sample_name == family_name {
        return true;
    }
    let Some(metric_type) = families
        .get(family_name)
        .and_then(|family| family.metric_type.as_deref())
    else {
        return false;
    };
    let Some(suffix) = sample_name.strip_prefix(family_name) else {
        return false;
    };
    TYPED_SUFFIXES.iter().any(|(known_suffix, valid_types)| {
        suffix == *known_suffix && valid_types.contains(&metric_type)
    })
}

/// Extract the metric name from a sample line: `sample = metricname [labels] SP number ...`
///
/// The name ends at the first `{` or space; a line with neither is returned whole.
///
/// See: <https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#abnf>
fn extract_metric_name(line: &str) -> &str {
    let end = line.find(['{', ' ']).unwrap_or(line.len());
    &line[..end]
}

impl MetricEncoder {
    /// Create an empty encoder.
    pub const fn new() -> Self {
        Self {
            line_buffer: String::new(),
            families: BTreeMap::new(),
            active_family: None,
        }
    }

    /// Consume the encoder and return the regrouped output.
    ///
    /// Trailing text that was never terminated by a newline is processed as a final
    /// line. Families are emitted in ascending name order, each as its HELP, TYPE and
    /// UNIT lines (when present) followed by its samples in arrival order. No `# EOF`
    /// line is appended.
    pub fn into_string(mut self) -> String {
        if !self.line_buffer.is_empty() {
            self.flush_line();
        }

        // Size the buffer up front: every line contributes its length plus a newline.
        let total: usize = self
            .families
            .values()
            .flat_map(|family| family.lines())
            .map(|line| line.len() + 1)
            .sum();
        let mut output = String::with_capacity(total);
        for line in self.families.values().flat_map(|family| family.lines()) {
            output.push_str(line);
            output.push('\n');
        }
        output
    }

    /// Resolve a data line's metric name to its family key, inserting a new
    /// family if none exists, and return a mutable reference to it.
    ///
    /// OpenMetrics appends type-specific suffixes to data lines that differ
    /// from the base name in HELP/TYPE headers (e.g., a counter named "votes"
    /// emits data as "votes_total"). This method uses the TYPE declaration to
    /// correctly match suffixed data lines to their family, even when another
    /// family with the suffixed name exists (e.g., a gauge named "votes_total").
    fn resolve_data_family(&mut self, name: &str) -> &mut MetricFamily {
        let key = self.find_typed_family(name).unwrap_or(name);
        self.families.entry(key.to_string()).or_default()
    }

    /// Try to find an existing family whose TYPE declaration expects the
    /// suffix present in `name`, returning that family's base name.
    fn find_typed_family<'a>(&self, name: &'a str) -> Option<&'a str> {
        TYPED_SUFFIXES.iter().find_map(|(suffix, valid_types)| {
            let base = name.strip_suffix(suffix)?;
            let family = self.families.get(base)?;
            let t = family.metric_type.as_deref()?;
            valid_types.contains(&t).then_some(base)
        })
    }

    /// Process the buffered line and leave the buffer empty.
    ///
    /// Descriptor lines (`# HELP`, `# TYPE`, `# UNIT`) fill the matching slot of
    /// their family on first sight and make that family the active one. Any other
    /// line is a sample: it goes to the active family when that family accepts its
    /// name, otherwise to the family resolved from the name alone.
    fn flush_line(&mut self) {
        let line = std::mem::take(&mut self.line_buffer);

        // A blank line is not a sample. Without this guard it would be filed under a
        // phantom family keyed by the empty string and emitted as an empty line.
        if line.is_empty() {
            return;
        }

        // `# EOF` ends a registry's output: drop it and forget the active family so
        // later samples are resolved from their own names.
        if line == "# EOF" {
            self.active_family = None;
            return;
        }

        if let Some(rest) = line.strip_prefix("# HELP ") {
            let name = rest.split_whitespace().next().unwrap_or("").to_string();
            let family = self.families.entry(name.clone()).or_default();
            if family.help.is_none() {
                family.help = Some(line);
            }
            self.active_family = Some(name);
        } else if let Some(rest) = line.strip_prefix("# TYPE ") {
            let mut parts = rest.split_whitespace();
            let name = parts.next().unwrap_or("").to_string();
            let metric_type = parts.next().map(|s| s.to_string());
            let family = self.families.entry(name.clone()).or_default();
            // First wins: the recorded type always matches the stored TYPE line.
            if family.type_line.is_none() {
                family.type_line = Some(line);
                family.metric_type = metric_type;
            }
            self.active_family = Some(name);
        } else if let Some(rest) = line.strip_prefix("# UNIT ") {
            let name = rest.split_whitespace().next().unwrap_or("").to_string();
            let family = self.families.entry(name.clone()).or_default();
            if family.unit.is_none() {
                family.unit = Some(line);
            }
            self.active_family = Some(name);
        } else {
            let name = extract_metric_name(&line);
            if let Some(family_name) = &self.active_family {
                if family_accepts_sample(&self.families, family_name, name) {
                    // The active family was inserted when its descriptor was read.
                    self.families
                        .get_mut(family_name.as_str())
                        .expect("active family is always present in the map")
                        .data
                        .push(line);
                    return;
                }
            }
            let family = self.resolve_data_family(name);
            family.data.push(line);
        }
    }
}

impl Default for MetricEncoder {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Write for MetricEncoder {
    /// Buffer `s`, processing every line completed by a newline.
    ///
    /// Input may arrive in arbitrary chunks; text after the last newline stays
    /// buffered until a later write or `into_string` completes it.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let mut remaining = s;
        while let Some(pos) = remaining.find('\n') {
            self.line_buffer.push_str(&remaining[..pos]);
            self.flush_line();
            remaining = &remaining[pos + 1..];
        }
        self.line_buffer.push_str(remaining);
        Ok(())
    }
}
432
/// Internal handle that deregisters a metric scope when dropped.
///
/// Stored inside contexts via `Arc<ScopeGuard>`. When the last context clone
/// holding this handle is dropped, the scope's metrics are automatically removed.
pub(crate) struct ScopeGuard {
    /// Identifier passed to the cleanup callback.
    scope_id: u64,
    /// Callback invoked on drop; `None` once it has been taken.
    cleanup: Option<Box<dyn FnOnce(u64) + Send + Sync>>,
}

impl ScopeGuard {
    /// Create a guard that calls `cleanup(scope_id)` when it is dropped.
    pub(crate) fn new(scope_id: u64, cleanup: impl FnOnce(u64) + Send + Sync + 'static) -> Self {
        let cleanup: Box<dyn FnOnce(u64) + Send + Sync> = Box::new(cleanup);
        Self {
            scope_id,
            cleanup: Some(cleanup),
        }
    }

    /// The identifier of the guarded scope.
    pub(crate) const fn scope_id(&self) -> u64 {
        self.scope_id
    }
}

impl Drop for ScopeGuard {
    fn drop(&mut self) {
        // Taking the callback out of the option means it can run at most once.
        let Some(run_cleanup) = self.cleanup.take() else {
            return;
        };
        run_cleanup(self.scope_id);
    }
}
462
463/// Manages multiple prometheus registries with lifecycle-based scoping.
464///
465/// Holds a permanent root registry for long-lived metrics (runtime internals)
466/// and a collection of scoped registries that can be removed when the associated
467/// work (e.g., an epoch's consensus engine) is done.
468pub(crate) struct Registry {
469    root: PrometheusRegistry,
470    scopes: BTreeMap<u64, PrometheusRegistry>,
471    next_scope_id: u64,
472}
473
474impl Registry {
475    pub fn new() -> Self {
476        Self {
477            root: PrometheusRegistry::default(),
478            scopes: BTreeMap::new(),
479            next_scope_id: 0,
480        }
481    }
482
483    pub const fn root_mut(&mut self) -> &mut PrometheusRegistry {
484        &mut self.root
485    }
486
487    pub fn create_scope(&mut self) -> u64 {
488        let id = self.next_scope_id;
489        self.next_scope_id = self.next_scope_id.checked_add(1).expect("scope overflow");
490        self.scopes.insert(id, PrometheusRegistry::default());
491        id
492    }
493
494    pub fn get_scope(&mut self, scope: Option<u64>) -> &mut PrometheusRegistry {
495        match scope {
496            None => &mut self.root,
497            Some(id) => self
498                .scopes
499                .get_mut(&id)
500                .unwrap_or_else(|| panic!("scope {id} not found (already deregistered?)")),
501        }
502    }
503
504    pub fn remove_scope(&mut self, id: u64) {
505        self.scopes.remove(&id);
506    }
507
508    pub fn encode(&self) -> String {
509        let mut encoder = MetricEncoder::new();
510        encode(&mut encoder, &self.root).expect("encoding root failed");
511        for registry in self.scopes.values() {
512            encode(&mut encoder, registry).expect("encoding scope failed");
513        }
514        let mut output = encoder.into_string();
515        output.push_str("# EOF\n");
516        output
517    }
518}
519
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Metrics, Runner};
    use commonware_macros::test_traced;
    use futures::task::waker;
    use prometheus_client::metrics::counter::Counter;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Feed `input` through a fresh [MetricEncoder] and return the regrouped output.
    fn encode_dedup(input: &str) -> String {
        use std::fmt::Write;
        let mut encoder = MetricEncoder::new();
        encoder.write_str(input).unwrap();
        encoder.into_string()
    }

    #[test]
    fn test_metric_encoder_empty() {
        assert_eq!(encode_dedup(""), "");
        assert_eq!(encode_dedup("# EOF\n"), "");
    }

    #[test]
    fn test_metric_encoder_no_duplicates() {
        let input = r#"# HELP foo_total A counter.
# TYPE foo_total counter
foo_total 1
# HELP bar_gauge A gauge.
# TYPE bar_gauge gauge
bar_gauge 42
# EOF
"#;
        let expected = r#"# HELP bar_gauge A gauge.
# TYPE bar_gauge gauge
bar_gauge 42
# HELP foo_total A counter.
# TYPE foo_total counter
foo_total 1
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_with_duplicates() {
        let input = r#"# HELP votes_total vote count.
# TYPE votes_total counter
votes_total{epoch="e5"} 1
# HELP votes_total vote count.
# TYPE votes_total counter
votes_total{epoch="e6"} 2
# EOF
"#;
        let expected = r#"# HELP votes_total vote count.
# TYPE votes_total counter
votes_total{epoch="e5"} 1
votes_total{epoch="e6"} 2
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_multiple_metrics() {
        let input = r#"# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 1
# HELP b_total Second.
# TYPE b_total counter
b_total 5
# HELP a_total First.
# TYPE a_total counter
a_total{tag="y"} 2
# EOF
"#;
        let expected = r#"# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 1
a_total{tag="y"} 2
# HELP b_total Second.
# TYPE b_total counter
b_total 5
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_groups_by_name() {
        // Families arrive in reverse name order and alternate across four segments.
        // Each family's samples must be gathered under one header (in arrival order)
        // and the families themselves emitted sorted by name.
        let input = r#"# HELP b_total Second.
# TYPE b_total counter
b_total{tag="x"} 1
# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 2
# HELP b_total Second.
# TYPE b_total counter
b_total{tag="y"} 3
# HELP a_total First.
# TYPE a_total counter
a_total{tag="y"} 4
# EOF
"#;
        let expected = r#"# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 2
a_total{tag="y"} 4
# HELP b_total Second.
# TYPE b_total counter
b_total{tag="x"} 1
b_total{tag="y"} 3
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_deterministic_order() {
        let input = r#"# HELP z First alphabetically last.
# TYPE z counter
z_total 1
# HELP a Last alphabetically first.
# TYPE a counter
a_total 2
# EOF
"#;
        let expected = r#"# HELP a Last alphabetically first.
# TYPE a counter
a_total 2
# HELP z First alphabetically last.
# TYPE z counter
z_total 1
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_counter_suffix_grouping() {
        // prometheus_client uses the base name for HELP/TYPE (e.g., "ab_votes")
        // but appends "_total" to the data line (e.g., "ab_votes_total").
        // A metric whose name sorts between these (e.g., "ab_votes_size")
        // must not split the family.
        let input = r#"# HELP ab_votes vote count.
# TYPE ab_votes counter
ab_votes_total{epoch="1"} 1
# HELP ab_votes_size size gauge.
# TYPE ab_votes_size gauge
ab_votes_size 99
# HELP ab_votes vote count.
# TYPE ab_votes counter
ab_votes_total{epoch="2"} 2
# EOF
"#;
        let expected = r#"# HELP ab_votes vote count.
# TYPE ab_votes counter
ab_votes_total{epoch="1"} 1
ab_votes_total{epoch="2"} 2
# HELP ab_votes_size size gauge.
# TYPE ab_votes_size gauge
ab_votes_size 99
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_type_aware_suffix() {
        // A gauge named "foo_total" and a counter named "foo" both produce
        // data lines called "foo_total". The encoder must use the TYPE info
        // to route each data line to the correct family.
        let input = r#"# HELP foo_total A gauge.
# TYPE foo_total gauge
foo_total 42
# HELP foo A counter.
# TYPE foo counter
foo_total 1
# EOF
"#;
        let expected = r#"# HELP foo A counter.
# TYPE foo counter
foo_total 1
# HELP foo_total A gauge.
# TYPE foo_total gauge
foo_total 42
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_literal_suffix_family_not_hijacked() {
        // A family may legitimately end with a reserved OpenMetrics suffix.
        // The encoder must not always remap "foo_created" to base family "foo".
        let input = r#"# HELP foo A counter.
# TYPE foo counter
foo_total 1
# HELP foo_created A gauge.
# TYPE foo_created gauge
foo_created 42
# EOF
"#;
        let expected = r#"# HELP foo A counter.
# TYPE foo counter
foo_total 1
# HELP foo_created A gauge.
# TYPE foo_created gauge
foo_created 42
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_type_aware_suffix_interleaved_segments() {
        // Two families may emit lines with the same sample name (`foo_total`):
        // a counter named `foo` and a gauge named `foo_total`.
        //
        // Repeated counter descriptors (as emitted by separate scoped registries)
        // must keep all counter samples in family `foo` and not leak them into
        // family `foo_total`.
        let input = r#"# HELP foo Counter.
# TYPE foo counter
foo_total{scope="a"} 1
# HELP foo_total Gauge.
# TYPE foo_total gauge
foo_total 42
# HELP foo Counter.
# TYPE foo counter
foo_total{scope="b"} 2
# EOF
"#;
        let expected = r#"# HELP foo Counter.
# TYPE foo counter
foo_total{scope="a"} 1
foo_total{scope="b"} 2
# HELP foo_total Gauge.
# TYPE foo_total gauge
foo_total 42
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_unit_metadata_is_grouped() {
        let input = r#"# HELP latency Latency histogram.
# TYPE latency histogram
# UNIT latency seconds
latency_sum 1.2
latency_count 3
# HELP requests Requests.
# TYPE requests counter
requests_total 9
# EOF
"#;
        let expected = r#"# HELP latency Latency histogram.
# TYPE latency histogram
# UNIT latency seconds
latency_sum 1.2
latency_count 3
# HELP requests Requests.
# TYPE requests counter
requests_total 9
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_unit_metadata_deduped_across_segments() {
        let input = r#"# HELP req Requests.
# TYPE req counter
# UNIT req requests
req_total{scope="a"} 1
# HELP req Requests.
# TYPE req counter
# UNIT req requests
req_total{scope="b"} 2
# EOF
"#;
        let expected = r#"# HELP req Requests.
# TYPE req counter
# UNIT req requests
req_total{scope="a"} 1
req_total{scope="b"} 2
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_fallback_uses_typed_suffix_even_if_literal_exists() {
        // Regression test for a buggy fallback that preferred `contains_key(name)`
        // before typed-suffix mapping.
        //
        // Here `foo_total` exists as its own counter family, but a sample named
        // `foo_total` can only come from counter family `foo` (because
        // `foo_total` counter samples are `foo_total_total`).
        //
        // We force fallback mode by ending descriptor groups with `# EOF`, which
        // clears `active_family`.
        let input = r#"# HELP foo_total Counter with literal suffix.
# TYPE foo_total counter
foo_total_total 9
# EOF
# HELP foo Base counter.
# TYPE foo counter
# EOF
foo_total{scope="x"} 1
# EOF
"#;
        let expected = r#"# HELP foo Base counter.
# TYPE foo counter
foo_total{scope="x"} 1
# HELP foo_total Counter with literal suffix.
# TYPE foo_total counter
foo_total_total 9
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_metric_encoder_strips_intermediate_eof() {
        let input = r#"# HELP a_total Root.
# TYPE a_total counter
a_total 1
# EOF
# HELP b_total Scoped.
# TYPE b_total counter
b_total 2
# EOF
"#;
        let expected = r#"# HELP a_total Root.
# TYPE a_total counter
a_total 1
# HELP b_total Scoped.
# TYPE b_total counter
b_total 2
"#;
        assert_eq!(encode_dedup(input), expected);
    }

    #[test]
    fn test_blocker_waits_until_wake() {
        let blocker = Blocker::new();
        let started = Arc::new(AtomicBool::new(false));
        let completed = Arc::new(AtomicBool::new(false));

        let thread_blocker = blocker.clone();
        let thread_started = started.clone();
        let thread_completed = completed.clone();
        let handle = std::thread::spawn(move || {
            thread_started.store(true, Ordering::SeqCst);
            thread_blocker.wait();
            thread_completed.store(true, Ordering::SeqCst);
        });

        while !started.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }

        // No signal has been sent yet, so the thread cannot have passed `wait`.
        assert!(!completed.load(Ordering::SeqCst));
        waker(blocker).wake();
        handle.join().unwrap();
        assert!(completed.load(Ordering::SeqCst));
    }

    #[test]
    fn test_blocker_handles_pre_wake() {
        // Signal before anyone waits: the later wait must return immediately.
        let blocker = Blocker::new();
        waker(blocker.clone()).wake();

        let completed = Arc::new(AtomicBool::new(false));
        let thread_blocker = blocker;
        let thread_completed = completed.clone();
        std::thread::spawn(move || {
            thread_blocker.wait();
            thread_completed.store(true, Ordering::SeqCst);
        })
        .join()
        .unwrap();

        assert!(completed.load(Ordering::SeqCst));
    }

    #[test]
    fn test_blocker_reusable_across_signals() {
        let blocker = Blocker::new();
        let completed = Arc::new(AtomicUsize::new(0));

        let thread_blocker = blocker.clone();
        let thread_completed = completed.clone();
        let handle = std::thread::spawn(move || {
            for _ in 0..2 {
                thread_blocker.wait();
                thread_completed.fetch_add(1, Ordering::SeqCst);
            }
        });

        // Send one signal per round and wait for the thread to consume it.
        for expected in 1..=2 {
            waker(blocker.clone()).wake();
            while completed.load(Ordering::SeqCst) < expected {
                std::thread::yield_now();
            }
        }

        handle.join().unwrap();
        assert_eq!(completed.load(Ordering::SeqCst), 2);
    }

    #[test_traced]
    fn test_count_running_tasks() {
        // `Metrics` and `Runner` are already imported at module level.
        use crate::Spawner;
        use futures::future;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initially no tasks with "worker" prefix
            assert_eq!(
                count_running_tasks(&context, "worker"),
                0,
                "no worker tasks initially"
            );

            // Spawn a task under a labeled context that stays running
            let worker_ctx = context.with_label("worker");
            let handle1 = worker_ctx.clone().spawn(|_| async move {
                future::pending::<()>().await;
            });

            // Count running tasks with "worker" prefix
            let count = count_running_tasks(&context, "worker");
            assert_eq!(count, 1, "worker task should be running");

            // Non-matching prefix should return 0
            assert_eq!(
                count_running_tasks(&context, "other"),
                0,
                "no tasks with 'other' prefix"
            );

            // Spawn a nested task (worker_child)
            let handle2 = worker_ctx.with_label("child").spawn(|_| async move {
                future::pending::<()>().await;
            });

            // Count should include both parent and nested tasks
            let count = count_running_tasks(&context, "worker");
            assert_eq!(count, 2, "both worker and worker_child should be counted");

            // Abort parent task
            handle1.abort();
            let _ = handle1.await;

            // Only nested task remains
            let count = count_running_tasks(&context, "worker");
            assert_eq!(count, 1, "only worker_child should remain");

            // Abort nested task
            handle2.abort();
            let _ = handle2.await;

            // All tasks stopped
            assert_eq!(
                count_running_tasks(&context, "worker"),
                0,
                "all worker tasks should be stopped"
            );
        });
    }

    #[test_traced]
    fn test_no_duplicate_metrics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Register metrics under different labels (no duplicates)
            let c1 = Counter::<u64>::default();
            context.with_label("a").register("test", "help", c1);
            let c2 = Counter::<u64>::default();
            context.with_label("b").register("test", "help", c2);
        });
        // Test passes if runtime doesn't panic on shutdown
    }

    #[test]
    #[should_panic(expected = "duplicate metric:")]
    fn test_duplicate_metrics_panics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Register metrics with the same label, causing duplicates
            let c1 = Counter::<u64>::default();
            context.with_label("a").register("test", "help", c1);
            let c2 = Counter::<u64>::default();
            context.with_label("a").register("test", "help", c2);
        });
    }
}