use commonware_utils::sync::{Condvar, Mutex};
use futures::task::ArcWake;
use prometheus_client::{encoding::text::encode, registry::Registry as PrometheusRegistry};
use std::{
any::Any,
collections::BTreeMap,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
commonware_macros::stability_mod!(BETA, pub mod buffer);
pub mod signal;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod thread;
mod handle;
pub use handle::Handle;
#[commonware_macros::stability(ALPHA)]
pub(crate) use handle::Panicked;
pub(crate) use handle::{Aborter, MetricHandle, Panicker};
mod cell;
pub use cell::Cell as ContextCell;
pub(crate) mod supervision;
/// Selects how a task is executed.
///
/// NOTE(review): only the variant names and the default value are visible in this
/// file. The scheduling semantics hinted at below are inferred from the names and
/// must be confirmed against the runtime that consumes this type.
#[derive(Copy, Clone, Debug)]
pub enum Execution {
/// Presumably runs the task on a thread of its own — TODO confirm.
Dedicated,
/// Presumably runs the task on the shared executor; the meaning of the `bool`
/// flag is not visible here — TODO confirm.
Shared(bool),
}
// The default execution mode is `Shared(false)`.
impl Default for Execution {
fn default() -> Self {
Self::Shared(false)
}
}
/// Yields to the executor exactly once.
///
/// The first poll of the returned future wakes its own waker and reports
/// [`Poll::Pending`]; the next poll reports [`Poll::Ready`].
pub async fn reschedule() {
    /// One-shot future: pending on the first poll, ready on every later poll.
    struct YieldOnce {
        polled: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Mark this poll and learn whether an earlier poll already happened.
            if std::mem::replace(&mut self.polled, true) {
                return Poll::Ready(());
            }
            // Wake ourselves before returning `Pending` so the task is polled again
            // instead of waiting for an external event.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    YieldOnce { polled: false }.await
}
/// Renders a panic payload as a `String`.
///
/// Payloads that are a `&'static str` or a `String` are returned as their text;
/// any other payload is rendered with its `Debug` representation.
fn extract_panic_message(err: &(dyn Any + Send)) -> String {
    if let Some(text) = err.downcast_ref::<&str>() {
        return (*text).to_string();
    }
    if let Some(text) = err.downcast_ref::<String>() {
        return text.clone();
    }
    // Neither string type: fall back to whatever `Debug` prints for the payload.
    format!("{err:?}")
}
/// Lets one thread block in `wait` until another party signals it through the
/// `ArcWake` implementation.
pub struct Blocker {
// `true` once a wake has been delivered and not yet consumed by `wait`.
state: Mutex<bool>,
// Condition variable used to park in `wait` and to notify from `wake_by_ref`.
cv: Condvar,
}
impl Blocker {
    /// Creates a new, un-signaled blocker wrapped in an [`Arc`].
    pub fn new() -> Arc<Self> {
        let blocker = Self {
            state: Mutex::new(false),
            cv: Condvar::new(),
        };
        Arc::new(blocker)
    }

    /// Blocks the calling thread until the blocker has been signaled, then
    /// clears the signal so the next call blocks again.
    ///
    /// Returns immediately if a signal was delivered before the call.
    pub fn wait(&self) {
        let mut pending = self.state.lock();
        loop {
            if *pending {
                // Consume the signal before returning.
                *pending = false;
                return;
            }
            // Re-check the flag after every wake-up: only a set flag ends the wait.
            self.cv.wait(&mut pending);
        }
    }
}
impl ArcWake for Blocker {
    /// Marks the blocker as signaled and notifies one waiting thread.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // The guard is a temporary of this statement, so the lock is released
        // before the notification below is sent.
        *arc_self.state.lock() = true;
        arc_self.cv.notify_one();
    }
}
/// Counts the lines of the encoded metrics that report a running task whose
/// name starts with `prefix`.
///
/// A line is counted when it starts with `runtime_tasks_running{`, contains
/// `kind="Task"`, ends (ignoring trailing whitespace) with ` 1`, and the value
/// following its first `name="` starts with `prefix`.
#[cfg(any(test, feature = "test-utils"))]
pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
    /// Returns whether `line` reports a running task named with `prefix`.
    fn is_running_task(line: &str, prefix: &str) -> bool {
        if !line.starts_with("runtime_tasks_running{") {
            return false;
        }
        if !line.contains("kind=\"Task\"") {
            return false;
        }
        if !line.trim_end().ends_with(" 1") {
            return false;
        }
        // Text after the first `name="`, cut at the closing quote.
        match line.split("name=\"").nth(1) {
            Some(after) => after.split('"').next().unwrap_or("").starts_with(prefix),
            None => false,
        }
    }

    let encoded = metrics.encode();
    let mut running = 0;
    for line in encoded.lines() {
        if is_running_task(line, prefix) {
            running += 1;
        }
    }
    running
}
/// Asserts that `label` matches `[a-zA-Z][a-zA-Z0-9_]*`.
///
/// # Panics
///
/// Panics if `label` is empty, does not start with an ASCII letter, or contains
/// a character other than an ASCII letter, an ASCII digit, or `_`.
pub fn validate_label(label: &str) {
    let mut rest = label.chars();
    let leading = rest.next();
    assert!(
        matches!(leading, Some(c) if c.is_ascii_alphabetic()),
        "label must start with [a-zA-Z]: {label}"
    );
    for c in rest {
        assert!(
            c == '_' || c.is_ascii_alphanumeric(),
            "label must only contain [a-zA-Z0-9_]: {label}"
        );
    }
}
/// Inserts `key = value` into `attributes`, or overwrites the value if `key` is
/// already present.
///
/// `attributes` must be sorted by key: the position is found with a binary
/// search, and new entries are inserted so that the ordering is preserved.
///
/// Returns `true` if a new entry was inserted and `false` if an existing entry
/// was overwritten.
pub fn add_attribute(
    attributes: &mut Vec<(String, String)>,
    key: &str,
    value: impl std::fmt::Display,
) -> bool {
    let value = value.to_string();
    // Compare as `&str` so no owned key is built just to search.
    match attributes.binary_search_by(|(existing, _)| existing.as_str().cmp(key)) {
        Ok(index) => {
            attributes[index].1 = value;
            false
        }
        Err(index) => {
            // The key is only allocated when it actually has to be stored.
            attributes.insert(index, (key.to_owned(), value));
            true
        }
    }
}
/// A `std::fmt::Write` sink for metrics text that groups lines by metric family.
///
/// Written text is split into lines; `# HELP`, `# TYPE` and `# UNIT` lines are
/// kept once per family, sample lines are appended to their family, and `# EOF`
/// lines are dropped. `into_string` emits the families in name order.
pub struct MetricEncoder {
// Text received since the last newline (an incomplete line).
line_buffer: String,
// Collected lines, keyed (and therefore ordered) by family name.
families: BTreeMap<String, MetricFamily>,
// Family named by the most recent metadata line; cleared by `# EOF`.
active_family: Option<String>,
}
/// Lines collected for a single metric family.
#[derive(Default)]
struct MetricFamily {
// First `# HELP` line seen for this family (the full line).
help: Option<String>,
// First `# TYPE` line seen for this family (the full line).
type_line: Option<String>,
// First `# UNIT` line seen for this family (the full line).
unit: Option<String>,
// Type token taken from the first `# TYPE` line, e.g. `counter`.
metric_type: Option<String>,
// Sample lines in the order they were received.
data: Vec<String>,
}
/// Sample-name suffixes and, for each, the family types that may own a sample
/// carrying that suffix (e.g. a `counter` family `foo` owns `foo_total`).
///
/// NOTE(review): the table appears to mirror the OpenMetrics sample suffixes —
/// confirm against the specification before extending it.
const TYPED_SUFFIXES: &[(&str, &[&str])] = &[
("_total", &["counter"]),
("_bucket", &["histogram", "gaugehistogram"]),
("_count", &["histogram", "summary"]),
("_sum", &["histogram", "summary"]),
("_gcount", &["gaugehistogram"]),
("_gsum", &["gaugehistogram"]),
("_created", &["counter", "histogram", "summary"]),
("_info", &["info"]),
];
/// Returns whether a sample named `sample_name` belongs to the family
/// `family_name`.
///
/// A sample belongs to a family when the names are equal, or when the sample
/// name is the family name followed by a suffix from `TYPED_SUFFIXES` that is
/// valid for the family's recorded metric type. A family without a recorded
/// type only accepts samples with exactly its own name.
fn family_accepts_sample(
    families: &BTreeMap<String, MetricFamily>,
    family_name: &str,
    sample_name: &str,
) -> bool {
    if sample_name == family_name {
        return true;
    }

    let declared_type = families
        .get(family_name)
        .and_then(|family| family.metric_type.as_deref());
    match (declared_type, sample_name.strip_prefix(family_name)) {
        (Some(metric_type), Some(suffix)) => TYPED_SUFFIXES
            .iter()
            .any(|(known, types)| *known == suffix && types.contains(&metric_type)),
        // Unknown type, or the sample name does not extend the family name.
        _ => false,
    }
}
/// Returns the metric name of a sample line: everything before the first `{`
/// or space, or the whole line if it contains neither.
fn extract_metric_name(line: &str) -> &str {
    match line.find(|c: char| c == '{' || c == ' ') {
        Some(end) => &line[..end],
        None => line,
    }
}
impl MetricEncoder {
    /// Creates an encoder with no buffered text and no families.
    pub const fn new() -> Self {
        Self {
            active_family: None,
            families: BTreeMap::new(),
            line_buffer: String::new(),
        }
    }

