use crate::types::{CancelKind, Outcome, RegionId, TaskId};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::time::Duration;
#[derive(Debug)]
pub struct Counter {
name: String,
value: AtomicU64,
}
impl Counter {
pub(crate) fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
value: AtomicU64::new(0),
}
}
pub fn increment(&self) {
self.add(1);
}
pub fn add(&self, value: u64) {
self.value.fetch_add(value, Ordering::Relaxed);
}
pub fn get(&self) -> u64 {
self.value.load(Ordering::Relaxed)
}
pub fn name(&self) -> &str {
&self.name
}
}
#[derive(Debug)]
pub struct Gauge {
name: String,
value: AtomicI64,
}
impl Gauge {
pub(crate) fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
value: AtomicI64::new(0),
}
}
pub fn set(&self, value: i64) {
self.value.store(value, Ordering::Relaxed);
}
pub fn increment(&self) {
self.add(1);
}
pub fn decrement(&self) {
self.sub(1);
}
pub fn add(&self, value: i64) {
self.value.fetch_add(value, Ordering::Relaxed);
}
pub fn sub(&self, value: i64) {
self.value.fetch_sub(value, Ordering::Relaxed);
}
pub fn get(&self) -> i64 {
self.value.load(Ordering::Relaxed)
}
pub fn name(&self) -> &str {
&self.name
}
}
#[derive(Debug)]
pub struct Histogram {
name: String,
buckets: Vec<f64>,
counts: Vec<AtomicU64>,
sum: AtomicU64, count: AtomicU64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct HistogramSnapshot {
pub name: String,
pub bucket_boundaries: Vec<f64>,
pub bucket_counts: Vec<u64>,
pub count: u64,
pub sum: f64,
}
impl Histogram {
pub(crate) fn new(name: impl Into<String>, buckets: Vec<f64>) -> Self {
let mut buckets = buckets;
buckets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let len = buckets.len();
let mut counts = Vec::with_capacity(len + 1);
for _ in 0..=len {
counts.push(AtomicU64::new(0));
}
Self {
name: name.into(),
buckets,
counts,
sum: AtomicU64::new(0),
count: AtomicU64::new(0),
}
}
pub fn observe(&self, value: f64) {
let idx = self
.buckets
.iter()
.position(|&b| value <= b)
.unwrap_or(self.buckets.len());
self.counts[idx].fetch_add(1, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
let mut current = self.sum.load(Ordering::Relaxed);
loop {
let current_f64 = f64::from_bits(current);
let new_f64 = current_f64 + value;
let new_bits = if new_f64.is_finite() {
new_f64.to_bits()
} else {
current };
match self.sum.compare_exchange_weak(
current,
new_bits,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(v) => current = v,
}
}
}
pub fn count(&self) -> u64 {
self.count.load(Ordering::Relaxed)
}
pub fn sum(&self) -> f64 {
f64::from_bits(self.sum.load(Ordering::Relaxed))
}
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn snapshot(&self) -> HistogramSnapshot {
HistogramSnapshot {
name: self.name.clone(),
bucket_boundaries: self.buckets.clone(),
bucket_counts: self
.counts
.iter()
.map(|atomic| atomic.load(Ordering::Relaxed))
.collect(),
count: self.count(),
sum: self.sum(),
}
}
#[cfg(all(test, feature = "metrics"))]
pub(crate) fn bucket_counts(&self) -> Vec<u64> {
self.counts
.iter()
.map(|atomic| atomic.load(Ordering::Relaxed))
.collect()
}
#[cfg(all(test, feature = "metrics"))]
pub(crate) fn reset(&self) {
for count in &self.counts {
count.store(0, Ordering::Relaxed);
}
self.count.store(0, Ordering::Relaxed);
self.sum.store(0.0f64.to_bits(), Ordering::Relaxed);
}
#[cfg(all(test, feature = "metrics"))]
pub(crate) fn mean(&self) -> f64 {
let total_count = self.count();
if total_count == 0 {
0.0
} else {
self.sum() / (total_count as f64)
}
}
#[cfg(all(test, feature = "metrics"))]
pub(crate) fn bucket_boundaries(&self) -> &[f64] {
&self.buckets
}
#[cfg(test)]
pub(crate) fn percentile(&self, p: f64) -> Option<f64> {
if !(0.0..=1.0).contains(&p) || self.count() == 0 {
return None;
}
let total = self.count();
let target_rank = if p == 0.0 {
1
} else {
let rank_f64 = (total as f64) * p;
if rank_f64.is_finite() && rank_f64 <= (u64::MAX as f64) {
rank_f64.ceil() as u64
} else {
total }
};
let mut cumulative = 0_u64;
for (i, count) in self
.counts
.iter()
.enumerate()
.map(|(i, count)| (i, count.load(Ordering::Relaxed)))
{
cumulative += count;
if cumulative >= target_rank {
if i == self.buckets.len() {
return None;
}
return Some(self.buckets[i]);
}
}
None
}
}
#[derive(Debug)]
pub struct Summary {
name: String,
values: Mutex<Vec<f64>>,
sum: AtomicU64, count: AtomicU64,
}
impl Summary {
pub(crate) fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
values: Mutex::new(Vec::new()),
sum: AtomicU64::new(0.0f64.to_bits()),
count: AtomicU64::new(0),
}
}
pub fn observe(&self, value: f64) {
self.values
.lock()
.expect("summary values mutex poisoned")
.push(value);
self.count.fetch_add(1, Ordering::Relaxed);
let mut current = self.sum.load(Ordering::Relaxed);
loop {
let current_f64 = f64::from_bits(current);
let new_f64 = current_f64 + value;
let new_bits = if new_f64.is_finite() {
new_f64.to_bits()
} else {
current };
match self.sum.compare_exchange_weak(
current,
new_bits,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(v) => current = v,
}
}
}
pub fn count(&self) -> u64 {
self.count.load(Ordering::Relaxed)
}
pub fn sum(&self) -> f64 {
f64::from_bits(self.sum.load(Ordering::Relaxed))
}
pub fn name(&self) -> &str {
&self.name
}
pub fn quantile(&self, q: f64) -> Option<f64> {
if !(0.0..=1.0).contains(&q) {
return None;
}
let mut values = self
.values
.lock()
.expect("summary values mutex poisoned")
.clone();
if values.is_empty() {
return None;
}
values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let last_index = values.len() - 1;
let rank_f64 = (last_index as f64) * q;
let rank = if rank_f64.is_finite() && rank_f64 <= (usize::MAX as f64) {
rank_f64.round() as usize
} else {
last_index };
values.get(rank).copied()
}
}
pub const DEFAULT_METRIC_CARDINALITY_CAP: usize = 10_000;
const OVERFLOW_METRIC_NAME: &str = "asupersync_metric_cardinality_overflow";
#[derive(Debug)]
pub struct Metrics {
counters: BTreeMap<String, Arc<Counter>>,
gauges: BTreeMap<String, Arc<Gauge>>,
histograms: BTreeMap<String, Arc<Histogram>>,
summaries: BTreeMap<String, Arc<Summary>>,
cardinality_cap: usize,
overflow_warned_counter: AtomicBool,
overflow_warned_gauge: AtomicBool,
overflow_warned_histogram: AtomicBool,
overflow_warned_summary: AtomicBool,
overflow_rejections_counter: AtomicU64,
overflow_rejections_gauge: AtomicU64,
overflow_rejections_histogram: AtomicU64,
overflow_rejections_summary: AtomicU64,
}
impl Default for Metrics {
fn default() -> Self {
Self::with_cardinality_cap(DEFAULT_METRIC_CARDINALITY_CAP)
}
}
impl Metrics {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_cardinality_cap(cap: usize) -> Self {
Self {
counters: BTreeMap::new(),
gauges: BTreeMap::new(),
histograms: BTreeMap::new(),
summaries: BTreeMap::new(),
cardinality_cap: cap,
overflow_warned_counter: AtomicBool::new(false),
overflow_warned_gauge: AtomicBool::new(false),
overflow_warned_histogram: AtomicBool::new(false),
overflow_warned_summary: AtomicBool::new(false),
overflow_rejections_counter: AtomicU64::new(0),
overflow_rejections_gauge: AtomicU64::new(0),
overflow_rejections_histogram: AtomicU64::new(0),
overflow_rejections_summary: AtomicU64::new(0),
}
}
#[must_use]
pub fn cardinality_cap(&self) -> usize {
self.cardinality_cap
}
#[must_use]
pub fn overflow_rejections(&self) -> (u64, u64, u64, u64) {
(
self.overflow_rejections_counter.load(Ordering::Relaxed),
self.overflow_rejections_gauge.load(Ordering::Relaxed),
self.overflow_rejections_histogram.load(Ordering::Relaxed),
self.overflow_rejections_summary.load(Ordering::Relaxed),
)
}
#[inline]
fn cap_would_reject(
cardinality_cap: usize,
map_len: usize,
name: &str,
contains: bool,
) -> bool {
if cardinality_cap == 0 {
return false; }
if contains {
return false; }
if name == OVERFLOW_METRIC_NAME {
return false;
}
map_len >= cardinality_cap
}
pub fn counter(&mut self, name: &str) -> Arc<Counter> {
let contains = self.counters.contains_key(name);
if Self::cap_would_reject(self.cardinality_cap, self.counters.len(), name, contains) {
self.overflow_rejections_counter
.fetch_add(1, Ordering::Relaxed);
if !self.overflow_warned_counter.swap(true, Ordering::Relaxed) {
crate::tracing_compat::warn!(
"metrics: counter cardinality cap ({}) reached; \
subsequent fresh names route to '{}' bucket. \
Inspect Metrics::overflow_rejections() to monitor pressure.",
self.cardinality_cap,
OVERFLOW_METRIC_NAME
);
}
return self
.counters
.entry(OVERFLOW_METRIC_NAME.to_string())
.or_insert_with(|| Arc::new(Counter::new(OVERFLOW_METRIC_NAME)))
.clone();
}
self.counters
.entry(name.to_string())
.or_insert_with(|| Arc::new(Counter::new(name)))
.clone()
}
pub fn gauge(&mut self, name: &str) -> Arc<Gauge> {
let contains = self.gauges.contains_key(name);
if Self::cap_would_reject(self.cardinality_cap, self.gauges.len(), name, contains) {
self.overflow_rejections_gauge
.fetch_add(1, Ordering::Relaxed);
if !self.overflow_warned_gauge.swap(true, Ordering::Relaxed) {
crate::tracing_compat::warn!(
"metrics: gauge cardinality cap ({}) reached; \
subsequent fresh names route to '{}' bucket.",
self.cardinality_cap,
OVERFLOW_METRIC_NAME
);
}
return self
.gauges
.entry(OVERFLOW_METRIC_NAME.to_string())
.or_insert_with(|| Arc::new(Gauge::new(OVERFLOW_METRIC_NAME)))
.clone();
}
self.gauges
.entry(name.to_string())
.or_insert_with(|| Arc::new(Gauge::new(name)))
.clone()
}
pub fn histogram(&mut self, name: &str, buckets: Vec<f64>) -> Arc<Histogram> {
let contains = self.histograms.contains_key(name);
if Self::cap_would_reject(self.cardinality_cap, self.histograms.len(), name, contains) {
self.overflow_rejections_histogram
.fetch_add(1, Ordering::Relaxed);
if !self.overflow_warned_histogram.swap(true, Ordering::Relaxed) {
crate::tracing_compat::warn!(
"metrics: histogram cardinality cap ({}) reached; \
subsequent fresh names route to '{}' bucket. \
Overflow histogram uses the FIRST seen bucket layout.",
self.cardinality_cap,
OVERFLOW_METRIC_NAME
);
}
return self
.histograms
.entry(OVERFLOW_METRIC_NAME.to_string())
.or_insert_with(|| Arc::new(Histogram::new(OVERFLOW_METRIC_NAME, buckets)))
.clone();
}
self.histograms
.entry(name.to_string())
.or_insert_with(|| Arc::new(Histogram::new(name, buckets)))
.clone()
}
pub fn summary(&mut self, name: &str) -> Arc<Summary> {
let contains = self.summaries.contains_key(name);
if Self::cap_would_reject(self.cardinality_cap, self.summaries.len(), name, contains) {
self.overflow_rejections_summary
.fetch_add(1, Ordering::Relaxed);
if !self.overflow_warned_summary.swap(true, Ordering::Relaxed) {
crate::tracing_compat::warn!(
"metrics: summary cardinality cap ({}) reached; \
subsequent fresh names route to '{}' bucket.",
self.cardinality_cap,
OVERFLOW_METRIC_NAME
);
}
return self
.summaries
.entry(OVERFLOW_METRIC_NAME.to_string())
.or_insert_with(|| Arc::new(Summary::new(OVERFLOW_METRIC_NAME)))
.clone();
}
self.summaries
.entry(name.to_string())
.or_insert_with(|| Arc::new(Summary::new(name)))
.clone()
}
#[must_use]
pub fn export_prometheus(&self) -> String {
use std::fmt::Write;
let mut output = String::new();
for (name, counter) in &self.counters {
let Some(name) = sanitize_prometheus_metric_name(name) else {
continue;
};
let _ = writeln!(output, "# TYPE {name} counter");
let _ = writeln!(output, "{name} {}", counter.get());
}
for (name, gauge) in &self.gauges {
let Some(name) = sanitize_prometheus_metric_name(name) else {
continue;
};
let _ = writeln!(output, "# TYPE {name} gauge");
let _ = writeln!(output, "{name} {}", gauge.get());
}
for (name, hist) in &self.histograms {
let Some(name) = sanitize_prometheus_metric_name(name) else {
continue;
};
let _ = writeln!(output, "# TYPE {name} histogram");
let mut cumulative = 0;
for (i, count) in hist.counts.iter().enumerate() {
let val = count.load(Ordering::Relaxed);
cumulative += val;
let le = if i < hist.buckets.len() {
hist.buckets[i].to_string()
} else {
"+Inf".to_string()
};
let le = escape_prometheus_label_value(&le);
let _ = writeln!(output, "{name}_bucket{{le=\"{le}\"}} {cumulative}");
}
let _ = writeln!(output, "{name}_sum {}", hist.sum());
let _ = writeln!(output, "{name}_count {}", hist.count());
}
for (name, summary) in &self.summaries {
let Some(name) = sanitize_prometheus_metric_name(name) else {
continue;
};
let _ = writeln!(output, "# TYPE {name} summary");
for quantile in [0.5, 0.9, 0.99] {
if let Some(value) = summary.quantile(quantile) {
let q = escape_prometheus_label_value(&quantile.to_string());
let _ = writeln!(output, "{name}{{quantile=\"{q}\"}} {value}");
}
}
let _ = writeln!(output, "{name}_sum {}", summary.sum());
let _ = writeln!(output, "{name}_count {}", summary.count());
}
output
}
}
fn sanitize_prometheus_metric_name(name: &str) -> Option<String> {
if name.is_empty() {
return None;
}
let mut out = String::with_capacity(name.len());
for (i, b) in name.bytes().enumerate() {
let allowed = if i == 0 {
b.is_ascii_alphabetic() || b == b'_' || b == b':'
} else {
b.is_ascii_alphanumeric() || b == b'_' || b == b':'
};
out.push(if allowed { b as char } else { '_' });
}
Some(out)
}
#[allow(dead_code)] fn sanitize_prometheus_label_name(name: &str) -> Option<String> {
if name.is_empty() {
return None;
}
let mut out = String::with_capacity(name.len());
for (i, b) in name.bytes().enumerate() {
let allowed = if i == 0 {
b.is_ascii_alphabetic() || b == b'_'
} else {
b.is_ascii_alphanumeric() || b == b'_'
};
out.push(if allowed { b as char } else { '_' });
}
Some(out)
}
fn escape_prometheus_label_value(value: &str) -> String {
let mut out = String::with_capacity(value.len());
for c in value.chars() {
match c {
'\\' => out.push_str(r"\\"),
'\n' => out.push_str(r"\n"),
'"' => {
out.push('\\');
out.push('"');
}
'\r' => out.push_str(r"\r"),
'\t' => out.push_str(r"\t"),
'\u{0000}' => out.push_str(r"\x00"),
'\u{2028}' => out.push_str(r"\u{2028}"),
'\u{2029}' => out.push_str(r"\u{2029}"),
c if (c as u32) < 0x20 || c == '\u{007F}' => {
use std::fmt::Write;
let _ = write!(&mut out, "\\x{:02x}", c as u32);
}
c if (0x80..=0x9F).contains(&(c as u32)) => {
use std::fmt::Write;
let _ = write!(&mut out, "\\x{:02x}", c as u32);
}
_ => out.push(c),
}
}
out
}
#[derive(Debug, Clone, Copy)]
pub enum MetricValue {
Counter(u64),
Gauge(i64),
Histogram(u64, f64),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OutcomeKind {
Ok,
Err,
Cancelled,
Panicked,
}
impl<T, E> From<&Outcome<T, E>> for OutcomeKind {
fn from(outcome: &Outcome<T, E>) -> Self {
match outcome {
Outcome::Ok(_) => Self::Ok,
Outcome::Err(_) => Self::Err,
Outcome::Cancelled(_) => Self::Cancelled,
Outcome::Panicked(_) => Self::Panicked,
}
}
}
pub trait MetricsProvider: Send + Sync + 'static {
fn task_spawned(&self, region_id: RegionId, task_id: TaskId);
fn task_completed(&self, task_id: TaskId, outcome: OutcomeKind, duration: Duration);
fn region_created(&self, region_id: RegionId, parent: Option<RegionId>);
fn region_closed(&self, region_id: RegionId, lifetime: Duration);
fn cancellation_requested(&self, region_id: RegionId, kind: CancelKind);
fn drain_completed(&self, region_id: RegionId, duration: Duration);
fn deadline_set(&self, region_id: RegionId, deadline: Duration);
fn deadline_exceeded(&self, region_id: RegionId);
fn deadline_warning(&self, task_type: &str, reason: &'static str, remaining: Duration);
fn deadline_violation(&self, task_type: &str, over_by: Duration);
fn deadline_remaining(&self, task_type: &str, remaining: Duration);
fn checkpoint_interval(&self, task_type: &str, interval: Duration);
fn task_stuck_detected(&self, task_type: &str);
fn obligation_created(&self, region_id: RegionId);
fn obligation_discharged(&self, region_id: RegionId);
fn obligation_leaked(&self, region_id: RegionId);
fn scheduler_tick(&self, tasks_polled: usize, duration: Duration);
fn record_panic(&self, _location: &'static str) {}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoOpMetrics;
impl MetricsProvider for NoOpMetrics {
fn task_spawned(&self, _: RegionId, _: TaskId) {}
fn task_completed(&self, _: TaskId, _: OutcomeKind, _: Duration) {}
fn region_created(&self, _: RegionId, _: Option<RegionId>) {}
fn region_closed(&self, _: RegionId, _: Duration) {}
fn cancellation_requested(&self, _: RegionId, _: CancelKind) {}
fn drain_completed(&self, _: RegionId, _: Duration) {}
fn deadline_set(&self, _: RegionId, _: Duration) {}
fn deadline_exceeded(&self, _: RegionId) {}
fn deadline_warning(&self, _: &str, _: &'static str, _: Duration) {}
fn deadline_violation(&self, _: &str, _: Duration) {}
fn deadline_remaining(&self, _: &str, _: Duration) {}
fn checkpoint_interval(&self, _: &str, _: Duration) {}
fn task_stuck_detected(&self, _: &str) {}
fn obligation_created(&self, _: RegionId) {}
fn obligation_discharged(&self, _: RegionId) {}
fn obligation_leaked(&self, _: RegionId) {}
fn scheduler_tick(&self, _: usize, _: Duration) {}
}
#[cfg(test)]
#[allow(dead_code)]
mod tests {
use super::*;
#[test]
fn test_counter_increment() {
let counter = Counter::new("test");
counter.increment();
assert_eq!(counter.get(), 1);
counter.add(5);
assert_eq!(counter.get(), 6);
}
#[test]
fn test_gauge_set() {
let gauge = Gauge::new("test");
gauge.set(42);
assert_eq!(gauge.get(), 42);
gauge.increment();
assert_eq!(gauge.get(), 43);
gauge.decrement();
assert_eq!(gauge.get(), 42);
}
#[test]
#[allow(clippy::float_cmp)]
fn test_histogram_observe() {
let hist = Histogram::new("test", vec![1.0, 2.0, 5.0]);
hist.observe(0.5); hist.observe(1.5); hist.observe(10.0);
assert_eq!(hist.count(), 3);
assert_eq!(hist.sum(), 12.0);
}
#[test]
fn test_registry_register() {
let mut metrics = Metrics::new();
let c1 = metrics.counter("c1");
c1.increment();
let c2 = metrics.counter("c1"); assert_eq!(c2.get(), 1);
}
#[test]
fn test_registry_export() {
let mut metrics = Metrics::new();
metrics.counter("requests").add(10);
metrics.gauge("memory").set(1024);
let output = metrics.export_prometheus();
assert!(output.contains("requests 10"));
assert!(output.contains("memory 1024"));
}
#[test]
fn cardinality_cap_routes_fresh_names_to_overflow_bucket() {
let mut metrics = Metrics::with_cardinality_cap(3);
metrics.counter("a").increment();
metrics.counter("b").increment();
metrics.counter("c").increment();
assert_eq!(metrics.counters.len(), 3);
assert_eq!(metrics.overflow_rejections().0, 0);
let a_again = metrics.counter("a");
a_again.add(5);
assert_eq!(metrics.counters.len(), 3);
assert_eq!(metrics.overflow_rejections().0, 0);
let overflow = metrics.counter("d");
overflow.add(7);
assert_eq!(
metrics.overflow_rejections().0,
1,
"first over-cap name should bump rejection counter"
);
metrics.counter("e").add(3);
metrics.counter("f").add(11);
assert_eq!(
metrics.overflow_rejections().0,
3,
"every fresh-name-over-cap should bump rejection counter"
);
assert_eq!(metrics.counters.len(), 4);
assert_eq!(overflow.get(), 21);
}
#[test]
fn cardinality_cap_is_per_kind_not_shared() {
let mut metrics = Metrics::with_cardinality_cap(1);
metrics.counter("c1");
metrics.counter("c2"); assert_eq!(metrics.overflow_rejections().0, 1);
metrics.gauge("g1");
assert_eq!(metrics.overflow_rejections().1, 0);
metrics.gauge("g2"); assert_eq!(metrics.overflow_rejections().1, 1);
metrics.histogram("h1", vec![1.0, 2.0]);
assert_eq!(metrics.overflow_rejections().2, 0);
metrics.summary("s1");
assert_eq!(metrics.overflow_rejections().3, 0);
}
#[test]
fn cardinality_cap_zero_disables_limit() {
let mut metrics = Metrics::with_cardinality_cap(0);
for i in 0..50 {
metrics.counter(&format!("c{i}"));
}
assert_eq!(metrics.counters.len(), 50);
assert_eq!(metrics.overflow_rejections().0, 0);
}
#[test]
fn cardinality_cap_warn_latches_per_kind() {
let mut metrics = Metrics::with_cardinality_cap(1);
metrics.counter("c1");
metrics.counter("c2");
assert!(metrics.overflow_warned_counter.load(Ordering::Relaxed));
for i in 0..100 {
metrics.counter(&format!("c_extra_{i}"));
}
assert!(metrics.overflow_warned_counter.load(Ordering::Relaxed));
assert!(!metrics.overflow_warned_gauge.load(Ordering::Relaxed));
}
#[test]
fn test_metrics_provider_object_safe() {
fn assert_object_safe(_: &dyn MetricsProvider) {}
let provider = NoOpMetrics;
assert_object_safe(&provider);
let boxed: Box<dyn MetricsProvider> = Box::new(NoOpMetrics);
boxed.task_spawned(RegionId::testing_default(), TaskId::testing_default());
}
#[test]
fn counter_name() {
let c = Counter::new("requests_total");
assert_eq!(c.name(), "requests_total");
assert_eq!(c.get(), 0);
}
#[test]
fn counter_debug() {
let c = Counter::new("ctr");
c.add(42);
let dbg = format!("{c:?}");
assert!(dbg.contains("ctr"));
}
#[test]
fn gauge_sub() {
let g = Gauge::new("g");
g.set(10);
g.sub(3);
assert_eq!(g.get(), 7);
}
#[test]
fn gauge_name_debug() {
let g = Gauge::new("active_conns");
assert_eq!(g.name(), "active_conns");
let dbg = format!("{g:?}");
assert!(dbg.contains("active_conns"));
}
#[test]
fn gauge_negative_values() {
let g = Gauge::new("g");
g.set(-5);
assert_eq!(g.get(), -5);
g.increment();
assert_eq!(g.get(), -4);
}
#[test]
fn histogram_name_debug() {
let h = Histogram::new("latency", vec![0.1, 0.5, 1.0]);
assert_eq!(h.name(), "latency");
let dbg = format!("{h:?}");
assert!(dbg.contains("latency"));
}
#[test]
#[allow(clippy::float_cmp)]
fn summary_observe_and_quantiles() {
let summary = Summary::new("request_size_bytes");
summary.observe(10.0);
summary.observe(20.0);
summary.observe(40.0);
summary.observe(80.0);
summary.observe(160.0);
assert_eq!(summary.name(), "request_size_bytes");
assert_eq!(summary.count(), 5);
assert_eq!(summary.sum(), 310.0);
assert_eq!(summary.quantile(0.5), Some(40.0));
assert_eq!(summary.quantile(0.9), Some(160.0));
assert_eq!(summary.quantile(0.99), Some(160.0));
}
#[test]
#[allow(clippy::float_cmp)]
fn histogram_empty() {
let h = Histogram::new("h", vec![1.0, 5.0]);
assert_eq!(h.count(), 0);
assert_eq!(h.sum(), 0.0);
}
#[test]
fn histogram_bucket_sorting() {
let h = Histogram::new("h", vec![5.0, 1.0, 10.0]);
h.observe(0.5); h.observe(3.0); h.observe(100.0); assert_eq!(h.count(), 3);
}
#[test]
#[allow(clippy::float_cmp)]
fn histogram_snapshot_exposes_live_bucket_state() {
let h = Histogram::new("request_latency", vec![5.0, 1.0, 10.0]);
h.observe(0.5);
h.observe(3.0);
h.observe(10.0);
h.observe(100.0);
let snapshot = h.snapshot();
assert_eq!(snapshot.name, "request_latency");
assert_eq!(snapshot.bucket_boundaries, vec![1.0, 5.0, 10.0]);
assert_eq!(snapshot.bucket_counts, vec![1, 1, 1, 1]);
assert_eq!(snapshot.count, 4);
assert_eq!(snapshot.sum, 113.5);
}
#[test]
fn histogram_percentile_skips_empty_leading_buckets() {
let h = Histogram::new("h", vec![1.0, 5.0, 10.0]);
h.observe(6.0);
assert_eq!(h.percentile(0.0), Some(10.0));
assert_eq!(h.percentile(0.5), Some(10.0));
}
#[cfg(feature = "metrics")]
#[test]
#[allow(clippy::float_cmp)]
fn histogram_metrics_feature_test_helpers_round_trip() {
let h = Histogram::new("h", vec![5.0, 1.0, 10.0]);
assert_eq!(h.bucket_boundaries(), &[1.0, 5.0, 10.0]);
assert_eq!(h.bucket_counts(), vec![0, 0, 0, 0]);
assert_eq!(h.mean(), 0.0);
h.observe(0.5);
h.observe(4.5);
h.observe(20.0);
assert_eq!(h.bucket_counts(), vec![1, 1, 0, 1]);
assert_eq!(h.mean(), 25.0 / 3.0);
h.reset();
assert_eq!(h.count(), 0);
assert_eq!(h.sum(), 0.0);
assert_eq!(h.bucket_counts(), vec![0, 0, 0, 0]);
assert_eq!(h.mean(), 0.0);
}
#[test]
fn metric_value_debug_copy() {
let c = MetricValue::Counter(42);
let g = MetricValue::Gauge(-7);
let h = MetricValue::Histogram(10, 2.75);
let dbg_c = format!("{c:?}");
assert!(dbg_c.contains("Counter"));
assert!(dbg_c.contains("42"));
let dbg_g = format!("{g:?}");
assert!(dbg_g.contains("Gauge"));
let dbg_h = format!("{h:?}");
assert!(dbg_h.contains("Histogram"));
let c2 = c;
let _ = c; let _ = c2;
}
#[test]
fn metric_value_clone() {
let v = MetricValue::Counter(99);
let v2 = v;
let _ = v; let _ = v2;
}
#[test]
fn outcome_kind_debug_copy_eq_hash() {
use std::collections::HashSet;
let ok = OutcomeKind::Ok;
let err = OutcomeKind::Err;
let canc = OutcomeKind::Cancelled;
let pan = OutcomeKind::Panicked;
assert_ne!(ok, err);
assert_ne!(canc, pan);
assert_eq!(ok, OutcomeKind::Ok);
let dbg = format!("{ok:?}");
assert!(dbg.contains("Ok"));
let ok2 = ok;
assert_eq!(ok, ok2);
let mut set = HashSet::new();
set.insert(ok);
set.insert(err);
set.insert(canc);
set.insert(pan);
assert_eq!(set.len(), 4);
}
#[test]
fn noop_metrics_debug_default_copy() {
let m = NoOpMetrics;
let dbg = format!("{m:?}");
assert!(dbg.contains("NoOpMetrics"));
let m2 = NoOpMetrics;
let _ = m2;
let m3 = m;
let _ = m;
let _ = m3;
let m4 = m;
let _ = m4;
}
#[test]
fn metrics_default_empty() {
let m = Metrics::default();
let export = m.export_prometheus();
assert!(export.is_empty());
}
#[test]
fn metrics_same_name_returns_same_counter() {
let mut m = Metrics::new();
let c1 = m.counter("x");
c1.add(5);
let c2 = m.counter("x");
assert_eq!(c2.get(), 5); }
#[test]
fn metrics_same_name_returns_same_gauge() {
let mut m = Metrics::new();
let g1 = m.gauge("y");
g1.set(42);
let g2 = m.gauge("y");
assert_eq!(g2.get(), 42);
}
#[test]
fn metrics_export_histogram() {
let mut m = Metrics::new();
let h = m.histogram("latency", vec![1.0, 5.0]);
h.observe(0.5);
h.observe(3.0);
let output = m.export_prometheus();
assert!(output.contains("latency_bucket"));
assert!(output.contains("latency_sum"));
assert!(output.contains("latency_count 2"));
}
#[test]
fn metrics_export_prometheus_snapshot() {
let mut metrics = Metrics::new();
metrics.counter("requests_total").add(7);
metrics.gauge("active_connections").set(3);
let histogram = metrics.histogram("latency_seconds", vec![0.5, 1.0, 5.0]);
histogram.observe(0.25);
histogram.observe(0.75);
histogram.observe(3.5);
insta::assert_snapshot!(
"metrics_export_prometheus_mixed_registry",
metrics.export_prometheus()
);
}
#[test]
fn metrics_export_prometheus_full_registry_snapshot() {
let mut metrics = Metrics::new();
metrics.counter("requests_total").add(11);
metrics.gauge("memory_usage_bytes").set(4096);
let histogram = metrics.histogram("latency_seconds", vec![0.5, 1.0, 5.0]);
histogram.observe(0.25);
histogram.observe(0.75);
histogram.observe(3.5);
let summary = metrics.summary("request_size_bytes");
for value in [128.0, 256.0, 512.0, 1024.0, 2048.0] {
summary.observe(value);
}
insta::assert_snapshot!(
"metrics_export_prometheus_full_registry",
metrics.export_prometheus()
);
}
fn sorted_metric_blocks_snapshot(rendered: &str) -> String {
let mut blocks = Vec::new();
let mut current = Vec::new();
for line in rendered.lines() {
if line.starts_with("# TYPE ") && !current.is_empty() {
blocks.push(current.join("\n"));
current.clear();
}
current.push(line);
}
if !current.is_empty() {
blocks.push(current.join("\n"));
}
blocks.sort_unstable();
let mut snapshot = blocks.join("\n");
if !snapshot.is_empty() {
snapshot.push('\n');
}
snapshot
}
#[test]
fn metrics_export_prometheus_runtime_scheduler_region_snapshot() {
let mut metrics = Metrics::new();
metrics
.counter("runtime_regions_total{state=\"open\"}")
.add(3);
metrics
.counter("runtime_regions_total{state=\"closed\"}")
.add(1);
metrics
.counter("scheduler_dispatch_total{lane=\"ready\",worker=\"primary\"}")
.add(11);
metrics
.counter("scheduler_dispatch_total{lane=\"cancel\",worker=\"primary\"}")
.add(2);
metrics
.gauge("scheduler_queue_depth{lane=\"ready\"}")
.set(4);
metrics
.gauge("scheduler_queue_depth{lane=\"timed\"}")
.set(1);
metrics
.gauge("region_live_tasks{region=\"root\",phase=\"draining\"}")
.set(2);
metrics
.gauge("region_live_tasks{region=\"worker\",phase=\"steady\"}")
.set(5);
let histogram = metrics.histogram("runtime_poll_latency_seconds", vec![0.001, 0.01, 0.1]);
for value in [0.0005, 0.004, 0.08] {
histogram.observe(value);
}
insta::assert_snapshot!(
"metrics_export_prometheus_runtime_scheduler_region",
sorted_metric_blocks_snapshot(&metrics.export_prometheus())
);
}
#[test]
fn counter_metamorphic_fixed_schedule_never_decreases() {
let counter = Counter::new("metamorphic_counter");
let mut rng = crate::util::DetRng::new(0xC0FF_EE11);
let mut expected_total = 0_u64;
let mut previous = counter.get();
for _ in 0..64 {
let delta = (rng.next_u64() % 7) + 1;
counter.add(delta);
expected_total += delta;
let current = counter.get();
assert!(
current >= previous,
"counter must remain monotonic: previous={previous}, current={current}"
);
assert_eq!(
current, expected_total,
"counter should equal the cumulative sum of applied increments"
);
previous = current;
}
}
#[test]
fn counter_metamorphic_label_sum_matches_total() {
let mut metrics = Metrics::new();
let total = metrics.counter("requests_total");
let ok = metrics.counter("requests_total{outcome=\"ok\"}");
let err = metrics.counter("requests_total{outcome=\"err\"}");
let cancelled = metrics.counter("requests_total{outcome=\"cancelled\"}");
let mut rng = crate::util::DetRng::new(0x51A8_EE01);
for _ in 0..48 {
let delta = (rng.next_u64() % 5) + 1;
match rng.next_u64() % 3 {
0 => ok.add(delta),
1 => err.add(delta),
_ => cancelled.add(delta),
}
total.add(delta);
let labeled_sum = ok.get() + err.get() + cancelled.get();
assert_eq!(
total.get(),
labeled_sum,
"sum across labeled counters should match the total counter"
);
}
}
#[test]
fn counter_metamorphic_concurrent_schedule_matches_sequential() {
let mut rng = crate::util::DetRng::new(0xF17E_D5E5);
let mut workloads = Vec::new();
let mut expected_total = 0_u64;
for _ in 0..4 {
let mut shard = Vec::new();
for _ in 0..16 {
let delta = (rng.next_u64() % 11) + 1;
expected_total += delta;
shard.push(delta);
}
workloads.push(shard);
}
let sequential = Counter::new("sequential_counter");
for shard in &workloads {
for &delta in shard {
sequential.add(delta);
}
}
let concurrent = std::sync::Arc::new(Counter::new("concurrent_counter"));
let mut handles = Vec::new();
for shard in workloads.clone() {
let counter = std::sync::Arc::clone(&concurrent);
handles.push(std::thread::spawn(move || {
for delta in shard {
counter.add(delta);
}
}));
}
for handle in handles {
handle.join().expect("counter worker should not panic");
}
assert_eq!(
sequential.get(),
expected_total,
"sequential replay should match the fixed workload sum"
);
assert_eq!(
concurrent.get(),
expected_total,
"concurrent replay should preserve the same cumulative count semantics"
);
assert_eq!(
concurrent.get(),
sequential.get(),
"concurrent and sequential application of the same schedule should agree"
);
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OtelMetricDescriptor {
pub name: String,
pub description: String,
pub unit: String,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OtelDataPoint {
pub timestamp_nanos: u64,
pub value: OtelValue,
pub attributes: BTreeMap<String, String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum OtelValue {
Counter(u64),
Gauge(f64),
Histogram {
count: u64,
sum: f64,
buckets: Vec<(f64, u64)>, },
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OtelResource {
pub attributes: BTreeMap<String, String>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OtelMetricsRequest {
pub resource: OtelResource,
pub metrics: Vec<OtelMetric>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OtelMetric {
pub descriptor: OtelMetricDescriptor,
pub data_points: Vec<OtelDataPoint>,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum OtelTransportMode {
#[default]
CaptureSuccess,
FailNetwork(String),
FailAuth,
FailRateLimit,
}
#[derive(Debug, Clone)]
pub struct OtelExporterConfig {
pub endpoint: String,
pub api_key: Option<String>,
pub timeout_secs: u64,
pub compression: bool,
pub batch_size: usize,
pub transport_mode: OtelTransportMode,
}
impl Default for OtelExporterConfig {
fn default() -> Self {
Self {
endpoint: "http://localhost:4317/v1/metrics".to_string(),
api_key: None,
timeout_secs: 10,
compression: true,
batch_size: 100,
transport_mode: OtelTransportMode::CaptureSuccess,
}
}
}
#[derive(Debug, Clone)]
pub struct OtelDispatchRecord {
pub endpoint: String,
pub timeout_secs: u64,
pub headers: BTreeMap<String, String>,
pub body: Vec<u8>,
pub serialized_json: String,
}
#[derive(Debug)]
pub struct OtelMetricsExporter {
config: OtelExporterConfig,
resource: OtelResource,
dispatches: Mutex<Vec<OtelDispatchRecord>>,
}
impl OtelMetricsExporter {
pub fn new(config: OtelExporterConfig) -> Self {
let mut resource_attrs = BTreeMap::new();
resource_attrs.insert("service.name".to_string(), "asupersync".to_string());
resource_attrs.insert(
"service.version".to_string(),
env!("CARGO_PKG_VERSION").to_string(),
);
Self {
config,
resource: OtelResource {
attributes: resource_attrs,
},
dispatches: Mutex::new(Vec::new()),
}
}
pub async fn export(&self, metrics: &Metrics) -> Result<(), OtelExportError> {
let request = self.build_request(metrics)?;
self.send_request(&request).await
}
fn serialize_request(request: &OtelMetricsRequest) -> Result<String, OtelExportError> {
serde_json::to_string(request)
.map_err(|err| OtelExportError::InvalidData(err.to_string()))
}
fn build_request(&self, metrics: &Metrics) -> Result<OtelMetricsRequest, OtelExportError> {
let mut otel_metrics = Vec::new();
let timestamp = crate::observability::replayable_system_time()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|_| OtelExportError::TimestampError)?
.as_nanos() as u64;
for (name, counter) in &metrics.counters {
let metric = OtelMetric {
descriptor: OtelMetricDescriptor {
name: name.clone(),
description: format!("Counter: {name}"),
unit: "1".to_string(),
},
data_points: vec![OtelDataPoint {
timestamp_nanos: timestamp,
value: OtelValue::Counter(counter.get()),
attributes: BTreeMap::new(),
}],
};
otel_metrics.push(metric);
}
for (name, gauge) in &metrics.gauges {
let metric = OtelMetric {
descriptor: OtelMetricDescriptor {
name: name.clone(),
description: format!("Gauge: {name}"),
unit: "1".to_string(),
},
data_points: vec![OtelDataPoint {
timestamp_nanos: timestamp,
value: OtelValue::Gauge(gauge.get() as f64),
attributes: BTreeMap::new(),
}],
};
otel_metrics.push(metric);
}
for (name, histogram) in &metrics.histograms {
let mut buckets = Vec::new();
let mut cumulative = 0;
for (i, count_atomic) in histogram.counts.iter().enumerate() {
let count = count_atomic.load(Ordering::Relaxed);
cumulative += count;
let upper_bound = if i < histogram.buckets.len() {
histogram.buckets[i]
} else {
f64::INFINITY
};
buckets.push((upper_bound, cumulative));
}
let metric = OtelMetric {
descriptor: OtelMetricDescriptor {
name: name.clone(),
description: format!("Histogram: {name}"),
unit: "s".to_string(),
},
data_points: vec![OtelDataPoint {
timestamp_nanos: timestamp,
value: OtelValue::Histogram {
count: histogram.count(),
sum: histogram.sum(),
buckets,
},
attributes: BTreeMap::new(),
}],
};
otel_metrics.push(metric);
}
Ok(OtelMetricsRequest {
resource: self.resource.clone(),
metrics: otel_metrics,
})
}
fn dispatches(&self) -> Vec<OtelDispatchRecord> {
self.dispatches
.lock()
.expect("dispatches mutex poisoned")
.clone()
}
async fn send_request(&self, request: &OtelMetricsRequest) -> Result<(), OtelExportError> {
use std::io::Write;
let serialized_json = Self::serialize_request(request)?;
let mut headers = BTreeMap::new();
headers.insert("content-type".to_string(), "application/json".to_string());
if let Some(api_key) = &self.config.api_key {
headers.insert("authorization".to_string(), format!("Bearer {api_key}"));
}
let body = if self.config.compression {
use flate2::Compression;
use flate2::write::GzEncoder;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder
.write_all(serialized_json.as_bytes())
.map_err(|err| OtelExportError::NetworkError(err.to_string()))?;
headers.insert("content-encoding".to_string(), "gzip".to_string());
encoder
.finish()
.map_err(|err| OtelExportError::NetworkError(err.to_string()))?
} else {
serialized_json.as_bytes().to_vec()
};
self.dispatches
.lock()
.expect("dispatches mutex poisoned")
.push(OtelDispatchRecord {
endpoint: self.config.endpoint.clone(),
timeout_secs: self.config.timeout_secs,
headers,
body,
serialized_json,
});
match &self.config.transport_mode {
OtelTransportMode::CaptureSuccess => Ok(()),
OtelTransportMode::FailNetwork(message) => {
Err(OtelExportError::NetworkError(message.clone()))
}
OtelTransportMode::FailAuth => Err(OtelExportError::AuthError),
OtelTransportMode::FailRateLimit => Err(OtelExportError::RateLimited),
}
}
}
#[derive(Debug, Clone)]
pub enum OtelExportError {
TimestampError,
NetworkError(String),
AuthError,
RateLimited,
InvalidData(String),
}
impl std::fmt::Display for OtelExportError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TimestampError => write!(f, "Failed to get system timestamp"),
Self::NetworkError(msg) => write!(f, "Network error: {msg}"),
Self::AuthError => write!(f, "Authentication failed"),
Self::RateLimited => write!(f, "Rate limited"),
Self::InvalidData(msg) => write!(f, "Invalid metric data: {msg}"),
}
}
}
impl std::error::Error for OtelExportError {}
#[test]
fn conf_otel_resource_attribution() {
let config = OtelExporterConfig::default();
let exporter = OtelMetricsExporter::new(config);
assert!(exporter.resource.attributes.contains_key("service.name"));
assert!(exporter.resource.attributes.contains_key("service.version"));
let service_name = exporter.resource.attributes.get("service.name").unwrap();
assert_eq!(service_name, "asupersync");
let version = exporter.resource.attributes.get("service.version").unwrap();
assert_eq!(version, env!("CARGO_PKG_VERSION"));
}
#[test]
fn conf_otel_metric_descriptor_conformance() {
let config = OtelExporterConfig::default();
let exporter = OtelMetricsExporter::new(config);
let mut metrics = Metrics::new();
metrics.counter("http_requests_total").add(100);
metrics.gauge("memory_usage_bytes").set(1024);
metrics
.histogram("request_duration_seconds", vec![0.1, 0.5, 1.0])
.observe(0.25);
let request = exporter
.build_request(&metrics)
.expect("build_request failed");
assert_eq!(request.metrics.len(), 3);
let counter_metric = request
.metrics
.iter()
.find(|m| m.descriptor.name == "http_requests_total")
.expect("counter metric not found");
assert!(!counter_metric.descriptor.name.is_empty());
assert!(!counter_metric.descriptor.description.is_empty());
assert_eq!(counter_metric.descriptor.unit, "1");
let gauge_metric = request
.metrics
.iter()
.find(|m| m.descriptor.name == "memory_usage_bytes")
.expect("gauge metric not found");
assert!(gauge_metric.descriptor.description.contains("Gauge"));
let hist_metric = request
.metrics
.iter()
.find(|m| m.descriptor.name == "request_duration_seconds")
.expect("histogram metric not found");
assert_eq!(hist_metric.descriptor.unit, "s");
}
#[test]
fn conf_otel_data_point_structure() {
let config = OtelExporterConfig::default();
let exporter = OtelMetricsExporter::new(config);
let mut metrics = Metrics::new();
metrics.counter("test_counter").add(42);
let request = exporter
.build_request(&metrics)
.expect("build_request failed");
let metric = &request.metrics[0];
let data_point = &metric.data_points[0];
assert!(data_point.timestamp_nanos > 0);
assert!(data_point.timestamp_nanos < u64::MAX);
match &data_point.value {
OtelValue::Counter(value) => assert_eq!(*value, 42),
_ => panic!("Expected Counter value"),
}
assert!(data_point.attributes.is_empty()); }
#[test]
fn conf_otel_aggregation_temporality() {
let config = OtelExporterConfig::default();
let exporter = OtelMetricsExporter::new(config);
let mut metrics = Metrics::new();
let counter = metrics.counter("requests");
counter.add(10);
counter.add(5);
let gauge = metrics.gauge("cpu_usage");
gauge.set(50);
gauge.set(75);
let hist = metrics.histogram("latencies", vec![0.1, 1.0]);
hist.observe(0.05);
hist.observe(0.5);
hist.observe(2.0);
let request = exporter
.build_request(&metrics)
.expect("build_request failed");
let counter_metric = request
.metrics
.iter()
.find(|m| m.descriptor.name == "requests")
.expect("counter not found");
if let OtelValue::Counter(value) = counter_metric.data_points[0].value {
assert_eq!(value, 15); }
let gauge_metric = request
.metrics
.iter()
.find(|m| m.descriptor.name == "cpu_usage")
.expect("gauge not found");
if let OtelValue::Gauge(value) = gauge_metric.data_points[0].value {
assert_eq!(value, 75.0); }
let hist_metric = request
.metrics
.iter()
.find(|m| m.descriptor.name == "latencies")
.expect("histogram not found");
if let OtelValue::Histogram {
count,
sum,
buckets,
} = &hist_metric.data_points[0].value
{
assert_eq!(*count, 3); assert!(*sum > 2.5); assert!(!buckets.is_empty()); }
}
#[test]
fn conf_otel_batch_export_conformance() {
let config = OtelExporterConfig {
batch_size: 100,
..Default::default()
};
let exporter = OtelMetricsExporter::new(config);
let mut metrics = Metrics::new();
for i in 0..5 {
metrics.counter(&format!("counter_{i}")).add(i as u64 * 10);
metrics.gauge(&format!("gauge_{i}")).set(i as i64);
metrics
.histogram(&format!("hist_{i}"), vec![1.0, 10.0])
.observe(i as f64);
}
let request = exporter
.build_request(&metrics)
.expect("build_request failed");
assert_eq!(request.metrics.len(), 15);
assert!(!request.resource.attributes.is_empty());
let counter_count = request
.metrics
.iter()
.filter(|m| m.descriptor.name.starts_with("counter_"))
.count();
let gauge_count = request
.metrics
.iter()
.filter(|m| m.descriptor.name.starts_with("gauge_"))
.count();
let hist_count = request
.metrics
.iter()
.filter(|m| m.descriptor.name.starts_with("hist_"))
.count();
assert_eq!(counter_count, 5);
assert_eq!(gauge_count, 5);
assert_eq!(hist_count, 5);
}
#[test]
fn conf_otel_configuration_validation() {
let default_config = OtelExporterConfig::default();
assert!(!default_config.endpoint.is_empty());
assert!(default_config.endpoint.contains("http"));
assert!(default_config.endpoint.contains("4317")); assert!(default_config.endpoint.contains("/v1/metrics")); assert!(default_config.timeout_secs > 0);
assert!(default_config.batch_size > 0);
let custom_config = OtelExporterConfig {
endpoint: "https://otel-collector.example.com/v1/metrics".to_string(),
api_key: Some("secret_key_123".to_string()),
timeout_secs: 30,
compression: false,
batch_size: 50,
transport_mode: OtelTransportMode::CaptureSuccess,
};
let exporter = OtelMetricsExporter::new(custom_config.clone());
assert_eq!(exporter.config.endpoint, custom_config.endpoint);
assert_eq!(exporter.config.api_key, custom_config.api_key);
assert_eq!(exporter.config.timeout_secs, 30);
assert!(!exporter.config.compression);
assert_eq!(exporter.config.batch_size, 50);
assert_eq!(exporter.config.transport_mode, custom_config.transport_mode);
}
#[test]
fn conf_otel_error_handling_conformance() {
let errors = vec![
OtelExportError::TimestampError,
OtelExportError::NetworkError("connection timeout".to_string()),
OtelExportError::AuthError,
OtelExportError::RateLimited,
OtelExportError::InvalidData("malformed metric name".to_string()),
];
for error in errors {
let display_str = format!("{error}");
assert!(!display_str.is_empty());
let debug_str = format!("{error:?}");
assert!(!debug_str.is_empty());
}
let net_err = OtelExportError::NetworkError("timeout".to_string());
assert!(format!("{net_err}").contains("Network error"));
assert!(format!("{net_err}").contains("timeout"));
let data_err = OtelExportError::InvalidData("bad name".to_string());
assert!(format!("{data_err}").contains("Invalid metric data"));
assert!(format!("{data_err}").contains("bad name"));
}
#[test]
fn conf_otel_histogram_bucket_conformance() {
let config = OtelExporterConfig::default();
let exporter = OtelMetricsExporter::new(config);
let mut metrics = Metrics::new();
let hist = metrics.histogram("response_times", vec![0.1, 0.5, 1.0, 5.0]);
hist.observe(0.05); hist.observe(0.3); hist.observe(0.8); hist.observe(2.0); hist.observe(10.0);
let request = exporter
.build_request(&metrics)
.expect("build_request failed");
let hist_metric = &request.metrics[0];
if let OtelValue::Histogram {
count,
sum,
buckets,
} = &hist_metric.data_points[0].value
{
assert_eq!(*count, 5);
assert!((*sum - 13.15).abs() < 0.01);
assert_eq!(buckets.len(), 5);
for i in 1..buckets.len() {
assert!(
buckets[i].1 >= buckets[i - 1].1,
"Bucket {i} count {} should be >= previous bucket count {}",
buckets[i].1,
buckets[i - 1].1
);
}
assert_eq!(buckets.last().unwrap().1, 5);
assert_eq!(buckets.last().unwrap().0, f64::INFINITY);
} else {
panic!("Expected Histogram value");
}
}
#[test]
fn conf_otel_serialized_request_structure_is_deterministic() {
let request = OtelMetricsRequest {
resource: OtelResource {
attributes: BTreeMap::from([
("service.name".to_string(), "asupersync".to_string()),
("service.version".to_string(), "0.3.1-test".to_string()),
]),
},
metrics: vec![OtelMetric {
descriptor: OtelMetricDescriptor {
name: "requests_total".to_string(),
description: "Counter: requests_total".to_string(),
unit: "1".to_string(),
},
data_points: vec![OtelDataPoint {
timestamp_nanos: 123,
value: OtelValue::Counter(7),
attributes: BTreeMap::new(),
}],
}],
};
let serialized =
OtelMetricsExporter::serialize_request(&request).expect("serialize_request failed");
insta::assert_snapshot!("metrics_export_otel_serialized_request", serialized);
}
#[test]
fn conf_otel_export_dispatch_records_headers_and_body() {
use flate2::read::GzDecoder;
use futures_lite::future::block_on;
use std::io::Read;
let config = OtelExporterConfig {
endpoint: "http://collector.test/v1/metrics".to_string(),
api_key: Some("test-key".to_string()),
timeout_secs: 3,
compression: true,
batch_size: 16,
transport_mode: OtelTransportMode::CaptureSuccess,
};
let exporter = OtelMetricsExporter::new(config);
let mut metrics = Metrics::new();
metrics.counter("requests_total").add(7);
block_on(exporter.export(&metrics)).expect("export should succeed");
let dispatches = exporter.dispatches();
assert_eq!(dispatches.len(), 1);
let dispatch = &dispatches[0];
assert_eq!(dispatch.endpoint, "http://collector.test/v1/metrics");
assert_eq!(dispatch.timeout_secs, 3);
assert_eq!(
dispatch.headers.get("content-type").map(String::as_str),
Some("application/json")
);
assert_eq!(
dispatch.headers.get("authorization").map(String::as_str),
Some("Bearer test-key")
);
assert_eq!(
dispatch.headers.get("content-encoding").map(String::as_str),
Some("gzip")
);
let mut decoder = GzDecoder::new(dispatch.body.as_slice());
let mut decompressed = String::new();
decoder
.read_to_string(&mut decompressed)
.expect("gzip body should decode");
assert_eq!(decompressed, dispatch.serialized_json);
assert!(dispatch.serialized_json.contains("\"requests_total\""));
}
#[test]
fn conf_otel_export_transport_errors_are_not_silent() {
use futures_lite::future::block_on;
let cases = [
(
OtelTransportMode::FailNetwork("socket closed".to_string()),
OtelExportError::NetworkError("socket closed".to_string()),
),
(OtelTransportMode::FailAuth, OtelExportError::AuthError),
(
OtelTransportMode::FailRateLimit,
OtelExportError::RateLimited,
),
];
for (transport_mode, expected) in cases {
let config = OtelExporterConfig {
transport_mode,
compression: false,
..Default::default()
};
let exporter = OtelMetricsExporter::new(config);
let mut metrics = Metrics::new();
metrics.counter("requests_total").increment();
let err = block_on(exporter.export(&metrics)).expect_err("export should fail");
match (err, expected) {
(
OtelExportError::NetworkError(actual),
OtelExportError::NetworkError(expected),
) => {
assert_eq!(actual, expected);
}
(OtelExportError::AuthError, OtelExportError::AuthError)
| (OtelExportError::RateLimited, OtelExportError::RateLimited) => {}
(actual, expected) => {
panic!("unexpected transport error: got {actual:?}, expected {expected:?}")
}
}
assert_eq!(
exporter.dispatches().len(),
1,
"failed dispatches should still be recorded for deterministic verification"
);
}
}
#[test]
fn metrics_export_prometheus_exposition_format_compliance_snapshot() {
let mut metrics = Metrics::new();
metrics.counter("http_requests_total").add(0); metrics.counter("bytes_processed_total").add(u64::MAX); metrics.counter("errors_total{status=\"404\"}").add(42); metrics
.counter("requests_with_underscore_name_total")
.add(123);
metrics.gauge("temperature_celsius").set(-273); metrics.gauge("memory_usage_bytes").set(0); metrics.gauge("cpu_usage_percent{cpu=\"0\"}").set(99); metrics.gauge("queue_depth").set(i64::MAX); metrics.gauge("offset_microseconds").set(i64::MIN);
let response_time_hist = metrics.histogram(
"http_request_duration_seconds",
vec![0.001, 0.01, 0.1, 1.0, 10.0],
);
response_time_hist.observe(0.0005); response_time_hist.observe(0.005); response_time_hist.observe(0.05); response_time_hist.observe(0.5); response_time_hist.observe(5.0); response_time_hist.observe(50.0);
let size_hist = metrics.histogram(
"request_size_bytes{endpoint=\"/api/v1/data\"}",
vec![100.0, 1000.0, 10000.0],
);
size_hist.observe(0.0); size_hist.observe(50.0); size_hist.observe(500.0); size_hist.observe(5000.0); size_hist.observe(100000.0);
let latency_summary = metrics.summary("response_latency_summary");
for &value in &[1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0] {
latency_summary.observe(value);
}
let throughput_summary = metrics.summary("throughput_ops_per_second{worker=\"primary\"}");
throughput_summary.observe(1000.0);
let _empty_summary = metrics.summary("empty_metric_summary");
metrics.counter("metric_with_1234_numbers").add(1);
metrics.gauge("CamelCaseMetric").set(42); metrics.counter("metric.with.dots").add(7);
insta::assert_snapshot!(
"metrics_export_prometheus_exposition_format_compliance",
metrics.export_prometheus()
);
}
#[test]
fn aog3fz_metric_name_injection_via_newlines_sanitized() {
let mut metrics = Metrics::new();
let crafted = "real_metric\n# TYPE forged_metric counter\nforged_metric 999";
metrics.counter(crafted).add(1);
let exported = metrics.export_prometheus();
assert!(
!exported.contains("forged_metric 999"),
"injection bypassed sanitization: {exported}"
);
assert!(
!exported.contains("# TYPE forged_metric counter"),
"injection bypassed sanitization: {exported}"
);
assert!(
exported.contains("real_metric_"),
"expected sanitized real_metric_ prefix in: {exported}"
);
}
#[test]
fn aog3fz_metric_name_with_curly_brace_injection_sanitized() {
let mut metrics = Metrics::new();
metrics.counter("evil{job=\"hacker\"}").add(1);
let exported = metrics.export_prometheus();
assert!(
!exported.contains("evil{job=\"hacker\"}"),
"raw injected name leaked: {exported}"
);
assert!(
exported.contains("evil_job__hacker__"),
"expected sanitized form in: {exported}"
);
}
#[test]
fn aog3fz_sanitize_metric_name_first_char_constraints() {
assert_eq!(
sanitize_prometheus_metric_name("0bad").as_deref(),
Some("_bad")
);
assert_eq!(
sanitize_prometheus_metric_name("-bad").as_deref(),
Some("_bad")
);
assert_eq!(
sanitize_prometheus_metric_name(":valid").as_deref(),
Some(":valid")
);
assert_eq!(
sanitize_prometheus_metric_name("_valid").as_deref(),
Some("_valid")
);
assert_eq!(
sanitize_prometheus_metric_name("Valid").as_deref(),
Some("Valid")
);
}
#[test]
fn aog3fz_sanitize_metric_name_continuation_constraints() {
assert_eq!(
sanitize_prometheus_metric_name("a:b_c").as_deref(),
Some("a:b_c")
);
assert_eq!(
sanitize_prometheus_metric_name("a-b.c d").as_deref(),
Some("a_b_c_d")
);
assert_eq!(
sanitize_prometheus_metric_name("a\nb").as_deref(),
Some("a_b")
);
assert_eq!(
sanitize_prometheus_metric_name("a\tb\rc\0d").as_deref(),
Some("a_b_c_d")
);
}
#[test]
fn aog3fz_sanitize_metric_name_empty_returns_none() {
assert_eq!(sanitize_prometheus_metric_name(""), None);
}
#[test]
fn aog3fz_sanitize_label_name_excludes_colon() {
assert_eq!(
sanitize_prometheus_label_name("a:b").as_deref(),
Some("a_b")
);
assert_eq!(
sanitize_prometheus_label_name(":start").as_deref(),
Some("_start")
);
assert_eq!(
sanitize_prometheus_label_name("0digit").as_deref(),
Some("_digit")
);
assert_eq!(
sanitize_prometheus_label_name("valid_name").as_deref(),
Some("valid_name")
);
assert_eq!(sanitize_prometheus_label_name(""), None);
}
#[test]
fn aog3fz_escape_label_value_handles_all_three_specials() {
assert_eq!(escape_prometheus_label_value("plain"), "plain");
assert_eq!(escape_prometheus_label_value(r"a\b"), r"a\\b");
assert_eq!(escape_prometheus_label_value("a\nb"), r"a\nb");
assert_eq!(escape_prometheus_label_value(r#"a"b"#), r#"a\"b"#);
assert_eq!(
escape_prometheus_label_value("a\\b\nc\"d e"),
r#"a\\b\nc\"d e"#
);
}
#[test]
fn pdu7wg_escape_label_value_escapes_carriage_return() {
assert_eq!(escape_prometheus_label_value("a\rb"), r"a\rb");
}
#[test]
fn pdu7wg_escape_label_value_escapes_tab() {
assert_eq!(escape_prometheus_label_value("a\tb"), r"a\tb");
}
#[test]
fn pdu7wg_escape_label_value_escapes_nul() {
assert_eq!(escape_prometheus_label_value("a\0b"), r"a\x00b");
}
#[test]
fn pdu7wg_escape_label_value_escapes_unicode_line_separators() {
assert_eq!(escape_prometheus_label_value("a\u{2028}b"), r"a\u{2028}b");
assert_eq!(escape_prometheus_label_value("a\u{2029}b"), r"a\u{2029}b");
}
#[test]
fn pdu7wg_escape_label_value_escapes_c0_c1_and_del() {
assert_eq!(escape_prometheus_label_value("\x01"), r"\x01");
assert_eq!(escape_prometheus_label_value("\x07"), r"\x07"); assert_eq!(escape_prometheus_label_value("\x1b"), r"\x1b"); assert_eq!(escape_prometheus_label_value("\x7f"), r"\x7f"); assert_eq!(escape_prometheus_label_value("\u{009b}"), r"\x9b");
}
#[test]
fn pdu7wg_escape_label_value_does_not_change_printable_ascii() {
let printable = "hello, world! 123 @#$%^&*()_+-={}[]|;':,./<>?";
assert_eq!(escape_prometheus_label_value(printable), printable);
}
#[test]
fn aog3fz_export_prometheus_output_has_no_control_characters() {
let mut metrics = Metrics::new();
metrics.counter("evil\n{}\"\\\r\t\0").add(1);
metrics.gauge("\x07ring\x08").set(1);
metrics
.histogram("\x1b[31mansi\x1b[0m", vec![1.0])
.observe(0.5);
let exported = metrics.export_prometheus();
for b in exported.bytes() {
assert!(
b == b'\n' || (b'\x20'..=b'\x7e').contains(&b),
"control byte {b:#04x} in exported output: {exported:?}"
);
}
}
#[test]
fn prometheus_exposition_5_counter_3_histogram_1_gauge_golden() {
let mut metrics = Metrics::new();
metrics.counter("http_requests_total").add(1247);
metrics.counter("tcp_connections_opened_total").add(89);
metrics.counter("bytes_transmitted_total").add(524288);
metrics.counter("task_spawns_total").add(0); metrics.counter("region_closures_total").add(u64::MAX);
let request_latency =
metrics.histogram("request_latency_seconds", vec![0.001, 0.01, 0.1, 1.0]);
request_latency.observe(0.0005); request_latency.observe(0.025); request_latency.observe(0.15); request_latency.observe(2.5);
let memory_alloc = metrics.histogram(
"memory_allocation_bytes",
vec![1024.0, 4096.0, 16384.0, 65536.0],
);
memory_alloc.observe(512.0); memory_alloc.observe(2048.0); memory_alloc.observe(8192.0); memory_alloc.observe(32768.0); memory_alloc.observe(131072.0);
let task_duration = metrics.histogram(
"task_execution_duration_ms",
vec![1.0, 5.0, 10.0, 50.0, 100.0],
);
task_duration.observe(0.5); task_duration.observe(3.0); task_duration.observe(7.5); task_duration.observe(25.0); task_duration.observe(75.0); task_duration.observe(250.0);
metrics.gauge("active_worker_threads").set(8);
let expected = r#"# TYPE bytes_transmitted_total counter
bytes_transmitted_total 524288
# TYPE http_requests_total counter
http_requests_total 1247
# TYPE region_closures_total counter
region_closures_total 18446744073709551615
# TYPE task_spawns_total counter
task_spawns_total 0
# TYPE tcp_connections_opened_total counter
tcp_connections_opened_total 89
# TYPE active_worker_threads gauge
active_worker_threads 8
# TYPE memory_allocation_bytes histogram
memory_allocation_bytes_bucket{le="1024"} 1
memory_allocation_bytes_bucket{le="4096"} 2
memory_allocation_bytes_bucket{le="16384"} 3
memory_allocation_bytes_bucket{le="65536"} 4
memory_allocation_bytes_bucket{le="+Inf"} 5
memory_allocation_bytes_sum 174592
memory_allocation_bytes_count 5
# TYPE request_latency_seconds histogram
request_latency_seconds_bucket{le="0.001"} 1
request_latency_seconds_bucket{le="0.01"} 1
request_latency_seconds_bucket{le="0.1"} 2
request_latency_seconds_bucket{le="1"} 3
request_latency_seconds_bucket{le="+Inf"} 4
request_latency_seconds_sum 2.6755
request_latency_seconds_count 4
# TYPE task_execution_duration_ms histogram
task_execution_duration_ms_bucket{le="1"} 1
task_execution_duration_ms_bucket{le="5"} 2
task_execution_duration_ms_bucket{le="10"} 3
task_execution_duration_ms_bucket{le="50"} 4
task_execution_duration_ms_bucket{le="100"} 5
task_execution_duration_ms_bucket{le="+Inf"} 6
task_execution_duration_ms_sum 361
task_execution_duration_ms_count 6
"#;
assert_eq!(metrics.export_prometheus(), expected);
}
}