    /// Consumes the encoder and returns the grouped text.
    ///
    /// Families are emitted in name order; within a family the order is the
    /// HELP line, the TYPE line, the UNIT line and then the samples in arrival
    /// order. Every line is terminated by `\n`. No `# EOF` line is emitted.
    pub fn into_string(mut self) -> String {
        /// All stored lines of `family`, in output order.
        fn stored_lines(family: &MetricFamily) -> impl Iterator<Item = &String> {
            family
                .help
                .iter()
                .chain(&family.type_line)
                .chain(&family.unit)
                .chain(&family.data)
        }

        // A final line without a trailing newline is still sitting in the buffer.
        if !self.line_buffer.is_empty() {
            self.flush_line();
        }

        // Each line contributes its own length plus one byte for the newline.
        let capacity: usize = self
            .families
            .values()
            .flat_map(|family| stored_lines(family))
            .map(|line| line.len() + 1)
            .sum();

        let mut output = String::with_capacity(capacity);
        for line in self.families.values().flat_map(|family| stored_lines(family)) {
            output.push_str(line);
            output.push('\n');
        }
        output
    }

    /// Returns the family that should receive a sample called `name` when the
    /// active family did not claim it, creating the family if necessary.
    fn resolve_data_family(&mut self, name: &str) -> &mut MetricFamily {
        let key = match self.find_typed_family(name) {
            Some(base) => base,
            None => name,
        };
        self.families.entry(key.to_owned()).or_default()
    }

    /// Looks for an existing family whose name is `name` minus a typed suffix
    /// and whose recorded type permits that suffix; returns that base name.
    fn find_typed_family<'a>(&self, name: &'a str) -> Option<&'a str> {
        for (suffix, valid_types) in TYPED_SUFFIXES {
            let Some(base) = name.strip_suffix(suffix) else {
                continue;
            };
            let declared = self
                .families
                .get(base)
                .and_then(|family| family.metric_type.as_deref());
            if declared.is_some_and(|metric_type| valid_types.contains(&metric_type)) {
                return Some(base);
            }
        }
        None
    }

    /// Takes the buffered line and files it under the right family.
    ///
    /// `# EOF` clears the active family and is discarded. `# HELP`, `# TYPE`
    /// and `# UNIT` lines are stored only the first time they are seen for a
    /// family and make that family active. Any other line is a sample: it goes
    /// to the active family if that family accepts it, otherwise to the family
    /// chosen by `resolve_data_family`.
    fn flush_line(&mut self) {
        /// Kind of metadata line.
        #[derive(Clone, Copy)]
        enum Meta {
            Help,
            Type,
            Unit,
        }
        const META_PREFIXES: [(&str, Meta); 3] = [
            ("# HELP ", Meta::Help),
            ("# TYPE ", Meta::Type),
            ("# UNIT ", Meta::Unit),
        ];

        let line = std::mem::take(&mut self.line_buffer);
        if line == "# EOF" {
            self.active_family = None;
            return;
        }

        // The prefixes are mutually exclusive, so at most one entry matches.
        let header = META_PREFIXES
            .iter()
            .find_map(|&(prefix, kind)| Some((kind, line.strip_prefix(prefix)?)));

        match header {
            Some((kind, rest)) => {
                let mut tokens = rest.split_whitespace();
                let name = tokens.next().unwrap_or("").to_string();
                let family = self.families.entry(name.clone()).or_default();
                match kind {
                    Meta::Help => {
                        if family.help.is_none() {
                            family.help = Some(line);
                        }
                    }
                    Meta::Type => {
                        // The type token is recorded together with the first TYPE
                        // line; later TYPE lines for the family are ignored.
                        if family.type_line.is_none() {
                            family.metric_type = tokens.next().map(str::to_string);
                            family.type_line = Some(line);
                        }
                    }
                    Meta::Unit => {
                        if family.unit.is_none() {
                            family.unit = Some(line);
                        }
                    }
                }
                self.active_family = Some(name);
            }
            None => {
                let sample = extract_metric_name(&line);
                // Prefer the active family when it claims the sample.
                let accepted = self
                    .active_family
                    .as_deref()
                    .filter(|active| family_accepts_sample(&self.families, active, sample));
                let family = match accepted {
                    Some(active) => self.families.get_mut(active).unwrap(),
                    None => self.resolve_data_family(sample),
                };
                family.data.push(line);
            }
        }
    }
}
// `Default` is the same empty encoder that `MetricEncoder::new` returns.
impl Default for MetricEncoder {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Write for MetricEncoder {
    /// Appends `s` to the encoder, processing every line completed by a `\n`.
    ///
    /// Text after the last newline stays buffered until a later write (or
    /// `into_string`) completes it. Never returns an error.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let mut pending = s;
        while let Some((line, tail)) = pending.split_once('\n') {
            // `line` may continue text buffered by an earlier write.
            self.line_buffer.push_str(line);
            self.flush_line();
            pending = tail;
        }
        self.line_buffer.push_str(pending);
        Ok(())
    }
}
/// Runs a cleanup callback with a scope id when dropped.
pub(crate) struct ScopeGuard {
    /// Id handed to the cleanup callback.
    scope_id: u64,
    /// Callback to run on drop; `None` once it has been taken.
    cleanup: Option<Box<dyn FnOnce(u64) + Send + Sync>>,
}

impl ScopeGuard {
    /// Creates a guard that calls `cleanup(scope_id)` when it is dropped.
    pub(crate) fn new(scope_id: u64, cleanup: impl FnOnce(u64) + Send + Sync + 'static) -> Self {
        let cleanup: Box<dyn FnOnce(u64) + Send + Sync> = Box::new(cleanup);
        Self {
            scope_id,
            cleanup: Some(cleanup),
        }
    }

    /// Returns the scope id this guard was created with.
    pub(crate) const fn scope_id(&self) -> u64 {
        self.scope_id
    }
}

impl Drop for ScopeGuard {
    fn drop(&mut self) {
        let id = self.scope_id;
        // Taking the callback out of the `Option` is what allows an `FnOnce`
        // to be called through `&mut self`.
        if let Some(run) = self.cleanup.take() {
            run(id);
        }
    }
}
/// A root metrics registry plus any number of removable, id-addressed scoped
/// registries, encoded together as one document.
pub(crate) struct Registry {
    /// Registry addressed by `None` in [`Registry::get_scope`].
    root: PrometheusRegistry,
    /// Scoped registries by id.
    scopes: BTreeMap<u64, PrometheusRegistry>,
    /// Id the next call to [`Registry::create_scope`] will hand out.
    next_scope_id: u64,
}

impl Registry {
    /// Creates a registry with an empty root and no scopes.
    pub fn new() -> Self {
        Self {
            next_scope_id: 0,
            scopes: BTreeMap::new(),
            root: PrometheusRegistry::default(),
        }
    }

    /// Returns the root registry.
    pub const fn root_mut(&mut self) -> &mut PrometheusRegistry {
        &mut self.root
    }

    /// Adds a new, empty scoped registry and returns its id.
    ///
    /// # Panics
    ///
    /// Panics if the id counter would overflow.
    pub fn create_scope(&mut self) -> u64 {
        let id = self.next_scope_id;
        let following = id.checked_add(1).expect("scope overflow");
        self.next_scope_id = following;
        self.scopes.insert(id, PrometheusRegistry::default());
        id
    }

    /// Returns the root registry for `None`, or the scoped registry with the
    /// given id.
    ///
    /// # Panics
    ///
    /// Panics if no scope with that id exists.
    pub fn get_scope(&mut self, scope: Option<u64>) -> &mut PrometheusRegistry {
        let Some(id) = scope else {
            return &mut self.root;
        };
        match self.scopes.get_mut(&id) {
            Some(registry) => registry,
            None => panic!("scope {id} not found (already deregistered?)"),
        }
    }

    /// Removes the scoped registry with the given id; unknown ids are ignored.
    pub fn remove_scope(&mut self, id: u64) {
        self.scopes.remove(&id);
    }

    /// Encodes the root registry followed by every scope (in id order) through
    /// a single [`MetricEncoder`], and terminates the result with `# EOF`.
    ///
    /// # Panics
    ///
    /// Panics if encoding any registry fails.
    pub fn encode(&self) -> String {
        let mut encoder = MetricEncoder::new();
        encode(&mut encoder, &self.root).expect("encoding root failed");
        self.scopes
            .values()
            .try_for_each(|scoped| encode(&mut encoder, scoped))
            .expect("encoding scope failed");

        let mut text = encoder.into_string();
        text.push_str("# EOF\n");
        text
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{deterministic, Metrics, Runner};
use commonware_macros::test_traced;
use futures::task::waker;
use prometheus_client::metrics::counter::Counter;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
// Feeds `input` through a fresh `MetricEncoder` and returns the grouped output.
fn encode_dedup(input: &str) -> String {
    let mut encoder = MetricEncoder::new();
    std::fmt::Write::write_str(&mut encoder, input).unwrap();
    encoder.into_string()
}
// Empty input and a lone `# EOF` line both produce empty output.
#[test]
fn test_metric_encoder_empty() {
assert_eq!(encode_dedup(""), "");
assert_eq!(encode_dedup("# EOF\n"), "");
}
// Two distinct families pass through unchanged except that they are emitted in
// name order (`bar_gauge` before `foo_total`) and the `# EOF` line is dropped.
#[test]
fn test_metric_encoder_no_duplicates() {
let input = r#"# HELP foo_total A counter.
# TYPE foo_total counter
foo_total 1
# HELP bar_gauge A gauge.
# TYPE bar_gauge gauge
bar_gauge 42
# EOF
"#;
let expected = r#"# HELP bar_gauge A gauge.
# TYPE bar_gauge gauge
bar_gauge 42
# HELP foo_total A counter.
# TYPE foo_total counter
foo_total 1
"#;
assert_eq!(encode_dedup(input), expected);
}
// A family announced twice keeps a single HELP/TYPE pair and collects the
// samples of both occurrences in arrival order.
#[test]
fn test_metric_encoder_with_duplicates() {
let input = r#"# HELP votes_total vote count.
# TYPE votes_total counter
votes_total{epoch="e5"} 1
# HELP votes_total vote count.
# TYPE votes_total counter
votes_total{epoch="e6"} 2
# EOF
"#;
let expected = r#"# HELP votes_total vote count.
# TYPE votes_total counter
votes_total{epoch="e5"} 1
votes_total{epoch="e6"} 2
"#;
assert_eq!(encode_dedup(input), expected);
}
// Samples of `a_total` that are separated by another family (`b_total`) are
// brought back together under a single `a_total` header.
#[test]
fn test_metric_encoder_multiple_metrics() {
let input = r#"# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 1
# HELP b_total Second.
# TYPE b_total counter
b_total 5
# HELP a_total First.
# TYPE a_total counter
a_total{tag="y"} 2
# EOF
"#;
let expected = r#"# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 1
a_total{tag="y"} 2
# HELP b_total Second.
# TYPE b_total counter
b_total 5
"#;
assert_eq!(encode_dedup(input), expected);
}
// All samples of one family are emitted contiguously even when the input
// interleaves them with another family.
// NOTE(review): input and expected output are identical to those of
// `test_metric_encoder_multiple_metrics` — confirm whether both are needed.
#[test]
fn test_metric_encoder_groups_by_name() {
let input = r#"# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 1
# HELP b_total Second.
# TYPE b_total counter
b_total 5
# HELP a_total First.
# TYPE a_total counter
a_total{tag="y"} 2
# EOF
"#;
let expected = r#"# HELP a_total First.
# TYPE a_total counter
a_total{tag="x"} 1
a_total{tag="y"} 2
# HELP b_total Second.
# TYPE b_total counter
b_total 5
"#;
assert_eq!(encode_dedup(input), expected);
}
// Output order follows the family name, not the input order: `z` is written
// first in the input but `a` comes first in the output. The `_total` samples
// stay with their counter families `z` and `a`.
#[test]
fn test_metric_encoder_deterministic_order() {
let input = r#"# HELP z First alphabetically last.
# TYPE z counter
z_total 1
# HELP a Last alphabetically first.
# TYPE a counter
a_total 2
# EOF
"#;
let expected = r#"# HELP a Last alphabetically first.
# TYPE a counter
a_total 2
# HELP z First alphabetically last.
# TYPE z counter
z_total 1
"#;
assert_eq!(encode_dedup(input), expected);
}
// `ab_votes_total` samples are grouped under the counter family `ab_votes`,
// while the gauge `ab_votes_size`, which merely shares the prefix, remains a
// family of its own.
#[test]
fn test_metric_encoder_counter_suffix_grouping() {
let input = r#"# HELP ab_votes vote count.
# TYPE ab_votes counter
ab_votes_total{epoch="1"} 1
# HELP ab_votes_size size gauge.
# TYPE ab_votes_size gauge
ab_votes_size 99
# HELP ab_votes vote count.
# TYPE ab_votes counter
ab_votes_total{epoch="2"} 2
# EOF
"#;
let expected = r#"# HELP ab_votes vote count.
# TYPE ab_votes counter
ab_votes_total{epoch="1"} 1
ab_votes_total{epoch="2"} 2
# HELP ab_votes_size size gauge.
# TYPE ab_votes_size gauge
ab_votes_size 99
"#;
assert_eq!(encode_dedup(input), expected);
}
// A gauge literally named `foo_total` and a counter `foo` both emit a sample
// called `foo_total`; each sample stays with the family that was active when
// it was written.
#[test]
fn test_metric_encoder_type_aware_suffix() {
let input = r#"# HELP foo_total A gauge.
# TYPE foo_total gauge
foo_total 42
# HELP foo A counter.
# TYPE foo counter
foo_total 1
# EOF
"#;
let expected = r#"# HELP foo A counter.
# TYPE foo counter
foo_total 1
# HELP foo_total A gauge.
# TYPE foo_total gauge
foo_total 42
"#;
assert_eq!(encode_dedup(input), expected);
}
// A gauge family literally named `foo_created` keeps its own sample instead of
// having it attributed to the counter family `foo` via the `_created` suffix.
#[test]
fn test_metric_encoder_literal_suffix_family_not_hijacked() {
let input = r#"# HELP foo A counter.
# TYPE foo counter
foo_total 1
# HELP foo_created A gauge.
# TYPE foo_created gauge
foo_created 42
# EOF
"#;
let expected = r#"# HELP foo A counter.
# TYPE foo counter
foo_total 1
# HELP foo_created A gauge.
# TYPE foo_created gauge
foo_created 42
"#;
assert_eq!(encode_dedup(input), expected);
}
// Counter `foo` appears in two segments with the gauge `foo_total` between
// them; both counter samples end up under `foo` and the gauge sample stays
// under `foo_total`.
#[test]
fn test_metric_encoder_type_aware_suffix_interleaved_segments() {
let input = r#"# HELP foo Counter.
# TYPE foo counter
foo_total{scope="a"} 1
# HELP foo_total Gauge.
# TYPE foo_total gauge
foo_total 42
# HELP foo Counter.
# TYPE foo counter
foo_total{scope="b"} 2
# EOF
"#;
let expected = r#"# HELP foo Counter.
# TYPE foo counter
foo_total{scope="a"} 1
foo_total{scope="b"} 2
# HELP foo_total Gauge.
# TYPE foo_total gauge
foo_total 42
"#;
assert_eq!(encode_dedup(input), expected);
}
// A `# UNIT` line is kept with its family, after HELP/TYPE and before the
// samples; the histogram's `_sum` and `_count` samples stay with `latency`.
#[test]
fn test_metric_encoder_unit_metadata_is_grouped() {
let input = r#"# HELP latency Latency histogram.
# TYPE latency histogram
# UNIT latency seconds
latency_sum 1.2
latency_count 3
# HELP requests Requests.
# TYPE requests counter
requests_total 9
# EOF
"#;
let expected = r#"# HELP latency Latency histogram.
# TYPE latency histogram
# UNIT latency seconds
latency_sum 1.2
latency_count 3
# HELP requests Requests.
# TYPE requests counter
requests_total 9
"#;
assert_eq!(encode_dedup(input), expected);
}
// A `# UNIT` line repeated in a second segment of the same family is emitted
// only once, like HELP and TYPE.
#[test]
fn test_metric_encoder_unit_metadata_deduped_across_segments() {
let input = r#"# HELP req Requests.
# TYPE req counter
# UNIT req requests
req_total{scope="a"} 1
# HELP req Requests.
# TYPE req counter
# UNIT req requests
req_total{scope="b"} 2
# EOF
"#;
let expected = r#"# HELP req Requests.
# TYPE req counter
# UNIT req requests
req_total{scope="a"} 1
req_total{scope="b"} 2
"#;
assert_eq!(encode_dedup(input), expected);
}
// With no active family (each `# EOF` clears it), the sample `foo_total` is
// attributed to the counter family `foo` through the `_total` suffix, even
// though a family literally named `foo_total` also exists.
#[test]
fn test_metric_encoder_fallback_uses_typed_suffix_even_if_literal_exists() {
let input = r#"# HELP foo_total Counter with literal suffix.
# TYPE foo_total counter
foo_total_total 9
# EOF
# HELP foo Base counter.
# TYPE foo counter
# EOF
foo_total{scope="x"} 1
# EOF
"#;
let expected = r#"# HELP foo Base counter.
# TYPE foo counter
foo_total{scope="x"} 1
# HELP foo_total Counter with literal suffix.
# TYPE foo_total counter
foo_total_total 9
"#;
assert_eq!(encode_dedup(input), expected);
}
// `# EOF` lines in the middle of the input (as well as the final one) are
// removed from the output.
#[test]
fn test_metric_encoder_strips_intermediate_eof() {
let input = r#"# HELP a_total Root.
# TYPE a_total counter
a_total 1
# EOF
# HELP b_total Scoped.
# TYPE b_total counter
b_total 2
# EOF
"#;
let expected = r#"# HELP a_total Root.
# TYPE a_total counter
a_total 1
# HELP b_total Scoped.
# TYPE b_total counter
b_total 2
"#;
assert_eq!(encode_dedup(input), expected);
}
// A thread calling `wait` does not finish until the blocker is woken.
#[test]
fn test_blocker_waits_until_wake() {
let blocker = Blocker::new();
let started = Arc::new(AtomicBool::new(false));
let completed = Arc::new(AtomicBool::new(false));
let thread_blocker = blocker.clone();
let thread_started = started.clone();
let thread_completed = completed.clone();
let handle = std::thread::spawn(move || {
thread_started.store(true, Ordering::SeqCst);
thread_blocker.wait();
thread_completed.store(true, Ordering::SeqCst);
});
// Spin until the worker has started running.
while !started.load(Ordering::SeqCst) {
std::thread::yield_now();
}
// No wake has been sent yet, so the worker cannot have returned from `wait`.
assert!(!completed.load(Ordering::SeqCst));
waker(blocker).wake();
handle.join().unwrap();
assert!(completed.load(Ordering::SeqCst));
}
// A wake delivered before `wait` is called is remembered: the later `wait`
// returns instead of blocking forever.
#[test]
fn test_blocker_handles_pre_wake() {
let blocker = Blocker::new();
waker(blocker.clone()).wake();
let completed = Arc::new(AtomicBool::new(false));
let thread_blocker = blocker;
let thread_completed = completed.clone();
std::thread::spawn(move || {
thread_blocker.wait();
thread_completed.store(true, Ordering::SeqCst);
})
.join()
.unwrap();
assert!(completed.load(Ordering::SeqCst));
}
// The same blocker can be waited on repeatedly: each wake releases one `wait`.
#[test]
fn test_blocker_reusable_across_signals() {
let blocker = Blocker::new();
let completed = Arc::new(AtomicUsize::new(0));
let thread_blocker = blocker.clone();
let thread_completed = completed.clone();
let handle = std::thread::spawn(move || {
for _ in 0..2 {
thread_blocker.wait();
thread_completed.fetch_add(1, Ordering::SeqCst);
}
});
for expected in 1..=2 {
waker(blocker.clone()).wake();
// Wait for the worker to consume this wake before sending the next one.
while completed.load(Ordering::SeqCst) < expected {
std::thread::yield_now();
}
}
handle.join().unwrap();
assert_eq!(completed.load(Ordering::SeqCst), 2);
}
// `count_running_tasks` follows tasks as they are spawned and aborted, and
// matches task names by prefix.
#[test_traced]
fn test_count_running_tasks() {
use crate::{Metrics, Runner, Spawner};
use futures::future;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
assert_eq!(
count_running_tasks(&context, "worker"),
0,
"no worker tasks initially"
);
let worker_ctx = context.with_label("worker");
// The task never completes on its own, so it stays running until aborted.
let handle1 = worker_ctx.clone().spawn(|_| async move {
future::pending::<()>().await;
});
let count = count_running_tasks(&context, "worker");
assert_eq!(count, 1, "worker task should be running");
assert_eq!(
count_running_tasks(&context, "other"),
0,
"no tasks with 'other' prefix"
);
let handle2 = worker_ctx.with_label("child").spawn(|_| async move {
future::pending::<()>().await;
});
let count = count_running_tasks(&context, "worker");
assert_eq!(count, 2, "both worker and worker_child should be counted");
handle1.abort();
let _ = handle1.await;
let count = count_running_tasks(&context, "worker");
assert_eq!(count, 1, "only worker_child should remain");
handle2.abort();
let _ = handle2.await;
assert_eq!(
count_running_tasks(&context, "worker"),
0,
"all worker tasks should be stopped"
);
});
}
// Registering a metric called `test` under two different labels (`a` and `b`)
// completes without panicking.
#[test_traced]
fn test_no_duplicate_metrics() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let c1 = Counter::<u64>::default();
context.with_label("a").register("test", "help", c1);
let c2 = Counter::<u64>::default();
context.with_label("b").register("test", "help", c2);
});
}
// Registering a metric called `test` twice under the same label (`a`) panics
// with a message containing "duplicate metric:".
#[test]
#[should_panic(expected = "duplicate metric:")]
fn test_duplicate_metrics_panics() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let c1 = Counter::<u64>::default();
context.with_label("a").register("test", "help", c1);
let c2 = Counter::<u64>::default();
context.with_label("a").register("test", "help", c2);
});
}
}