use crate::observability::metrics::{MetricsProvider, OutcomeKind};
use crate::types::{CancelKind, RegionId, TaskId};
use opentelemetry::KeyValue;
use opentelemetry::metrics::{Counter, Histogram, Meter, ObservableGauge};
use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Strategy applied when recording a metric would exceed the configured
/// per-metric label-set (cardinality) budget.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CardinalityOverflow {
    /// Silently drop the data point (the default).
    #[default]
    Drop,
    /// Collapse each label value into an aggregate `"other"` series.
    Aggregate,
    /// Record anyway (the series set grows past the budget) and emit a warning log.
    Warn,
}
/// Configuration for cardinality control and sampling of emitted metrics.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Maximum number of distinct label sets tracked per metric name.
    pub max_cardinality: usize,
    /// What to do when `max_cardinality` is reached for a metric.
    pub overflow_strategy: CardinalityOverflow,
    /// Label keys stripped from every data point before recording.
    pub drop_labels: Vec<String>,
    /// Optional sampling; `None` records every data point.
    pub sampling: Option<SamplingConfig>,
}
impl Default for MetricsConfig {
fn default() -> Self {
Self {
max_cardinality: 1000,
overflow_strategy: CardinalityOverflow::Drop,
drop_labels: Vec::new(),
sampling: None,
}
}
}
impl MetricsConfig {
    /// Creates a config with the default settings.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Sets the per-metric label-set budget.
    #[must_use]
    pub fn with_max_cardinality(self, max: usize) -> Self {
        Self {
            max_cardinality: max,
            ..self
        }
    }
    /// Sets the strategy used once the budget is exhausted.
    #[must_use]
    pub fn with_overflow_strategy(self, strategy: CardinalityOverflow) -> Self {
        Self {
            overflow_strategy: strategy,
            ..self
        }
    }
    /// Adds a label key to strip from every data point.
    #[must_use]
    pub fn with_drop_label(mut self, label: impl Into<String>) -> Self {
        self.drop_labels.push(label.into());
        self
    }
    /// Enables sampling with the given configuration.
    #[must_use]
    pub fn with_sampling(self, sampling: SamplingConfig) -> Self {
        Self {
            sampling: Some(sampling),
            ..self
        }
    }
}
/// Deterministic sampling configuration for high-frequency metrics.
#[derive(Debug, Clone)]
pub struct SamplingConfig {
    /// Fraction of data points to record, in `[0.0, 1.0]`.
    pub sample_rate: f64,
    /// Substring patterns selecting which metrics are sampled; when empty,
    /// the rate applies to all metrics.
    pub sampled_metrics: Vec<String>,
}
impl Default for SamplingConfig {
    /// Default: record everything (rate `1.0`, no metric restriction).
    fn default() -> Self {
        Self::new(1.0)
    }
}
impl SamplingConfig {
    /// Creates a sampling config; `sample_rate` is clamped into `[0.0, 1.0]`.
    #[must_use]
    pub fn new(sample_rate: f64) -> Self {
        let sample_rate = sample_rate.clamp(0.0, 1.0);
        Self {
            sample_rate,
            sampled_metrics: Vec::new(),
        }
    }
    /// Restricts sampling to metrics whose name contains `metric`.
    #[must_use]
    pub fn with_sampled_metric(self, metric: impl Into<String>) -> Self {
        let mut sampled_metrics = self.sampled_metrics;
        sampled_metrics.push(metric.into());
        Self {
            sampled_metrics,
            ..self
        }
    }
}
/// Tracks, per metric name, the set of distinct label-set hashes seen so far.
#[derive(Debug, Default)]
struct CardinalityTracker {
    // metric name -> hashes of the label sets already admitted.
    seen: RwLock<HashMap<String, HashSet<u64>>>,
    // Number of data points that hit the cardinality limit.
    overflow_count: AtomicU64,
}
impl CardinalityTracker {
    fn new() -> Self {
        Self::default()
    }
    /// Test-only, non-mutating probe: would recording `labels` for `metric`
    /// push the series count past `max_cardinality`?
    ///
    /// Label sets that were already admitted return `false` (they add no new
    /// series). A zero budget rejects every unseen label set.
    #[cfg(test)]
    fn would_exceed(&self, metric: &str, labels: &[KeyValue], max_cardinality: usize) -> bool {
        let hash = Self::hash_labels(labels);
        let seen = self.seen.read();
        if max_cardinality == 0 {
            // Zero budget: only label sets recorded earlier are acceptable.
            return seen.get(metric).is_none_or(|set| !set.contains(&hash));
        }
        if let Some(set) = seen.get(metric) {
            if set.contains(&hash) {
                return false;
            }
            set.len() >= max_cardinality
        } else {
            false
        }
    }
    /// Unconditionally admits `labels` for `metric`, ignoring any budget.
    /// Used by the `Warn` overflow strategy.
    fn record(&self, metric: &str, labels: &[KeyValue]) {
        let hash = Self::hash_labels(labels);
        let mut seen = self.seen.write();
        seen.entry(metric.to_string()).or_default().insert(hash);
    }
    /// Atomically checks the budget and records the label set if it fits.
    ///
    /// Returns `true` only when the label set is NEW and over budget (the
    /// caller must then apply its overflow strategy); `false` when the set
    /// was already known or has been admitted here. Check and insert happen
    /// under a single write lock so concurrent callers cannot both slip a
    /// new series past the limit.
    fn check_and_record(&self, metric: &str, labels: &[KeyValue], max_cardinality: usize) -> bool {
        let hash = Self::hash_labels(labels);
        let mut seen = self.seen.write();
        let set = seen.entry(metric.to_string()).or_default();
        if set.contains(&hash) {
            return false;
        }
        if set.len() >= max_cardinality {
            return true;
        }
        set.insert(hash);
        false
    }
    /// Counts one data point that hit the cardinality limit.
    fn record_overflow(&self) {
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
    }
    /// Total number of overflow events observed so far.
    fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }
    /// Order-insensitive, deterministic hash of a label set.
    ///
    /// Labels are normalized to (key, debug-formatted value) pairs and sorted
    /// before hashing, so `[a, b]` and `[b, a]` map to the same series.
    /// `DetHasher` keeps the hash stable across processes, unlike the default
    /// randomized std hasher.
    fn hash_labels(labels: &[KeyValue]) -> u64 {
        use crate::util::DetHasher;
        use std::hash::{Hash, Hasher};
        let mut normalized: Vec<(&str, String)> = labels
            .iter()
            .map(|kv| (kv.key.as_str(), format!("{:?}", kv.value)))
            .collect();
        normalized.sort_unstable_by(|(a_key, a_val), (b_key, b_val)| {
            a_key.cmp(b_key).then_with(|| a_val.cmp(b_val))
        });
        let mut hasher = DetHasher::default();
        for (key, value) in normalized {
            key.hash(&mut hasher);
            value.hash(&mut hasher);
        }
        hasher.finish()
    }
    /// Test-only: number of distinct label sets recorded for `metric`.
    #[cfg(test)]
    fn cardinality(&self, metric: &str) -> usize {
        self.seen
            .read()
            .get(metric)
            .map_or(0, std::collections::HashSet::len)
    }
}
/// Label pairs attached to a data point.
pub type MetricLabels = Vec<(String, String)>;
/// (metric name, labels, counter value).
pub type CounterDataPoint = (String, MetricLabels, u64);
/// (metric name, labels, gauge value).
pub type GaugeDataPoint = (String, MetricLabels, i64);
/// (metric name, labels, observation count, observation sum).
pub type HistogramDataPoint = (String, MetricLabels, u64, f64);
/// A point-in-time collection of metric data points handed to exporters.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    pub counters: Vec<CounterDataPoint>,
    pub gauges: Vec<GaugeDataPoint>,
    pub histograms: Vec<HistogramDataPoint>,
}
impl MetricsSnapshot {
    /// Creates an empty snapshot.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Appends a counter data point.
    pub fn add_counter(
        &mut self,
        name: impl Into<String>,
        labels: Vec<(String, String)>,
        value: u64,
    ) {
        let point: CounterDataPoint = (name.into(), labels, value);
        self.counters.push(point);
    }
    /// Appends a gauge data point.
    pub fn add_gauge(
        &mut self,
        name: impl Into<String>,
        labels: Vec<(String, String)>,
        value: i64,
    ) {
        let point: GaugeDataPoint = (name.into(), labels, value);
        self.gauges.push(point);
    }
    /// Appends a histogram data point (pre-aggregated count and sum).
    pub fn add_histogram(
        &mut self,
        name: impl Into<String>,
        labels: Vec<(String, String)>,
        count: u64,
        sum: f64,
    ) {
        let point: HistogramDataPoint = (name.into(), labels, count, sum);
        self.histograms.push(point);
    }
}
/// Error produced when exporting or flushing metrics fails.
#[derive(Debug, Clone)]
pub struct ExportError {
    message: String,
}
impl ExportError {
    /// Wraps a human-readable failure description.
    #[must_use]
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}
impl std::fmt::Display for ExportError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "export error: {}", self.message)
    }
}
impl std::error::Error for ExportError {}
/// Sink that receives [`MetricsSnapshot`]s; implementations must be
/// shareable across threads.
pub trait MetricsExporter: Send + Sync {
    /// Delivers one snapshot to the backing sink.
    fn export(&self, metrics: &MetricsSnapshot) -> Result<(), ExportError>;
    /// Flushes any buffered output to the sink.
    fn flush(&self) -> Result<(), ExportError>;
}
/// Exporter that writes metrics to stdout, one line per data point.
#[derive(Debug)]
pub struct StdoutExporter {
    prefix: String,
}
impl Default for StdoutExporter {
    fn default() -> Self {
        Self::new()
    }
}
impl StdoutExporter {
    /// Creates an exporter with no line prefix.
    #[must_use]
    pub fn new() -> Self {
        Self::with_prefix("")
    }
    /// Creates an exporter that prepends `prefix` to every output line.
    #[must_use]
    pub fn with_prefix(prefix: impl Into<String>) -> Self {
        Self {
            prefix: prefix.into(),
        }
    }
    /// Renders labels as `{k="v",…}`; an empty slice renders as "".
    fn format_labels(labels: &[(String, String)]) -> String {
        if labels.is_empty() {
            return String::new();
        }
        let mut parts = Vec::with_capacity(labels.len());
        for (key, value) in labels {
            parts.push(format!("{key}=\"{value}\""));
        }
        format!("{{{}}}", parts.join(","))
    }
}
impl MetricsExporter for StdoutExporter {
fn export(&self, metrics: &MetricsSnapshot) -> Result<(), ExportError> {
let mut stdout = std::io::stdout().lock();
for (name, labels, value) in &metrics.counters {
let label_str = Self::format_labels(labels);
writeln!(
stdout,
"{}COUNTER {}{} {}",
self.prefix, name, label_str, value
)
.map_err(|e| ExportError::new(e.to_string()))?;
}
for (name, labels, value) in &metrics.gauges {
let label_str = Self::format_labels(labels);
writeln!(
stdout,
"{}GAUGE {}{} {}",
self.prefix, name, label_str, value
)
.map_err(|e| ExportError::new(e.to_string()))?;
}
for (name, labels, count, sum) in &metrics.histograms {
let label_str = Self::format_labels(labels);
writeln!(
stdout,
"{}HISTOGRAM {}{} count={} sum={}",
self.prefix, name, label_str, count, sum
)
.map_err(|e| ExportError::new(e.to_string()))?;
}
Ok(())
}
fn flush(&self) -> Result<(), ExportError> {
std::io::stdout()
.flush()
.map_err(|e| ExportError::new(e.to_string()))
}
}
/// Exporter that discards every snapshot; useful as a placeholder sink.
#[derive(Debug, Default)]
pub struct NullExporter;
impl NullExporter {
    /// Creates a no-op exporter.
    #[must_use]
    pub fn new() -> Self {
        NullExporter
    }
}
impl MetricsExporter for NullExporter {
    /// Accepts and ignores the snapshot.
    fn export(&self, _metrics: &MetricsSnapshot) -> Result<(), ExportError> {
        Ok(())
    }
    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&self) -> Result<(), ExportError> {
        Ok(())
    }
}
/// Fans snapshots out to several downstream exporters at once.
#[derive(Default)]
pub struct MultiExporter {
    exporters: Vec<Box<dyn MetricsExporter>>,
}
impl MultiExporter {
    /// Builds a fan-out over the given exporters.
    #[must_use]
    pub fn new(exporters: Vec<Box<dyn MetricsExporter>>) -> Self {
        Self { exporters }
    }
    /// Registers one more downstream exporter.
    pub fn add(&mut self, exporter: Box<dyn MetricsExporter>) {
        self.exporters.push(exporter);
    }
    /// Number of registered downstream exporters.
    #[must_use]
    pub fn len(&self) -> usize {
        self.exporters.len()
    }
    /// True when no downstream exporters are registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl std::fmt::Debug for MultiExporter {
    // `dyn MetricsExporter` carries no `Debug` bound, so only the count
    // of exporters can be shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MultiExporter")
            .field("exporters_count", &self.len())
            .finish()
    }
}
impl MetricsExporter for MultiExporter {
    /// Exports to every downstream exporter. Failures do not abort the
    /// fan-out; their messages are collected and joined with `"; "`.
    fn export(&self, metrics: &MetricsSnapshot) -> Result<(), ExportError> {
        let errors: Vec<String> = self
            .exporters
            .iter()
            .filter_map(|exporter| exporter.export(metrics).err())
            .map(|e| e.message)
            .collect();
        if errors.is_empty() {
            Ok(())
        } else {
            Err(ExportError::new(errors.join("; ")))
        }
    }
    /// Flushes every downstream exporter, collecting failures as in `export`.
    fn flush(&self) -> Result<(), ExportError> {
        let errors: Vec<String> = self
            .exporters
            .iter()
            .filter_map(|exporter| exporter.flush().err())
            .map(|e| e.message)
            .collect();
        if errors.is_empty() {
            Ok(())
        } else {
            Err(ExportError::new(errors.join("; ")))
        }
    }
}
/// Exporter that buffers snapshots in memory; intended for tests.
#[derive(Debug, Default)]
pub struct InMemoryExporter {
    snapshots: Mutex<Vec<MetricsSnapshot>>,
}
impl InMemoryExporter {
    /// Creates an exporter with an empty buffer.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Returns a copy of every snapshot exported so far.
    #[must_use]
    pub fn snapshots(&self) -> Vec<MetricsSnapshot> {
        self.snapshots.lock().clone()
    }
    /// Discards all buffered snapshots.
    pub fn clear(&self) {
        self.snapshots.lock().clear();
    }
    /// Total data points (counters + gauges + histograms) across all
    /// buffered snapshots.
    #[must_use]
    pub fn total_metrics(&self) -> usize {
        let guard = self.snapshots.lock();
        let mut total = 0;
        for snapshot in guard.iter() {
            total += snapshot.counters.len();
            total += snapshot.gauges.len();
            total += snapshot.histograms.len();
        }
        total
    }
}
impl MetricsExporter for InMemoryExporter {
    /// Stores a copy of the snapshot; never fails.
    fn export(&self, metrics: &MetricsSnapshot) -> Result<(), ExportError> {
        let snapshot = metrics.clone();
        self.snapshots.lock().push(snapshot);
        Ok(())
    }
    /// Nothing is buffered outside the vector, so flushing is a no-op.
    fn flush(&self) -> Result<(), ExportError> {
        Ok(())
    }
}
/// [`MetricsProvider`] implementation backed by OpenTelemetry instruments,
/// with label-cardinality limiting and optional deterministic sampling.
#[derive(Clone)]
pub struct OtelMetrics {
    // Observable gauges are read via callbacks registered in
    // `new_with_config`; these fields only keep the instruments alive.
    #[allow(dead_code)]
    tasks_active: ObservableGauge<u64>,
    tasks_spawned: Counter<u64>,
    tasks_completed: Counter<u64>,
    task_duration: Histogram<f64>,
    #[allow(dead_code)]
    regions_active: ObservableGauge<u64>,
    regions_created: Counter<u64>,
    regions_closed: Counter<u64>,
    region_lifetime: Histogram<f64>,
    cancellations: Counter<u64>,
    drain_duration: Histogram<f64>,
    deadlines_set: Counter<u64>,
    deadlines_exceeded: Counter<u64>,
    deadline_warnings: Counter<u64>,
    deadline_violations: Counter<u64>,
    deadline_remaining: Histogram<f64>,
    checkpoint_interval: Histogram<f64>,
    task_stuck_detected: Counter<u64>,
    #[allow(dead_code)]
    obligations_active: ObservableGauge<u64>,
    obligations_created: Counter<u64>,
    obligations_discharged: Counter<u64>,
    obligations_leaked: Counter<u64>,
    scheduler_poll_time: Histogram<f64>,
    scheduler_tasks_polled: Histogram<f64>,
    // Shared mutable counts feeding the observable-gauge callbacks.
    state: Arc<MetricsState>,
    config: MetricsConfig,
    // Shared so clones of this provider enforce one common budget.
    cardinality_tracker: Arc<CardinalityTracker>,
    // Monotone counter driving the deterministic sampling in `should_sample`.
    sample_counter: Arc<AtomicU64>,
}
impl std::fmt::Debug for OtelMetrics {
    // Instruments are not `Debug`; show only the config and live counts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("OtelMetrics");
        dbg.field("config", &self.config);
        dbg.field("state", &self.state);
        dbg.finish_non_exhaustive()
    }
}
/// Live counts backing the `*.active` observable gauges.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct MetricsState {
    active_tasks: AtomicU64,
    active_regions: AtomicU64,
    active_obligations: AtomicU64,
}
impl MetricsState {
    /// Decrements `counter` without wrapping below zero, so a spurious
    /// extra decrement cannot produce a huge gauge value.
    fn saturating_dec(counter: &AtomicU64) {
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
            Some(v.saturating_sub(1))
        });
    }
    fn inc_tasks(&self) {
        self.active_tasks.fetch_add(1, Ordering::Relaxed);
    }
    fn dec_tasks(&self) {
        Self::saturating_dec(&self.active_tasks);
    }
    fn inc_regions(&self) {
        self.active_regions.fetch_add(1, Ordering::Relaxed);
    }
    fn dec_regions(&self) {
        Self::saturating_dec(&self.active_regions);
    }
    fn inc_obligations(&self) {
        self.active_obligations.fetch_add(1, Ordering::Relaxed);
    }
    fn dec_obligations(&self) {
        Self::saturating_dec(&self.active_obligations);
    }
}
impl OtelMetrics {
    /// Creates all instruments on `meter` using the default configuration.
    #[must_use]
    pub fn new(meter: Meter) -> Self {
        Self::new_with_config(meter, MetricsConfig::default())
    }
    /// Creates all instruments on `meter` and wires the `*.active` observable
    /// gauges to callbacks that read the shared [`MetricsState`] counters.
    #[must_use]
    #[allow(clippy::too_many_lines)]
    #[allow(clippy::needless_pass_by_value)]
    pub fn new_with_config(meter: Meter, config: MetricsConfig) -> Self {
        let state = Arc::new(MetricsState::default());
        // Observable gauges pull their values at collection time via
        // callbacks that load the shared atomic counters.
        let tasks_active = meter
            .u64_observable_gauge("asupersync.tasks.active")
            .with_description("Currently running tasks")
            .with_callback({
                let state = Arc::clone(&state);
                move |observer| {
                    observer.observe(state.active_tasks.load(Ordering::Relaxed), &[]);
                }
            })
            .build();
        let regions_active = meter
            .u64_observable_gauge("asupersync.regions.active")
            .with_description("Currently active regions")
            .with_callback({
                let state = Arc::clone(&state);
                move |observer| {
                    observer.observe(state.active_regions.load(Ordering::Relaxed), &[]);
                }
            })
            .build();
        let obligations_active = meter
            .u64_observable_gauge("asupersync.obligations.active")
            .with_description("Currently active obligations")
            .with_callback({
                let state = Arc::clone(&state);
                move |observer| {
                    observer.observe(state.active_obligations.load(Ordering::Relaxed), &[]);
                }
            })
            .build();
        Self {
            tasks_active,
            tasks_spawned: meter
                .u64_counter("asupersync.tasks.spawned")
                .with_description("Total tasks spawned")
                .build(),
            tasks_completed: meter
                .u64_counter("asupersync.tasks.completed")
                .with_description("Total tasks completed")
                .build(),
            task_duration: meter
                .f64_histogram("asupersync.tasks.duration")
                .with_description("Task execution duration in seconds")
                .build(),
            regions_active,
            regions_created: meter
                .u64_counter("asupersync.regions.created")
                .with_description("Total regions created")
                .build(),
            regions_closed: meter
                .u64_counter("asupersync.regions.closed")
                .with_description("Total regions closed")
                .build(),
            region_lifetime: meter
                .f64_histogram("asupersync.regions.lifetime")
                .with_description("Region lifetime in seconds")
                .build(),
            cancellations: meter
                .u64_counter("asupersync.cancellations")
                .with_description("Cancellation requests")
                .build(),
            drain_duration: meter
                .f64_histogram("asupersync.cancellation.drain_duration")
                .with_description("Cancellation drain duration in seconds")
                .build(),
            deadlines_set: meter
                .u64_counter("asupersync.deadlines.set")
                .with_description("Deadlines configured")
                .build(),
            deadlines_exceeded: meter
                .u64_counter("asupersync.deadlines.exceeded")
                .with_description("Deadline exceeded events")
                .build(),
            deadline_warnings: meter
                .u64_counter("asupersync.deadline.warnings_total")
                .with_description("Deadline warning events")
                .build(),
            deadline_violations: meter
                .u64_counter("asupersync.deadline.violations_total")
                .with_description("Deadline violation events")
                .build(),
            deadline_remaining: meter
                .f64_histogram("asupersync.deadline.remaining_seconds")
                .with_description("Time remaining at completion in seconds")
                .build(),
            checkpoint_interval: meter
                .f64_histogram("asupersync.checkpoint.interval_seconds")
                .with_description("Time between checkpoints in seconds")
                .build(),
            task_stuck_detected: meter
                .u64_counter("asupersync.task.stuck_detected_total")
                .with_description("Tasks detected as stuck (no progress)")
                .build(),
            obligations_active,
            obligations_created: meter
                .u64_counter("asupersync.obligations.created")
                .with_description("Obligations created")
                .build(),
            obligations_discharged: meter
                .u64_counter("asupersync.obligations.discharged")
                .with_description("Obligations discharged")
                .build(),
            obligations_leaked: meter
                .u64_counter("asupersync.obligations.leaked")
                .with_description("Obligations leaked")
                .build(),
            scheduler_poll_time: meter
                .f64_histogram("asupersync.scheduler.poll_time")
                .with_description("Scheduler poll duration in seconds")
                .build(),
            scheduler_tasks_polled: meter
                .f64_histogram("asupersync.scheduler.tasks_polled")
                .with_description("Tasks polled per scheduler tick")
                .build(),
            state,
            config,
            cardinality_tracker: Arc::new(CardinalityTracker::new()),
            sample_counter: Arc::new(AtomicU64::new(0)),
        }
    }
    /// Returns the active configuration.
    #[must_use]
    pub fn config(&self) -> &MetricsConfig {
        &self.config
    }
    /// Total data points that hit the cardinality limit so far.
    #[must_use]
    pub fn cardinality_overflow_count(&self) -> u64 {
        self.cardinality_tracker.overflow_count()
    }
    /// Applies label filtering and the cardinality budget to `labels`.
    ///
    /// Returns the labels to record with, or `None` when the data point must
    /// be dropped. Side effects: newly seen label sets are registered with
    /// the tracker and overflow events increment the overflow counter.
    fn check_cardinality(&self, metric: &str, labels: &[KeyValue]) -> Option<Vec<KeyValue>> {
        // Strip configured high-cardinality label keys before budgeting.
        let filtered: Vec<KeyValue> = labels
            .iter()
            .filter(|kv| !self.config.drop_labels.contains(&kv.key.to_string()))
            .cloned()
            .collect();
        if self
            .cardinality_tracker
            .check_and_record(metric, &filtered, self.config.max_cardinality)
        {
            // New label set over budget: apply the configured strategy.
            self.cardinality_tracker.record_overflow();
            match self.config.overflow_strategy {
                CardinalityOverflow::Drop => return None,
                CardinalityOverflow::Aggregate => {
                    // Collapse every value into a single "other" series.
                    let aggregated: Vec<KeyValue> = filtered
                        .into_iter()
                        .map(|kv| KeyValue::new(kv.key, "other"))
                        .collect();
                    // The "other" series itself must also fit the budget;
                    // otherwise the point is dropped.
                    if self.cardinality_tracker.check_and_record(
                        metric,
                        &aggregated,
                        self.config.max_cardinality,
                    ) {
                        return None;
                    }
                    return Some(aggregated);
                }
                CardinalityOverflow::Warn => {
                    // Record anyway (the series set grows past the budget)
                    // but log the violation.
                    crate::tracing_compat::warn!(
                        metric = metric,
                        "cardinality limit reached for metric"
                    );
                    self.cardinality_tracker.record(metric, &filtered);
                }
            }
        }
        Some(filtered)
    }
    /// Deterministic sampling decision for `metric`.
    ///
    /// Metrics not matched by a non-empty `sampled_metrics` list are exempt
    /// and always recorded; otherwise roughly `sample_rate * 100` out of
    /// every 100 calls are admitted, driven by a shared counter.
    fn should_sample(&self, metric: &str) -> bool {
        let Some(ref sampling) = self.config.sampling else {
            return true;
        };
        if !sampling.sampled_metrics.is_empty()
            && !sampling.sampled_metrics.iter().any(|m| metric.contains(m))
        {
            // Not in the sampled set: this metric is exempt from sampling.
            return true;
        }
        if sampling.sample_rate >= 1.0 {
            return true;
        }
        if sampling.sample_rate <= 0.0 {
            return false;
        }
        let count = self.sample_counter.fetch_add(1, Ordering::Relaxed);
        // NOTE(review): percent granularity — a positive rate below 0.01
        // truncates to a threshold of 0 and never samples; confirm intended.
        #[allow(clippy::cast_sign_loss)]
        let threshold = (sampling.sample_rate * 100.0) as u64;
        (count % 100) < threshold
    }
}
impl MetricsProvider for OtelMetrics {
    fn task_spawned(&self, _region_id: RegionId, _task_id: TaskId) {
        self.state.inc_tasks();
        self.tasks_spawned.add(1, &[]);
    }
    fn task_completed(&self, _task_id: TaskId, outcome: OutcomeKind, duration: Duration) {
        self.state.dec_tasks();
        let labels = [KeyValue::new("outcome", outcome_label(outcome))];
        if let Some(filtered) = self.check_cardinality("asupersync.tasks.completed", &labels) {
            self.tasks_completed.add(1, &filtered);
        }
        // Sampling is checked before cardinality so unsampled points do not
        // consume label-set budget (check_cardinality registers series).
        if self.should_sample("asupersync.tasks.duration") {
            if let Some(filtered) = self.check_cardinality("asupersync.tasks.duration", &labels) {
                self.task_duration.record(duration.as_secs_f64(), &filtered);
            }
        }
    }
    fn region_created(&self, _region_id: RegionId, _parent: Option<RegionId>) {
        self.state.inc_regions();
        self.regions_created.add(1, &[]);
    }
    fn region_closed(&self, _region_id: RegionId, lifetime: Duration) {
        self.state.dec_regions();
        self.regions_closed.add(1, &[]);
        if self.should_sample("asupersync.regions.lifetime") {
            self.region_lifetime.record(lifetime.as_secs_f64(), &[]);
        }
    }
    fn cancellation_requested(&self, _region_id: RegionId, kind: CancelKind) {
        let labels = [KeyValue::new("kind", cancel_kind_label(kind))];
        if let Some(filtered) = self.check_cardinality("asupersync.cancellations", &labels) {
            self.cancellations.add(1, &filtered);
        }
    }
    fn drain_completed(&self, _region_id: RegionId, duration: Duration) {
        if self.should_sample("asupersync.cancellation.drain_duration") {
            self.drain_duration.record(duration.as_secs_f64(), &[]);
        }
    }
    fn deadline_set(&self, _region_id: RegionId, _deadline: Duration) {
        self.deadlines_set.add(1, &[]);
    }
    fn deadline_exceeded(&self, _region_id: RegionId) {
        self.deadlines_exceeded.add(1, &[]);
    }
    fn deadline_warning(&self, task_type: &str, reason: &'static str, remaining: Duration) {
        // task_type is caller-provided and potentially high-cardinality;
        // check_cardinality enforces the budget below.
        let task_type = task_type.to_string();
        let labels = [
            KeyValue::new("task_type", task_type),
            KeyValue::new("reason", reason),
        ];
        if let Some(filtered) =
            self.check_cardinality("asupersync.deadline.warnings_total", &labels)
        {
            self.deadline_warnings.add(1, &filtered);
        }
        // NOTE(review): `remaining` is intentionally unused here; remaining
        // time appears to be recorded via `deadline_remaining` — confirm.
        let _ = remaining;
    }
    fn deadline_violation(&self, task_type: &str, _over_by: Duration) {
        let task_type = task_type.to_string();
        let labels = [KeyValue::new("task_type", task_type)];
        if let Some(filtered) =
            self.check_cardinality("asupersync.deadline.violations_total", &labels)
        {
            self.deadline_violations.add(1, &filtered);
        }
    }
    fn deadline_remaining(&self, task_type: &str, remaining: Duration) {
        if self.should_sample("asupersync.deadline.remaining_seconds") {
            let task_type = task_type.to_string();
            let labels = [KeyValue::new("task_type", task_type)];
            if let Some(filtered) =
                self.check_cardinality("asupersync.deadline.remaining_seconds", &labels)
            {
                self.deadline_remaining
                    .record(remaining.as_secs_f64(), &filtered);
            }
        }
    }
    fn checkpoint_interval(&self, task_type: &str, interval: Duration) {
        if self.should_sample("asupersync.checkpoint.interval_seconds") {
            let task_type = task_type.to_string();
            let labels = [KeyValue::new("task_type", task_type)];
            if let Some(filtered) =
                self.check_cardinality("asupersync.checkpoint.interval_seconds", &labels)
            {
                self.checkpoint_interval
                    .record(interval.as_secs_f64(), &filtered);
            }
        }
    }
    fn task_stuck_detected(&self, task_type: &str) {
        let task_type = task_type.to_string();
        let labels = [KeyValue::new("task_type", task_type)];
        if let Some(filtered) =
            self.check_cardinality("asupersync.task.stuck_detected_total", &labels)
        {
            self.task_stuck_detected.add(1, &filtered);
        }
    }
    fn obligation_created(&self, _region_id: RegionId) {
        self.state.inc_obligations();
        self.obligations_created.add(1, &[]);
    }
    fn obligation_discharged(&self, _region_id: RegionId) {
        self.state.dec_obligations();
        self.obligations_discharged.add(1, &[]);
    }
    fn obligation_leaked(&self, _region_id: RegionId) {
        // A leak still ends the obligation, so the active count drops too.
        self.state.dec_obligations();
        self.obligations_leaked.add(1, &[]);
    }
    fn scheduler_tick(&self, tasks_polled: usize, duration: Duration) {
        if self.should_sample("asupersync.scheduler") {
            self.scheduler_poll_time.record(duration.as_secs_f64(), &[]);
            #[allow(clippy::cast_precision_loss)]
            self.scheduler_tasks_polled.record(tasks_polled as f64, &[]);
        }
    }
}
/// Maps a task outcome to its stable `outcome` label value.
const fn outcome_label(outcome: OutcomeKind) -> &'static str {
    match outcome {
        OutcomeKind::Ok => "ok",
        OutcomeKind::Err => "err",
        OutcomeKind::Cancelled => "cancelled",
        OutcomeKind::Panicked => "panicked",
    }
}
/// Maps a cancellation kind to its stable `kind` label value.
const fn cancel_kind_label(kind: CancelKind) -> &'static str {
    match kind {
        CancelKind::User => "user",
        CancelKind::Timeout => "timeout",
        CancelKind::Deadline => "deadline",
        CancelKind::PollQuota => "poll_quota",
        CancelKind::CostBudget => "cost_budget",
        CancelKind::FailFast => "fail_fast",
        CancelKind::RaceLost => "race_lost",
        CancelKind::ParentCancelled => "parent_cancelled",
        CancelKind::ResourceUnavailable => "resource_unavailable",
        CancelKind::Shutdown => "shutdown",
        CancelKind::LinkedExit => "linked_exit",
    }
}
#[cfg(all(test, feature = "metrics"))]
mod tests {
use super::*;
use crate::runtime::RuntimeBuilder;
use crate::test_utils::init_test_logging;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::{
InMemoryMetricExporter as OtelInMemoryExporter, PeriodicReader, SdkMeterProvider,
data::ResourceMetrics,
};
use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, Barrier};
const EXPECTED_METRICS: &[&str] = &[
"asupersync.tasks.spawned",
"asupersync.tasks.completed",
"asupersync.tasks.duration",
"asupersync.regions.created",
"asupersync.regions.closed",
"asupersync.regions.lifetime",
"asupersync.cancellations",
"asupersync.cancellation.drain_duration",
"asupersync.deadlines.set",
"asupersync.deadlines.exceeded",
"asupersync.deadline.warnings_total",
"asupersync.deadline.violations_total",
"asupersync.deadline.remaining_seconds",
"asupersync.checkpoint.interval_seconds",
"asupersync.task.stuck_detected_total",
"asupersync.obligations.created",
"asupersync.obligations.discharged",
"asupersync.obligations.leaked",
"asupersync.scheduler.poll_time",
"asupersync.scheduler.tasks_polled",
];
fn metric_names(finished: &[ResourceMetrics]) -> HashSet<String> {
let mut names = HashSet::new();
for resource_metrics in finished {
for scope_metrics in resource_metrics.scope_metrics() {
for metric in scope_metrics.metrics() {
names.insert(metric.name().to_string());
}
}
}
names
}
fn assert_expected_metrics_present(names: &HashSet<String>, expected: &[&str]) {
for name in expected {
assert!(names.contains(*name), "missing metric: {name}");
}
}
fn collect_grafana_queries(value: &serde_json::Value, output: &mut Vec<String>) {
match value {
serde_json::Value::Object(map) => {
for (key, val) in map {
if key == "expr" || key == "query" {
if let serde_json::Value::String(text) = val {
output.push(text.clone());
}
} else {
collect_grafana_queries(val, output);
}
}
}
serde_json::Value::Array(items) => {
for item in items {
collect_grafana_queries(item, output);
}
}
_ => {}
}
}
#[test]
fn otel_metrics_exports_in_memory() {
init_test_logging();
let exporter = OtelInMemoryExporter::default();
let reader = PeriodicReader::builder(exporter.clone()).build();
let provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = provider.meter("asupersync");
let metrics = OtelMetrics::new(meter);
metrics.task_spawned(RegionId::testing_default(), TaskId::testing_default());
metrics.task_completed(
TaskId::testing_default(),
OutcomeKind::Ok,
Duration::from_millis(10),
);
metrics.region_created(RegionId::testing_default(), None);
metrics.region_closed(RegionId::testing_default(), Duration::from_secs(1));
metrics.cancellation_requested(RegionId::testing_default(), CancelKind::User);
metrics.drain_completed(RegionId::testing_default(), Duration::from_millis(5));
metrics.deadline_set(RegionId::testing_default(), Duration::from_secs(2));
metrics.deadline_exceeded(RegionId::testing_default());
metrics.deadline_warning("test", "no_progress", Duration::from_secs(1));
metrics.deadline_violation("test", Duration::from_secs(1));
metrics.deadline_remaining("test", Duration::from_secs(5));
metrics.checkpoint_interval("test", Duration::from_millis(200));
metrics.task_stuck_detected("test");
metrics.obligation_created(RegionId::testing_default());
metrics.obligation_discharged(RegionId::testing_default());
metrics.obligation_leaked(RegionId::testing_default());
metrics.scheduler_tick(3, Duration::from_millis(1));
provider.force_flush().expect("force_flush");
let finished = exporter.get_finished_metrics().expect("finished metrics");
assert!(!finished.is_empty());
let names = metric_names(&finished);
assert_expected_metrics_present(&names, EXPECTED_METRICS);
provider.shutdown().expect("shutdown");
}
#[test]
fn otel_metrics_runtime_integration_emits_task_metrics() {
init_test_logging();
let exporter = OtelInMemoryExporter::default();
let reader = PeriodicReader::builder(exporter.clone()).build();
let provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = provider.meter("asupersync");
let metrics = OtelMetrics::new(meter);
let runtime = RuntimeBuilder::new()
.metrics(metrics)
.build()
.expect("runtime build");
let handle = runtime.handle().spawn(async { 7u8 });
let result = runtime.block_on(handle);
assert_eq!(result, 7);
for _ in 0..1024 {
if runtime.is_quiescent() {
break;
}
std::thread::yield_now();
}
assert!(runtime.is_quiescent(), "runtime did not reach quiescence");
provider.force_flush().expect("force_flush");
let finished = exporter.get_finished_metrics().expect("finished metrics");
assert!(!finished.is_empty());
let names = metric_names(&finished);
assert_expected_metrics_present(
&names,
&[
"asupersync.tasks.spawned",
"asupersync.tasks.completed",
"asupersync.tasks.duration",
],
);
provider.shutdown().expect("shutdown");
}
#[test]
fn grafana_dashboard_references_expected_metrics() {
init_test_logging();
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("examples/grafana_dashboard.json");
let contents = std::fs::read_to_string(path).expect("read grafana dashboard");
let json: serde_json::Value =
serde_json::from_str(&contents).expect("parse grafana dashboard");
let mut queries = Vec::new();
collect_grafana_queries(&json, &mut queries);
assert!(!queries.is_empty(), "expected grafana queries to exist");
let joined = queries.join("\n");
let expected = [
"asupersync_tasks_spawned_total",
"asupersync_tasks_completed_total",
"asupersync_tasks_duration_bucket",
"asupersync_regions_active",
"asupersync_cancellations_total",
"asupersync_deadline_warnings_total",
"asupersync_deadline_violations_total",
"asupersync_deadline_remaining_seconds_bucket",
"asupersync_checkpoint_interval_seconds_bucket",
"asupersync_task_stuck_detected_total",
];
for metric in expected {
assert!(
joined.contains(metric),
"missing grafana query metric: {metric}"
);
}
}
#[test]
fn otel_metrics_with_config() {
let exporter = OtelInMemoryExporter::default();
let reader = PeriodicReader::builder(exporter).build();
let provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = provider.meter("asupersync");
let config = MetricsConfig::new()
.with_max_cardinality(500)
.with_overflow_strategy(CardinalityOverflow::Aggregate);
let metrics = OtelMetrics::new_with_config(meter, config);
assert_eq!(metrics.config().max_cardinality, 500);
assert_eq!(
metrics.config().overflow_strategy,
CardinalityOverflow::Aggregate
);
provider.shutdown().expect("shutdown");
}
#[test]
fn cardinality_tracker_basic() {
let tracker = CardinalityTracker::new();
let labels = [KeyValue::new("outcome", "ok")];
assert!(!tracker.would_exceed("test", &labels, 10));
tracker.record("test", &labels);
assert_eq!(tracker.cardinality("test"), 1);
tracker.record("test", &labels);
assert_eq!(tracker.cardinality("test"), 1);
let labels2 = [KeyValue::new("outcome", "err")];
tracker.record("test", &labels2);
assert_eq!(tracker.cardinality("test"), 2);
}
#[test]
fn cardinality_limit_enforced() {
let tracker = CardinalityTracker::new();
for i in 0..5 {
let labels = [KeyValue::new("id", i.to_string())];
tracker.record("test", &labels);
}
assert_eq!(tracker.cardinality("test"), 5);
let labels = [KeyValue::new("id", "new")];
assert!(tracker.would_exceed("test", &labels, 5));
}
#[test]
fn cardinality_limit_zero_rejects_new_series() {
let tracker = CardinalityTracker::new();
let labels = [KeyValue::new("id", "first")];
assert!(
tracker.would_exceed("test", &labels, 0),
"zero-cardinality budget must reject unseen label sets"
);
assert!(tracker.check_and_record("test", &labels, 0));
assert_eq!(tracker.cardinality("test"), 0);
}
#[test]
fn cardinality_enforcement_is_atomic_under_concurrency() {
let tracker = Arc::new(CardinalityTracker::new());
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8)
.map(|i| {
let tracker = Arc::clone(&tracker);
let barrier = Arc::clone(&barrier);
std::thread::spawn(move || {
let labels = [KeyValue::new("id", i.to_string())];
barrier.wait();
!tracker.check_and_record("test", &labels, 1)
})
})
.collect();
let accepted = handles
.into_iter()
.map(|handle| handle.join().expect("thread join"))
.filter(|accepted| *accepted)
.count();
assert_eq!(accepted, 1, "exactly one series should fit under max=1");
assert_eq!(tracker.cardinality("test"), 1);
}
#[test]
fn cardinality_label_order_is_ignored() {
let tracker = CardinalityTracker::new();
let labels_a = [
KeyValue::new("outcome", "ok"),
KeyValue::new("region", "root"),
];
let labels_b = [
KeyValue::new("region", "root"),
KeyValue::new("outcome", "ok"),
];
tracker.record("test", &labels_a);
assert!(
!tracker.would_exceed("test", &labels_b, 1),
"label order should not increase cardinality"
);
tracker.record("test", &labels_b);
assert_eq!(tracker.cardinality("test"), 1);
}
#[test]
fn drop_labels_filtered() {
let exporter = OtelInMemoryExporter::default();
let reader = PeriodicReader::builder(exporter).build();
let provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = provider.meter("asupersync");
let config = MetricsConfig::new().with_drop_label("request_id");
let metrics = OtelMetrics::new_with_config(meter, config);
let labels = [
KeyValue::new("outcome", "ok"),
KeyValue::new("request_id", "12345"),
];
let filtered = metrics.check_cardinality("test", &labels);
assert!(filtered.is_some());
let filtered = filtered.unwrap();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0].key.as_str(), "outcome");
provider.shutdown().expect("shutdown");
}
#[test]
fn aggregate_overflow_does_not_exceed_configured_budget() {
    // With max_cardinality = 1 and the Aggregate strategy, the first label
    // set claims the only slot; a later, distinct label set must be rejected
    // instead of minting a second series.
    let exporter = OtelInMemoryExporter::default();
    let reader = PeriodicReader::builder(exporter).build();
    let provider = SdkMeterProvider::builder().with_reader(reader).build();
    let meter = provider.meter("asupersync");
    let config = MetricsConfig::new()
        .with_max_cardinality(1)
        .with_overflow_strategy(CardinalityOverflow::Aggregate);
    let metrics = OtelMetrics::new_with_config(meter, config);
    let first = [KeyValue::new("task_type", "fast")];
    let second = [KeyValue::new("task_type", "slow")];
    let first_labels = metrics
        .check_cardinality("test.metric", &first)
        .expect("first label set should fit");
    // The accepted set comes back unmodified (no labels dropped/aggregated).
    assert_eq!(first_labels, first);
    assert_eq!(metrics.cardinality_tracker.cardinality("test.metric"), 1);
    assert!(
        metrics.check_cardinality("test.metric", &second).is_none(),
        "aggregate overflow must not create a second series beyond the configured cap"
    );
    // Cardinality stays at the cap even after the overflowing attempt.
    assert_eq!(metrics.cardinality_tracker.cardinality("test.metric"), 1);
    provider.shutdown().expect("shutdown");
}
#[test]
fn sampling_config() {
    // The builder records both the rate and the sampled-metric list.
    let cfg = SamplingConfig::new(0.5).with_sampled_metric("duration");
    assert!((cfg.sample_rate - 0.5).abs() < f64::EPSILON);
    assert_eq!(cfg.sampled_metrics.len(), 1);
}
#[test]
fn sampling_rate_clamped() {
    // Rates outside [0.0, 1.0] are clamped to the nearest bound.
    let too_high = SamplingConfig::new(1.5);
    assert!((too_high.sample_rate - 1.0).abs() < f64::EPSILON);
    let too_low = SamplingConfig::new(-0.5);
    assert!(too_low.sample_rate.abs() < f64::EPSILON);
}
}
#[cfg(test)]
mod exporter_tests {
    use super::*;
    /// The no-op exporter accepts any snapshot and never fails.
    #[test]
    fn null_exporter_works() {
        let exporter = NullExporter::new();
        let snapshot = MetricsSnapshot::new();
        assert!(exporter.export(&snapshot).is_ok());
        assert!(exporter.flush().is_ok());
    }
    /// Exported snapshots are retained and countable; `clear` resets them.
    #[test]
    fn in_memory_exporter_collects() {
        let exporter = InMemoryExporter::new();
        let mut snapshot = MetricsSnapshot::new();
        snapshot.add_counter("test.counter", vec![], 42);
        snapshot.add_gauge(
            "test.gauge",
            vec![("label".to_string(), "value".to_string())],
            100,
        );
        snapshot.add_histogram("test.histogram", vec![], 10, 5.5);
        assert!(exporter.export(&snapshot).is_ok());
        // total_metrics counts counters + gauges + histograms across snapshots.
        assert_eq!(exporter.total_metrics(), 3);
        let snapshots = exporter.snapshots();
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].counters.len(), 1);
        assert_eq!(snapshots[0].gauges.len(), 1);
        assert_eq!(snapshots[0].histograms.len(), 1);
        exporter.clear();
        assert_eq!(exporter.total_metrics(), 0);
    }
    /// A MultiExporter forwards each export/flush to every child exporter.
    #[test]
    fn multi_exporter_fans_out() {
        // Thin wrapper so an Arc-shared InMemoryExporter can be boxed as a
        // MetricsExporter while we keep a handle for assertions.
        struct ArcExporter(Arc<InMemoryExporter>);
        impl MetricsExporter for ArcExporter {
            fn export(&self, metrics: &MetricsSnapshot) -> Result<(), ExportError> {
                self.0.export(metrics)
            }
            fn flush(&self) -> Result<(), ExportError> {
                self.0.flush()
            }
        }
        let exp1 = InMemoryExporter::new();
        let exp2 = InMemoryExporter::new();
        let exp1_arc = Arc::new(exp1);
        let exp2_arc = Arc::new(exp2);
        let mut multi = MultiExporter::new(vec![]);
        multi.add(Box::new(ArcExporter(Arc::clone(&exp1_arc))));
        multi.add(Box::new(ArcExporter(Arc::clone(&exp2_arc))));
        assert_eq!(multi.len(), 2);
        let mut snapshot = MetricsSnapshot::new();
        snapshot.add_counter("test", vec![], 1);
        assert!(multi.export(&snapshot).is_ok());
        assert!(multi.flush().is_ok());
        // Both children must have received the single counter.
        assert_eq!(exp1_arc.total_metrics(), 1);
        assert_eq!(exp2_arc.total_metrics(), 1);
    }
    /// Snapshot builder methods append to the matching collection.
    #[test]
    fn metrics_snapshot_building() {
        let mut snapshot = MetricsSnapshot::new();
        snapshot.add_counter(
            "requests",
            vec![("method".to_string(), "GET".to_string())],
            100,
        );
        snapshot.add_gauge("connections", vec![], 42);
        snapshot.add_histogram("latency", vec![], 1000, 125.5);
        assert_eq!(snapshot.counters.len(), 1);
        assert_eq!(snapshot.gauges.len(), 1);
        assert_eq!(snapshot.histograms.len(), 1);
        let (name, labels, value) = &snapshot.counters[0];
        assert_eq!(name, "requests");
        assert_eq!(labels.len(), 1);
        assert_eq!(*value, 100);
    }
    /// Display includes the message passed to ExportError::new.
    #[test]
    fn export_error_display() {
        let err = ExportError::new("test error");
        assert!(err.to_string().contains("test error"));
    }
    /// Exercises the derives on CardinalityOverflow (Debug/Clone/Copy/Eq/Default).
    #[test]
    fn cardinality_overflow_debug_clone_copy_eq_default() {
        let overflow = CardinalityOverflow::default();
        assert_eq!(overflow, CardinalityOverflow::Drop);
        let dbg = format!("{overflow:?}");
        assert!(dbg.contains("Drop"));
        let aggregate = CardinalityOverflow::Aggregate;
        let cloned = aggregate;
        assert_eq!(cloned, CardinalityOverflow::Aggregate);
        assert_ne!(aggregate, CardinalityOverflow::Warn);
        let warn = CardinalityOverflow::Warn;
        let copied = warn;
        assert_eq!(copied, warn);
    }
    /// MetricsConfig::default has the documented defaults and derives work.
    #[test]
    fn metrics_config_debug_clone_default() {
        let config = MetricsConfig::default();
        assert_eq!(config.max_cardinality, 1000);
        assert_eq!(config.overflow_strategy, CardinalityOverflow::Drop);
        assert!(config.drop_labels.is_empty());
        assert!(config.sampling.is_none());
        let dbg = format!("{config:?}");
        assert!(dbg.contains("MetricsConfig"));
        let cloned = config;
        assert_eq!(cloned.max_cardinality, 1000);
    }
    /// SamplingConfig::default samples everything (rate 1.0, no filter list).
    #[test]
    fn sampling_config_debug_clone_default() {
        let config = SamplingConfig::default();
        assert!((config.sample_rate - 1.0).abs() < f64::EPSILON);
        assert!(config.sampled_metrics.is_empty());
        let dbg = format!("{config:?}");
        assert!(dbg.contains("SamplingConfig"));
        let cloned = config;
        assert!((cloned.sample_rate - 1.0).abs() < f64::EPSILON);
    }
    /// Empty default snapshot; clone preserves added entries.
    #[test]
    fn metrics_snapshot_debug_clone_default() {
        let snapshot = MetricsSnapshot::default();
        assert!(snapshot.counters.is_empty());
        assert!(snapshot.gauges.is_empty());
        assert!(snapshot.histograms.is_empty());
        let dbg = format!("{snapshot:?}");
        assert!(dbg.contains("MetricsSnapshot"));
        let mut s = MetricsSnapshot::new();
        s.add_counter("c", vec![], 1);
        let cloned = s.clone();
        assert_eq!(cloned.counters.len(), 1);
    }
    /// Cloned errors render the same message.
    #[test]
    fn export_error_debug_clone() {
        let err = ExportError::new("something failed");
        let dbg = format!("{err:?}");
        assert!(dbg.contains("ExportError"));
        let cloned = err.clone();
        assert_eq!(cloned.to_string(), err.to_string());
    }
    /// StdoutExporter derives Debug in both default and prefixed forms.
    #[test]
    fn stdout_exporter_debug_default() {
        let exporter = StdoutExporter::default();
        let dbg = format!("{exporter:?}");
        assert!(dbg.contains("StdoutExporter"));
        let with_prefix = StdoutExporter::with_prefix("[test] ");
        let dbg2 = format!("{with_prefix:?}");
        assert!(dbg2.contains("StdoutExporter"));
    }
    /// NullExporter is a unit struct with a Debug impl.
    #[test]
    fn null_exporter_debug_default() {
        let exporter = NullExporter;
        let dbg = format!("{exporter:?}");
        assert!(dbg.contains("NullExporter"));
    }
    /// A default MultiExporter starts empty.
    #[test]
    fn multi_exporter_debug_default() {
        let exporter = MultiExporter::default();
        assert!(exporter.is_empty());
        assert_eq!(exporter.len(), 0);
        let dbg = format!("{exporter:?}");
        assert!(dbg.contains("MultiExporter"));
    }
    /// A default InMemoryExporter starts with no recorded metrics.
    #[test]
    fn in_memory_exporter_debug_default() {
        let exporter = InMemoryExporter::default();
        assert_eq!(exporter.total_metrics(), 0);
        let dbg = format!("{exporter:?}");
        assert!(dbg.contains("InMemoryExporter"));
    }
}
#[cfg(feature = "tracing-integration")]
pub mod span_semantics {
use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// Monotonic counters: ids and timestamps derived from these are unique per
// call and deterministic across the whole test process (no wall clock, no RNG).
static NEXT_TEST_SPAN_SEED: AtomicU64 = AtomicU64::new(1);
static NEXT_TEST_TIME_TICK: AtomicU64 = AtomicU64::new(1);
/// Returns a fresh, deterministic, pseudo-random 128-bit trace id.
///
/// Two independent splitmix64 streams over a monotonically increasing seed
/// give 128 well-mixed bits per call; ids never repeat within a process.
fn next_test_trace_id() -> TraceId {
    let seed = NEXT_TEST_SPAN_SEED.fetch_add(1, Ordering::Relaxed);
    let hi = splitmix64(seed);
    let lo = splitmix64(seed ^ 0x9e37_79b9_7f4a_7c15);
    // Big-endian concatenation of (hi, lo) — replaces the previous 16 manual
    // shift/cast lines with the equivalent `to_be_bytes` idiom.
    let mut bytes = [0u8; 16];
    bytes[..8].copy_from_slice(&hi.to_be_bytes());
    bytes[8..].copy_from_slice(&lo.to_be_bytes());
    let trace_id = TraceId::from_bytes(bytes);
    // The all-zero id is TraceId::INVALID (reserved by the W3C trace-context
    // spec); substitute a fixed non-zero id in that astronomically rare case.
    if trace_id == TraceId::INVALID {
        TraceId::from_bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])
    } else {
        trace_id
    }
}
/// Returns a fresh, deterministic, pseudo-random 64-bit span id.
///
/// Mixes the shared monotonic seed through splitmix64 so ids look random
/// while never repeating within a process.
fn next_test_span_id() -> SpanId {
    let seed = NEXT_TEST_SPAN_SEED.fetch_add(1, Ordering::Relaxed);
    let raw = splitmix64(seed ^ 0xa5a5_a5a5_a5a5_a5a5);
    // `to_be_bytes` yields the same (raw >> 56) .. (raw) byte order the old
    // manual shift/cast block produced.
    let span_id = SpanId::from_bytes(raw.to_be_bytes());
    // The all-zero id is SpanId::INVALID (reserved); substitute a fixed
    // non-zero id if the hash ever lands on it.
    if span_id == SpanId::INVALID {
        SpanId::from_bytes([0, 0, 0, 0, 0, 0, 0, 1])
    } else {
        span_id
    }
}
/// SplitMix64 finalizer: one avalanche step mapping a 64-bit seed to a
/// well-mixed 64-bit output (Steele/Lea/Flood constants).
fn splitmix64(state: u64) -> u64 {
    let mut z = state.wrapping_add(0x9e37_79b9_7f4a_7c15);
    z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}
/// Returns a strictly increasing synthetic timestamp (one nanosecond past the
/// epoch per call), keeping event ordering deterministic without a wall clock.
fn next_test_time() -> SystemTime {
    UNIX_EPOCH + Duration::from_nanos(NEXT_TEST_TIME_TICK.fetch_add(1, Ordering::Relaxed))
}
/// Truncates `value` to at most `max_len` characters (not bytes); `None`
/// means unlimited.
fn truncate_value(value: &str, max_len: Option<usize>) -> String {
    max_len.map_or_else(
        || value.to_string(),
        |limit| value.chars().take(limit).collect(),
    )
}
/// Tunables for the span-semantics conformance suite.
#[derive(Debug, Clone)]
pub struct SpanConformanceConfig {
    /// Maximum number of distinct attribute keys a span may hold.
    pub max_attributes: usize,
    /// Maximum number of events a span may record.
    pub max_events: usize,
    /// Optional per-value truncation limit in characters; `None` = unlimited.
    pub max_attribute_length: Option<usize>,
    /// Whether to run the sampling test group.
    pub test_sampling: bool,
    /// Whether to run the context-propagation test group.
    pub test_context_propagation: bool,
}
impl Default for SpanConformanceConfig {
    /// Generous limits (128/128, no truncation) with all optional groups on.
    fn default() -> Self {
        Self {
            max_attributes: 128,
            max_events: 128,
            max_attribute_length: None,
            test_sampling: true,
            test_context_propagation: true,
        }
    }
}
/// Aggregated pass/fail tally for one conformance run.
#[derive(Debug)]
pub struct SpanConformanceResult {
    /// Total tests executed.
    pub tests_run: usize,
    /// Tests that passed.
    pub tests_passed: usize,
    /// Tests that failed.
    pub tests_failed: usize,
    /// Human-readable "name: reason" entries, one per failure.
    pub failures: Vec<String>,
}
impl SpanConformanceResult {
    /// Creates an empty result with all counters zeroed.
    pub fn new() -> Self {
        Self {
            tests_run: 0,
            tests_passed: 0,
            tests_failed: 0,
            failures: Vec::new(),
        }
    }
    /// Tallies a passing test (the name is accepted for call-site symmetry
    /// but not stored).
    pub fn record_pass(&mut self, _test_name: &str) {
        self.tests_run += 1;
        self.tests_passed += 1;
    }
    /// Tallies a failing test and remembers why it failed.
    pub fn record_failure(&mut self, test_name: &str, reason: &str) {
        self.tests_run += 1;
        self.tests_failed += 1;
        self.failures.push(format!("{test_name}: {reason}"));
    }
    /// True while no test has failed (vacuously true when nothing ran).
    pub fn is_success(&self) -> bool {
        self.tests_failed == 0
    }
    /// Percentage of passing tests; defined as 0.0 when nothing ran.
    pub fn success_rate(&self) -> f64 {
        match self.tests_run {
            0 => 0.0,
            run => (self.tests_passed as f64 / run as f64) * 100.0,
        }
    }
}
/// Minimal in-memory span used to exercise OpenTelemetry span semantics
/// without a real tracer or exporter.
#[derive(Debug)]
pub struct TestSpan {
    /// This span's own context (trace id, span id, flags, trace state).
    pub context: SpanContext,
    /// Operation name.
    pub name: String,
    /// Span kind (Internal, Server, Consumer, ...).
    pub kind: SpanKind,
    /// Stamped from the atomic test clock at construction.
    pub start_time: SystemTime,
    /// Set once by `end()`; `None` while the span is open.
    pub end_time: Option<SystemTime>,
    /// Key/value attributes, capped at `max_attributes`.
    pub attributes: HashMap<String, String>,
    /// Recorded events, capped at `max_events`.
    pub events: Vec<SpanEvent>,
    /// Current status; see `set_status` for the precedence rules.
    pub status: Status,
    /// Context of the parent span, if this span has one.
    pub parent_context: Option<SpanContext>,
    /// Baggage key/values inherited by children.
    pub baggage: HashMap<String, String>,
    // Limits captured from SpanConformanceConfig at construction time.
    max_attributes: usize,
    max_events: usize,
    max_attribute_length: Option<usize>,
}
/// A single timestamped event recorded on a `TestSpan`.
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Event name.
    pub name: String,
    /// Stamped from the atomic test clock when the event was added.
    pub timestamp: SystemTime,
    /// Event attributes (values truncated to the span's limit).
    pub attributes: HashMap<String, String>,
}
impl TestSpan {
    /// Creates a root span with the default conformance limits.
    pub fn new(name: &str, kind: SpanKind) -> Self {
        Self::new_with_config(name, kind, &SpanConformanceConfig::default())
    }
    /// Creates a root span with explicit limits: fresh trace id, fresh span
    /// id, SAMPLED flag, empty trace state, no parent, no baggage.
    pub fn new_with_config(name: &str, kind: SpanKind, config: &SpanConformanceConfig) -> Self {
        let context = SpanContext::new(
            next_test_trace_id(),
            next_test_span_id(),
            TraceFlags::SAMPLED,
            false,
            TraceState::default(),
        );
        Self::from_parts(
            name,
            kind,
            context,
            None,
            HashMap::new(),
            config.max_attributes,
            config.max_events,
            config.max_attribute_length,
        )
    }
    /// Creates an in-process child span: same trace id, flags, and trace
    /// state as the parent, a new span id, and the parent's baggage and
    /// limits inherited.
    pub fn new_child(&self, name: &str, kind: SpanKind) -> Self {
        let parent_context = self.context.clone();
        let context = SpanContext::new(
            parent_context.trace_id(),
            next_test_span_id(),
            parent_context.trace_flags(),
            false,
            parent_context.trace_state().clone(),
        );
        Self::from_parts(
            name,
            kind,
            context,
            Some(parent_context),
            self.baggage.clone(),
            self.max_attributes,
            self.max_events,
            self.max_attribute_length,
        )
    }
    /// Creates a local child from a remote parent context (cross-service
    /// propagation): trace id/flags/state carry over, the child itself is
    /// not remote, and the supplied baggage is attached.
    pub fn child_from_remote_parent(
        parent_context: SpanContext,
        baggage: HashMap<String, String>,
        name: &str,
        kind: SpanKind,
        config: &SpanConformanceConfig,
    ) -> Self {
        let context = SpanContext::new(
            parent_context.trace_id(),
            next_test_span_id(),
            parent_context.trace_flags(),
            false,
            parent_context.trace_state().clone(),
        );
        Self::from_parts(
            name,
            kind,
            context,
            Some(parent_context),
            baggage,
            config.max_attributes,
            config.max_events,
            config.max_attribute_length,
        )
    }
    /// Shared constructor; stamps `start_time` from the atomic test clock.
    fn from_parts(
        name: &str,
        kind: SpanKind,
        context: SpanContext,
        parent_context: Option<SpanContext>,
        baggage: HashMap<String, String>,
        max_attributes: usize,
        max_events: usize,
        max_attribute_length: Option<usize>,
    ) -> Self {
        Self {
            context,
            name: name.to_string(),
            kind,
            start_time: next_test_time(),
            end_time: None,
            attributes: HashMap::new(),
            events: Vec::new(),
            status: Status::Unset,
            parent_context,
            baggage,
            max_attributes,
            max_events,
            max_attribute_length,
        }
    }
    /// Sets an attribute, truncating the value to `max_attribute_length`.
    /// New keys are silently dropped once `max_attributes` is reached, but
    /// existing keys may always be overwritten.
    pub fn set_attribute(&mut self, key: &str, value: &str) {
        let value = truncate_value(value, self.max_attribute_length);
        if self.attributes.contains_key(key) || self.attributes.len() < self.max_attributes {
            self.attributes.insert(key.to_string(), value);
        }
    }
    /// Sets a baggage item; baggage has no count or length limit here.
    pub fn set_baggage_item(&mut self, key: &str, value: &str) {
        self.baggage.insert(key.to_string(), value.to_string());
    }
    /// Appends an event with a fresh timestamp. Events past `max_events` are
    /// silently dropped; event attribute values are truncated like span
    /// attributes.
    pub fn add_event(&mut self, name: &str, mut attributes: HashMap<String, String>) {
        if self.events.len() >= self.max_events {
            return;
        }
        for value in attributes.values_mut() {
            *value = truncate_value(value, self.max_attribute_length);
        }
        let event = SpanEvent {
            name: name.to_string(),
            timestamp: next_test_time(),
            attributes,
        };
        self.events.push(event);
    }
    /// Status transitions: Error always wins; Ok applies only if the span is
    /// not already in Error; Unset never changes anything.
    /// NOTE(review): the OTel spec treats Ok as final (a later Error should
    /// not override it), whereas here Error takes precedence over Ok —
    /// confirm this inversion is intentional for the conformance harness.
    pub fn set_status(&mut self, status: Status) {
        match status {
            Status::Error { .. } => self.status = status,
            Status::Ok => {
                if !matches!(self.status, Status::Error { .. }) {
                    self.status = Status::Ok;
                }
            }
            Status::Unset => {
                // Effectively a no-op: Unset is only re-assigned over Unset.
                if matches!(self.status, Status::Unset) {
                    self.status = Status::Unset;
                }
            }
        }
    }
    /// Ends the span; idempotent — only the first call stamps `end_time`.
    pub fn end(&mut self) {
        if self.end_time.is_none() {
            self.end_time = Some(next_test_time());
        }
    }
    /// Elapsed time between start and end; `None` while the span is open, or
    /// if the end time somehow precedes the start time.
    pub fn duration(&self) -> Option<Duration> {
        if let Some(end_time) = self.end_time {
            end_time.duration_since(self.start_time).ok()
        } else {
            None
        }
    }
    /// True once `end()` has been called.
    pub fn is_ended(&self) -> bool {
        self.end_time.is_some()
    }
}
/// Runs the full conformance suite with default configuration.
pub fn run_span_conformance_tests() -> Result<SpanConformanceResult, Box<dyn std::error::Error>>
{
    run_span_conformance_tests_with_config(&SpanConformanceConfig::default())
}
pub fn run_span_conformance_tests_with_config(
config: &SpanConformanceConfig,
) -> Result<SpanConformanceResult, Box<dyn std::error::Error>> {
let mut result = SpanConformanceResult::new();
test_span_lifecycle(&mut result, config);
test_span_hierarchy(&mut result, config);
test_span_attributes(&mut result, config);
test_span_events(&mut result, config);
test_span_status(&mut result, config);
test_span_context(&mut result, config);
if config.test_sampling {
test_span_sampling(&mut result, config);
}
if config.test_context_propagation {
test_context_propagation(&mut result, config);
}
Ok(result)
}
/// Lifecycle group: a new span is open, `end()` closes it, an ended span has
/// a computable duration, and `end()` is idempotent.
fn test_span_lifecycle(result: &mut SpanConformanceResult, _config: &SpanConformanceConfig) {
    {
        let mut span = TestSpan::new("test_span", SpanKind::Internal);
        if span.is_ended() {
            result.record_failure("span_lifecycle_start", "New span should not be ended");
            return;
        }
        span.end();
        if !span.is_ended() {
            result.record_failure(
                "span_lifecycle_end",
                "Span should be ended after end() call",
            );
            return;
        }
        // duration() returns None whenever end_time < start_time, so Some(_)
        // already proves end >= start. The previous extra branch
        // (`duration.is_zero() && end_time < start_time`) was unsatisfiable
        // for the same reason and has been removed.
        if span.duration().is_none() {
            result.record_failure(
                "span_lifecycle_duration",
                "Ended span should have calculable duration",
            );
            return;
        }
        result.record_pass("span_lifecycle_basic");
    }
    {
        let mut span = TestSpan::new("test_span_double_end", SpanKind::Internal);
        span.end();
        let first_end_time = span.end_time;
        // No sleep needed between the calls: timestamps come from an atomic
        // tick, not the wall clock, so a buggy second end() would always
        // produce a different end_time.
        span.end();
        if span.end_time != first_end_time {
            result.record_failure(
                "span_lifecycle_idempotent",
                "Multiple end() calls should be idempotent",
            );
            return;
        }
        result.record_pass("span_lifecycle_idempotent");
    }
}
/// Hierarchy group: children share the parent's trace id, get fresh span
/// ids, reference the correct parent context, and multi-level chains stay in
/// one trace.
fn test_span_hierarchy(result: &mut SpanConformanceResult, _config: &SpanConformanceConfig) {
    {
        let parent = TestSpan::new("parent_span", SpanKind::Internal);
        let child = parent.new_child("child_span", SpanKind::Internal);
        if child.context.trace_id() != parent.context.trace_id() {
            result.record_failure(
                "span_hierarchy_trace_id",
                "Child span should have same trace ID as parent",
            );
            return;
        }
        if child.context.span_id() == parent.context.span_id() {
            result.record_failure(
                "span_hierarchy_span_id",
                "Child span should have different span ID from parent",
            );
            return;
        }
        if child.parent_context.is_none() {
            result.record_failure(
                "span_hierarchy_parent_context",
                "Child span should have parent context",
            );
            return;
        }
        if child.parent_context.unwrap() != parent.context {
            result.record_failure(
                "span_hierarchy_parent_reference",
                "Child span should reference correct parent context",
            );
            return;
        }
        result.record_pass("span_hierarchy_basic");
    }
    {
        // Three generations must all carry the root's trace id.
        let grandparent = TestSpan::new("grandparent", SpanKind::Internal);
        let parent = grandparent.new_child("parent", SpanKind::Internal);
        let child = parent.new_child("child", SpanKind::Internal);
        if child.context.trace_id() != grandparent.context.trace_id()
            || parent.context.trace_id() != grandparent.context.trace_id()
        {
            result.record_failure(
                "span_hierarchy_multi_level",
                "All spans in hierarchy should share trace ID",
            );
            return;
        }
        result.record_pass("span_hierarchy_multi_level");
    }
}
/// Attribute group: basic set/get, overwrite semantics, the max_attributes
/// cap, and (when configured) value-length truncation.
fn test_span_attributes(result: &mut SpanConformanceResult, config: &SpanConformanceConfig) {
    {
        let mut span = TestSpan::new_with_config("test_span", SpanKind::Internal, config);
        span.set_attribute("service.name", "test-service");
        span.set_attribute("http.method", "GET");
        if span.attributes.len() != 2 {
            result.record_failure("span_attributes_basic", "Span should have 2 attributes");
            return;
        }
        if span.attributes.get("service.name") != Some(&"test-service".to_string()) {
            result.record_failure("span_attributes_basic", "Attribute value should match");
            return;
        }
        result.record_pass("span_attributes_basic");
    }
    {
        // Re-setting an existing key must replace the value, even at the cap.
        let mut span = TestSpan::new_with_config("test_span", SpanKind::Internal, config);
        span.set_attribute("test.key", "original_value");
        span.set_attribute("test.key", "new_value");
        if span.attributes.get("test.key") != Some(&"new_value".to_string()) {
            result.record_failure(
                "span_attributes_overwrite",
                "Attribute should be overwritten",
            );
            return;
        }
        result.record_pass("span_attributes_overwrite");
    }
    {
        // Writing 10 keys past the cap must leave exactly max_attributes.
        let mut span = TestSpan::new_with_config("test_span", SpanKind::Internal, config);
        for i in 0..config.max_attributes + 10 {
            span.set_attribute(&format!("attr_{}", i), "value");
        }
        if span.attributes.len() != config.max_attributes {
            result.record_failure(
                "span_attributes_limits",
                "Attribute count should respect max_attributes",
            );
            return;
        }
        result.record_pass("span_attributes_limits");
    }
    // Truncation check only runs when a length limit is configured.
    if let Some(limit) = config.max_attribute_length {
        let mut span = TestSpan::new_with_config("test_span", SpanKind::Internal, config);
        let oversized = "x".repeat(limit + 5);
        span.set_attribute("oversized", &oversized);
        if span.attributes.get("oversized").map(String::len) != Some(limit) {
            result.record_failure(
                "span_attributes_value_length",
                "Attribute values should respect max_attribute_length",
            );
            return;
        }
        result.record_pass("span_attributes_value_length");
    }
}
/// Event group: basic recording, chronological ordering of multiple events,
/// and the max_events cap.
fn test_span_events(result: &mut SpanConformanceResult, config: &SpanConformanceConfig) {
    {
        let mut span = TestSpan::new_with_config("test_span", SpanKind::Internal, config);
        let mut event_attrs = HashMap::new();
        event_attrs.insert("event.severity".to_string(), "info".to_string());
        span.add_event("test_event", event_attrs);
        if span.events.len() != 1 {
            result.record_failure("span_events_basic", "Span should have 1 event");
            return;
        }
        let event = &span.events[0];
        if event.name != "test_event" {
            result.record_failure("span_events_basic", "Event name should match");
            return;
        }
        result.record_pass("span_events_basic");
    }
    {
        let mut span = TestSpan::new_with_config("test_span", SpanKind::Internal, config);
        span.add_event("first_event", HashMap::new());
        // NOTE(review): event timestamps come from the atomic test tick, which
        // already guarantees ordering — this wall-clock sleep looks redundant;
        // confirm before removing.
        std::thread::sleep(Duration::from_millis(1));
        span.add_event("second_event", HashMap::new());
        if span.events.len() != 2 {
            result.record_failure("span_events_multiple", "Span should have 2 events");
            return;
        }
        if span.events[0].timestamp > span.events[1].timestamp {
            result.record_failure(
                "span_events_ordering",
                "Events should be in chronological order",
            );
            return;
        }
        result.record_pass("span_events_multiple");
    }
    {
        // Adding 10 events past the cap must leave exactly max_events.
        let mut span = TestSpan::new_with_config("test_span", SpanKind::Internal, config);
        for i in 0..config.max_events + 10 {
            span.add_event(&format!("event_{}", i), HashMap::new());
        }
        if span.events.len() != config.max_events {
            result.record_failure(
                "span_events_limits",
                "Event count should respect max_events",
            );
            return;
        }
        result.record_pass("span_events_limits");
    }
}
/// Status group: default is Unset, Error is settable with a description, and
/// Error takes precedence over a previously set Ok (harness convention; see
/// the note on `TestSpan::set_status`).
fn test_span_status(result: &mut SpanConformanceResult, _config: &SpanConformanceConfig) {
    {
        let span = TestSpan::new("test_span", SpanKind::Internal);
        if !matches!(span.status, Status::Unset) {
            result.record_failure("span_status_default", "Default span status should be Unset");
            return;
        }
        result.record_pass("span_status_default");
    }
    {
        let mut span = TestSpan::new("test_span", SpanKind::Internal);
        span.set_status(Status::Error {
            description: "Something went wrong".into(),
        });
        if let Status::Error { description } = &span.status {
            if description != "Something went wrong" {
                result.record_failure("span_status_set", "Status description should match");
                return;
            }
        } else {
            result.record_failure("span_status_set", "Status should be Error");
            return;
        }
        result.record_pass("span_status_set");
    }
    {
        // Ok first, then Error: Error must win.
        let mut span = TestSpan::new("test_span", SpanKind::Internal);
        span.set_status(Status::Ok);
        span.set_status(Status::Error {
            description: "Error occurred".into(),
        });
        if !matches!(span.status, Status::Error { .. }) {
            result.record_failure(
                "span_status_precedence",
                "Error status should take precedence",
            );
            return;
        }
        result.record_pass("span_status_precedence");
    }
}
/// Context group: distinct spans get distinct span ids, and neither trace id
/// nor span id may be the reserved all-zero INVALID value.
fn test_span_context(result: &mut SpanConformanceResult, _config: &SpanConformanceConfig) {
    {
        let span1 = TestSpan::new("span1", SpanKind::Internal);
        let span2 = TestSpan::new("span2", SpanKind::Internal);
        if span1.context.span_id() == span2.context.span_id() {
            result.record_failure(
                "span_context_unique_ids",
                "Different spans should have different span IDs",
            );
            return;
        }
        result.record_pass("span_context_unique_ids");
    }
    {
        let span = TestSpan::new("test_span", SpanKind::Internal);
        let trace_id = span.context.trace_id();
        if trace_id == TraceId::INVALID {
            result.record_failure(
                "span_context_trace_id",
                "Trace ID should not be invalid/zero",
            );
            return;
        }
        result.record_pass("span_context_trace_id");
    }
    {
        let span = TestSpan::new("test_span", SpanKind::Internal);
        let span_id = span.context.span_id();
        if span_id == SpanId::INVALID {
            result.record_failure("span_context_span_id", "Span ID should not be invalid/zero");
            return;
        }
        result.record_pass("span_context_span_id");
    }
}
/// Sampling group: test spans carry the SAMPLED flag and children inherit
/// the parent's sampling decision.
fn test_span_sampling(result: &mut SpanConformanceResult, _config: &SpanConformanceConfig) {
    {
        let span = TestSpan::new("test_span", SpanKind::Internal);
        if !span.context.trace_flags().is_sampled() {
            result.record_failure("span_sampling_flag", "Test spans should be sampled");
            return;
        }
        result.record_pass("span_sampling_basic");
    }
    {
        let parent = TestSpan::new("parent", SpanKind::Internal);
        let child = parent.new_child("child", SpanKind::Internal);
        if parent.context.trace_flags().is_sampled() != child.context.trace_flags().is_sampled()
        {
            result.record_failure(
                "span_sampling_inheritance",
                "Child should inherit parent sampling decision",
            );
            return;
        }
        result.record_pass("span_sampling_inheritance");
    }
}
/// Propagation group: simulates a context arriving from another service
/// (`is_remote = true`) and checks that trace id, flags, trace state, and
/// baggage all carry into a local child while the child itself is not remote.
fn test_context_propagation(
    result: &mut SpanConformanceResult,
    config: &SpanConformanceConfig,
) {
    {
        // Fixed ids mimic a traceparent received over the wire.
        let trace_id = TraceId::from_bytes([
            0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xab,
            0xcd, 0xef,
        ]);
        let span_id = SpanId::from_bytes([0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef]);
        let trace_state =
            TraceState::from_key_value([("vendor", "upstream")]).expect("valid trace state");
        let incoming_context = SpanContext::new(
            trace_id,
            span_id,
            TraceFlags::SAMPLED,
            true,
            trace_state.clone(),
        );
        let mut baggage = HashMap::new();
        baggage.insert("tenant".to_string(), "alpha".to_string());
        let child = TestSpan::child_from_remote_parent(
            incoming_context.clone(),
            baggage,
            "remote_child",
            SpanKind::Server,
            config,
        );
        if child.context.trace_id() != incoming_context.trace_id() {
            result.record_failure(
                "context_propagation_trace_id",
                "Trace ID should be preserved across boundaries",
            );
            return;
        }
        if child.context.trace_flags() != incoming_context.trace_flags() {
            result.record_failure(
                "context_propagation_flags",
                "Trace flags should be preserved",
            );
            return;
        }
        // The remote flag belongs to the incoming context only.
        if !incoming_context.is_remote() || child.context.is_remote() {
            result.record_failure(
                "context_propagation_remote_flag",
                "Incoming context should stay remote while child becomes local",
            );
            return;
        }
        result.record_pass("context_propagation_basic");
    }
    {
        // TraceState entries must survive the hop into the child.
        let trace_state =
            TraceState::from_key_value([("vendor", "upstream")]).expect("valid trace state");
        let incoming_context = SpanContext::new(
            TraceId::from_bytes([
                0xaa, 0xaa, 0xaa, 0xaa, 0xbb, 0xbb, 0xbb, 0xbb, 0xcc, 0xcc, 0xcc, 0xcc, 0xdd,
                0xdd, 0xdd, 0xdd,
            ]),
            SpanId::from_bytes([0x11; 8]),
            TraceFlags::SAMPLED,
            true,
            trace_state,
        );
        let child = TestSpan::child_from_remote_parent(
            incoming_context,
            HashMap::new(),
            "remote_child",
            SpanKind::Consumer,
            config,
        );
        if child.context.trace_state().get("vendor") != Some("upstream") {
            result.record_failure(
                "context_propagation_state",
                "TraceState should propagate to child spans",
            );
            return;
        }
        result.record_pass("context_propagation_state");
    }
    {
        // All baggage entries must be visible on the child.
        let incoming_context = SpanContext::new(
            TraceId::from_bytes([
                0xee, 0xee, 0xee, 0xee, 0xff, 0xff, 0xff, 0xff, 0x11, 0x11, 0x11, 0x11, 0x22,
                0x22, 0x22, 0x22,
            ]),
            SpanId::from_bytes([0x22; 8]),
            TraceFlags::SAMPLED,
            true,
            TraceState::default(),
        );
        let mut baggage = HashMap::new();
        baggage.insert("tenant".to_string(), "alpha".to_string());
        baggage.insert("request.class".to_string(), "gold".to_string());
        let child = TestSpan::child_from_remote_parent(
            incoming_context,
            baggage,
            "remote_child",
            SpanKind::Server,
            config,
        );
        if child.baggage.get("tenant").map(String::as_str) != Some("alpha")
            || child.baggage.get("request.class").map(String::as_str) != Some("gold")
        {
            result.record_failure(
                "context_propagation_baggage",
                "Baggage should propagate across service boundaries",
            );
            return;
        }
        result.record_pass("context_propagation_baggage");
    }
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::{Value, json};
use std::collections::BTreeMap;
/// Replaces volatile values (ids, timestamps) with stable placeholders so
/// snapshots stay deterministic; all other values pass through unchanged.
fn scrub_span_field(key: &str, value: &str) -> String {
    match key {
        "trace_id" | "span_id" | "parent_span_id" | "request_id" | "traceparent" => {
            "[ID]".to_string()
        }
        "start_time" | "end_time" | "timestamp" => "[TIMESTAMP]".to_string(),
        _ => value.to_string(),
    }
}
/// Converts an unordered map into a key-sorted BTreeMap with volatile values
/// scrubbed, giving snapshot-stable output.
fn sorted_string_map_snapshot(map: &HashMap<String, String>) -> BTreeMap<String, String> {
    let mut snapshot = BTreeMap::new();
    for (key, value) in map {
        snapshot.insert(key.clone(), scrub_span_field(key, value));
    }
    snapshot
}
/// Maps a span `Status` onto a stable JSON shape for snapshot comparison.
fn span_status_snapshot(status: &Status) -> Value {
    match status {
        Status::Unset => json!({"kind": "unset"}),
        Status::Ok => json!({"kind": "ok"}),
        Status::Error { description } => json!({
            "kind": "error",
            "description": description,
        }),
    }
}
/// JSON snapshot of one event; the timestamp is replaced with a placeholder
/// and attributes are sorted/scrubbed for determinism.
fn span_event_snapshot(event: &SpanEvent) -> Value {
    json!({
        "name": event.name,
        "timestamp": "[TIMESTAMP]",
        "attributes": sorted_string_map_snapshot(&event.attributes),
    })
}
/// Full JSON snapshot of a TestSpan: ids and timestamps become placeholders;
/// maps are sorted; presence of parent/end_time is encoded as Some/None.
fn test_span_snapshot(span: &TestSpan) -> Value {
    json!({
        "name": span.name,
        "kind": format!("{:?}", span.kind),
        "trace_id": "[ID]",
        "span_id": "[ID]",
        "parent_span_id": span.parent_context.as_ref().map(|_| "[ID]"),
        "is_remote": span.context.is_remote(),
        "sampled": span.context.trace_flags().is_sampled(),
        "trace_state_vendor": span.context.trace_state().get("vendor"),
        "start_time": "[TIMESTAMP]",
        "end_time": span.end_time.map(|_| "[TIMESTAMP]"),
        "status": span_status_snapshot(&span.status),
        "attributes": sorted_string_map_snapshot(&span.attributes),
        "baggage": sorted_string_map_snapshot(&span.baggage),
        "events": span.events.iter().map(span_event_snapshot).collect::<Vec<_>>(),
    })
}
fn otlp_attributes_snapshot(map: &HashMap<String, String>) -> Vec<Value> {
let mut entries: Vec<_> = map.iter().collect();
entries.sort_by(|(left, _), (right, _)| left.cmp(right));
entries
.into_iter()
.map(|(key, value)| {
json!({
"key": key,
"value": {
"string_value": scrub_span_field(key, value),
}
})
})
.collect()
}
/// OTLP-wire-shaped label list, key-sorted for determinism. Metric labels
/// are emitted as-is (no scrubbing); the stable sort preserves the relative
/// order of any duplicate keys.
fn otlp_metric_labels_snapshot(labels: &[(String, String)]) -> Vec<Value> {
    let mut entries: Vec<&(String, String)> = labels.iter().collect();
    entries.sort_by(|a, b| a.0.cmp(&b.0));
    entries
        .into_iter()
        .map(|(key, value)| {
            json!({
                "key": key,
                "value": {
                    "string_value": value,
                }
            })
        })
        .collect()
}
/// OTLP status codes: 0 = Unset, 1 = Ok, 2 = Error (message only on Error).
fn otlp_status_snapshot(status: &Status) -> Value {
    match status {
        Status::Unset => json!({"code": 0, "message": ""}),
        Status::Ok => json!({"code": 1, "message": ""}),
        Status::Error { description } => json!({
            "code": 2,
            "message": description,
        }),
    }
}
/// OTLP-wire-shaped event with the nanosecond timestamp replaced by a
/// placeholder for snapshot stability.
fn otlp_event_wire_snapshot(event: &SpanEvent) -> Value {
    json!({
        "name": event.name,
        "time_unix_nano": "[TIMESTAMP]",
        "attributes": otlp_attributes_snapshot(&event.attributes),
    })
}
/// OTLP-wire-shaped span: ids/timestamps become placeholders; a missing
/// parent is encoded as the empty string (OTLP convention), a missing end
/// time as JSON null.
fn otlp_span_wire_snapshot(span: &TestSpan) -> Value {
    json!({
        "trace_id": "[ID]",
        "span_id": "[ID]",
        "parent_span_id": span.parent_context.as_ref().map(|_| "[ID]").unwrap_or(""),
        "name": span.name,
        "kind": format!("{:?}", span.kind),
        "start_time_unix_nano": "[TIMESTAMP]",
        "end_time_unix_nano": span.end_time.map(|_| "[TIMESTAMP]"),
        "attributes": otlp_attributes_snapshot(&span.attributes),
        "events": span.events.iter().map(otlp_event_wire_snapshot).collect::<Vec<_>>(),
        "status": otlp_status_snapshot(&span.status),
        "trace_state_vendor": span.context.trace_state().get("vendor"),
        "sampled": span.context.trace_flags().is_sampled(),
    })
}
/// OTLP-wire-shaped metrics payload: counters, gauges, and histograms are
/// each sorted by metric name so the snapshot is order-independent, then
/// nested under a single scope entry.
fn otlp_metrics_wire_snapshot(snapshot: &MetricsSnapshot) -> Value {
    let mut counters: Vec<_> = snapshot.counters.iter().collect();
    counters.sort_by(|(left, _, _), (right, _, _)| left.cmp(right));
    let mut gauges: Vec<_> = snapshot.gauges.iter().collect();
    gauges.sort_by(|(left, _, _), (right, _, _)| left.cmp(right));
    let mut histograms: Vec<_> = snapshot.histograms.iter().collect();
    histograms.sort_by(|(left, _, _, _), (right, _, _, _)| left.cmp(right));
    json!({
        "scope_metrics": [{
            "scope": {
                "name": "asupersync.observability.otel",
                "version": "0.2.9",
            },
            "metrics": {
                "counters": counters.into_iter().map(|(name, labels, value)| {
                    json!({
                        "name": name,
                        "sum": {
                            "data_points": [{
                                "attributes": otlp_metric_labels_snapshot(labels),
                                "as_int": value,
                            }]
                        }
                    })
                }).collect::<Vec<_>>(),
                "gauges": gauges.into_iter().map(|(name, labels, value)| {
                    json!({
                        "name": name,
                        "gauge": {
                            "data_points": [{
                                "attributes": otlp_metric_labels_snapshot(labels),
                                "as_int": value,
                            }]
                        }
                    })
                }).collect::<Vec<_>>(),
                "histograms": histograms.into_iter().map(|(name, labels, count, sum)| {
                    json!({
                        "name": name,
                        "histogram": {
                            "data_points": [{
                                "attributes": otlp_metric_labels_snapshot(labels),
                                "count": count,
                                "sum": sum,
                            }]
                        }
                    })
                }).collect::<Vec<_>>(),
            }
        }]
    })
}
/// OTLP-wire-shaped log record with fixed INFO severity and placeholder
/// time/trace/span identifiers.
fn otlp_log_record_snapshot(body: &str, attributes: HashMap<String, String>) -> Value {
    json!({
        "time_unix_nano": "[TIMESTAMP]",
        "trace_id": "[ID]",
        "span_id": "[ID]",
        "severity_text": "INFO",
        "body": body,
        "attributes": otlp_attributes_snapshot(&attributes),
    })
}
#[test]
fn test_span_conformance_config_default() {
    // Defaults: generous limits with every optional test group enabled.
    let SpanConformanceConfig {
        max_attributes,
        max_events,
        test_sampling,
        test_context_propagation,
        ..
    } = SpanConformanceConfig::default();
    assert_eq!(max_attributes, 128);
    assert_eq!(max_events, 128);
    assert!(test_sampling);
    assert!(test_context_propagation);
}
#[test]
fn test_span_conformance_result() {
    let mut result = SpanConformanceResult::new();
    // Empty result counts as success.
    assert_eq!(result.tests_run, 0);
    assert!(result.is_success());
    // A pass keeps it green.
    result.record_pass("test1");
    assert_eq!((result.tests_run, result.tests_passed), (1, 1));
    assert!(result.is_success());
    // A failure flips it and halves the success rate.
    result.record_failure("test2", "failed");
    assert_eq!((result.tests_run, result.tests_failed), (2, 1));
    assert!(!result.is_success());
    assert_eq!(result.success_rate(), 50.0);
}
#[test]
fn test_span_basic_operations() {
    let mut span = TestSpan::new("test", SpanKind::Internal);
    // Open span: no end time, no duration.
    assert!(!span.is_ended());
    assert!(span.duration().is_none());
    // Attributes and events are recorded verbatim.
    span.set_attribute("key", "value");
    assert_eq!(span.attributes.get("key").map(String::as_str), Some("value"));
    span.add_event("event", HashMap::new());
    assert_eq!(span.events.len(), 1);
    // Ending makes the duration computable.
    span.end();
    assert!(span.is_ended());
    assert!(span.duration().is_some());
}
#[test]
fn test_span_end_is_idempotent() {
    // Only the first end() call may stamp the end time.
    let mut span = TestSpan::new("test", SpanKind::Internal);
    span.end();
    let stamped = span.end_time;
    span.end();
    assert_eq!(span.end_time, stamped);
}
#[test]
fn test_span_hierarchy() {
    let root = TestSpan::new("parent", SpanKind::Internal);
    let child = root.new_child("child", SpanKind::Internal);
    // Same trace, fresh span id, and a back-reference to the parent.
    assert_eq!(child.context.trace_id(), root.context.trace_id());
    assert_ne!(child.context.span_id(), root.context.span_id());
    let parent_ctx = child
        .parent_context
        .expect("child should record its parent context");
    assert_eq!(parent_ctx, root.context);
}
#[test]
// Trace state and baggage carried by a remote parent context must survive
// into a locally-created child span.
fn test_span_remote_parent_propagates_trace_state_and_baggage() {
    let config = SpanConformanceConfig {
        max_attributes: 8,
        max_events: 8,
        max_attribute_length: Some(8),
        test_sampling: true,
        test_context_propagation: true,
    };

    // Simulate a sampled parent context arriving over the wire, carrying a
    // vendor entry in its trace state.
    let state = TraceState::from_key_value([("vendor", "edge")]).expect("valid trace state");
    let trace_bytes = [
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x11, 0x12, 0x13, 0x14, 0x15,
        0x16, 0x17, 0x18,
    ];
    let remote_parent = SpanContext::new(
        TraceId::from_bytes(trace_bytes),
        SpanId::from_bytes([0x11; 8]),
        TraceFlags::SAMPLED,
        true,
        state,
    );

    let baggage = HashMap::from([("tenant".to_string(), "alpha".to_string())]);
    let child = TestSpan::child_from_remote_parent(
        remote_parent,
        baggage,
        "child",
        SpanKind::Server,
        &config,
    );

    // Trace state and baggage propagate to the child.
    assert_eq!(child.context.trace_state().get("vendor"), Some("edge"));
    assert_eq!(
        child.baggage.get("tenant").map(String::as_str),
        Some("alpha")
    );

    // The child itself is local even though its recorded parent is remote.
    assert!(!child.context.is_remote());
    assert!(child.parent_context.expect("parent").is_remote());
}
#[test]
// Configured caps drop excess attributes/events and truncate long values.
fn test_span_attribute_and_event_limits_are_enforced() {
    // Tight limits: two attributes, one event, values cut at 4 characters.
    let config = SpanConformanceConfig {
        max_attributes: 2,
        max_events: 1,
        max_attribute_length: Some(4),
        test_sampling: true,
        test_context_propagation: true,
    };
    let mut span = TestSpan::new_with_config("test", SpanKind::Internal, &config);

    // Third attribute exceeds max_attributes; stored values are truncated.
    for key in ["k1", "k2", "k3"] {
        span.set_attribute(key, "value");
    }
    assert_eq!(span.attributes.len(), 2);
    assert_eq!(span.attributes.get("k1").map(String::as_str), Some("valu"));

    // Second event exceeds max_events and is discarded.
    span.add_event("one", HashMap::new());
    span.add_event("two", HashMap::new());
    assert_eq!(span.events.len(), 1);
}
#[test]
// Timestamps must never run backwards: start <= events (in order) <= end.
fn test_span_timestamps_are_monotonic() {
    let mut span = TestSpan::new("test", SpanKind::Internal);
    let started_at = span.start_time;
    span.add_event("first", HashMap::new());
    span.add_event("second", HashMap::new());
    span.end();

    let ended_at = span.end_time.expect("span end time");
    assert!(span.events[0].timestamp >= started_at);
    assert!(span.events[1].timestamp >= span.events[0].timestamp);
    assert!(ended_at >= span.events[1].timestamp);
    assert!(span.duration().is_some());
}
#[test]
// The full conformance suite under default config must execute at least
// one check and report zero failures.
fn run_basic_conformance_tests() {
    let outcome = run_span_conformance_tests_with_config(&SpanConformanceConfig::default())
        .expect("Conformance tests should run");
    assert!(outcome.tests_run > 0);
    assert!(
        outcome.is_success(),
        "span conformance failures: {:?}",
        outcome.failures
    );
}
#[test]
// Snapshot test: exports one locally-rooted span and one span derived from a
// remote parent through test_span_snapshot, whose output feeds the stored
// "span_export_scrubbed" insta snapshot. The snapshot name implies ids and
// timestamps are replaced with stable placeholders — the construction order
// below must not change or the snapshot will churn.
fn span_export_snapshot_scrubs_ids_and_timestamps() {
// Small limits so truncation behavior is represented in the snapshot.
let config = SpanConformanceConfig {
max_attributes: 4,
max_events: 2,
max_attribute_length: Some(16),
test_sampling: true,
test_context_propagation: true,
};
// Parent span carries attributes with id-like values, baggage, an event
// containing a traceparent header, and an error status — the fields a
// scrubber must normalize.
let mut parent = TestSpan::new_with_config("checkout", SpanKind::Server, &config);
parent.set_attribute("component", "orders");
parent.set_attribute("request_id", "req-7c1f7ecf-54ff-4ac8-8ec5-6aa64500a161");
parent.set_baggage_item("tenant", "alpha");
parent.add_event(
"db.query",
HashMap::from([
("statement".to_string(), "select".to_string()),
(
"traceparent".to_string(),
"00-abcdef-0123456789".to_string(),
),
]),
);
parent.set_status(Status::Error {
description: "timeout".into(),
});
parent.end();
// A sampled remote context with trace state, used to derive a child whose
// remote lineage must also appear scrubbed.
let remote_parent = SpanContext::new(
TraceId::from_bytes([
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x11, 0x12, 0x13, 0x14, 0x15,
0x16, 0x17, 0x18,
]),
SpanId::from_bytes([0x11; 8]),
TraceFlags::SAMPLED,
true,
TraceState::from_key_value([("vendor", "edge")]).expect("valid trace state"),
);
let remote_child = TestSpan::child_from_remote_parent(
remote_parent,
HashMap::from([("tenant".to_string(), "alpha".to_string())]),
"cache.lookup",
SpanKind::Client,
&config,
);
// Both spans are compared against the stored snapshot in one document.
insta::assert_json_snapshot!(
"span_export_scrubbed",
json!({
"parent": test_span_snapshot(&parent),
"remote_child": test_span_snapshot(&remote_child),
})
);
}
#[test]
// Snapshot test of the span export format across three shapes: a successful
// server span, a failing client span, and a three-span trace (root plus two
// children). Feeds the stored "span_export_format_scrubbed" insta snapshot;
// keep construction order stable to avoid snapshot churn.
fn span_export_format_snapshot_scrubs_ids_and_timestamps() {
let config = SpanConformanceConfig {
max_attributes: 6,
max_events: 3,
max_attribute_length: Some(20),
test_sampling: true,
test_context_propagation: true,
};
// Case 1: a server span that completes successfully.
let mut happy_path =
TestSpan::new_with_config("http.request", SpanKind::Server, &config);
happy_path.set_attribute("service.name", "checkout");
happy_path.set_attribute("http.method", "POST");
happy_path.add_event(
"response.sent",
HashMap::from([("status_code".to_string(), "200".to_string())]),
);
happy_path.set_status(Status::Ok);
happy_path.end();
// Case 2: a client span that ends with an error status and an error event.
let mut error_path = TestSpan::new_with_config("db.query", SpanKind::Client, &config);
error_path.set_attribute("db.system", "postgresql");
error_path.set_attribute("db.operation", "select");
error_path.add_event(
"db.error",
HashMap::from([
("error.kind".to_string(), "timeout".to_string()),
("statement".to_string(), "select * from orders".to_string()),
]),
);
error_path.set_status(Status::Error {
description: "deadline exceeded".into(),
});
error_path.end();
// Case 3: a producer root with baggage and two sequential children.
let mut root = TestSpan::new_with_config("batch.import", SpanKind::Producer, &config);
root.set_attribute("job.name", "nightly-import");
root.set_baggage_item("tenant", "alpha");
let mut decode_child = root.new_child("decode.payload", SpanKind::Internal);
decode_child.set_attribute("stage", "decode");
decode_child.add_event(
"payload.decoded",
HashMap::from([("records".to_string(), "42".to_string())]),
);
decode_child.set_status(Status::Ok);
decode_child.end();
let mut publish_child = root.new_child("publish.kafka", SpanKind::Producer);
publish_child.set_attribute("messaging.system", "kafka");
publish_child.add_event(
"broker.ack",
HashMap::from([("partition".to_string(), "7".to_string())]),
);
publish_child.set_status(Status::Ok);
publish_child.end();
// Root is closed last so its end time follows both children.
root.add_event(
"pipeline.completed",
HashMap::from([("children".to_string(), "2".to_string())]),
);
root.set_status(Status::Ok);
root.end();
// All three cases are captured in one snapshot document.
insta::assert_json_snapshot!(
"span_export_format_scrubbed",
json!({
"happy_path": test_span_snapshot(&happy_path),
"error_path": test_span_snapshot(&error_path),
"multi_span_trace": [
test_span_snapshot(&root),
test_span_snapshot(&decode_child),
test_span_snapshot(&publish_child),
],
})
);
}
#[test]
// Snapshot test of the assembled OTLP wire document covering all three
// signals: resource_spans (a parent/child trace), resource_metrics (counter,
// gauge, histogram), and resource_logs (two log records). Ids and timestamps
// are scrubbed by the otlp_*_snapshot helpers so the stored
// "otlp_wire_format_scrubbed" snapshot stays stable.
fn otlp_wire_format_scrubbed() {
let config = SpanConformanceConfig {
max_attributes: 6,
max_events: 3,
max_attribute_length: Some(24),
test_sampling: true,
test_context_propagation: true,
};
// Trace: a successful server root span...
let mut root = TestSpan::new_with_config("otlp.export", SpanKind::Server, &config);
root.set_attribute("service.name", "checkout");
root.set_attribute("deployment.environment", "staging");
root.add_event(
"request.accepted",
HashMap::from([("route".to_string(), "/v1/orders".to_string())]),
);
root.set_status(Status::Ok);
root.end();
// ...with a failing client child, so both status paths appear on the wire.
let mut child = root.new_child("postgres.query", SpanKind::Client);
child.set_attribute("db.system", "postgresql");
child.set_attribute("db.operation", "select");
child.add_event(
"row.batch",
HashMap::from([("rows".to_string(), "3".to_string())]),
);
child.set_status(Status::Error {
description: "deadline exceeded".into(),
});
child.end();
// Metrics: one of each instrument kind, each with a single label.
let mut metrics = MetricsSnapshot::new();
metrics.add_counter(
"otel.export.spans",
vec![("signal".to_string(), "traces".to_string())],
2,
);
metrics.add_gauge(
"otel.export.queue_depth",
vec![("pipeline".to_string(), "primary".to_string())],
1,
);
metrics.add_histogram(
"otel.export.latency_ms",
vec![("signal".to_string(), "mixed".to_string())],
2,
17.5,
);
// Assemble the full OTLP-shaped document: resource + scope wrappers around
// spans, metrics, and log records, mirroring the OTLP protobuf JSON layout.
insta::assert_json_snapshot!(
"otlp_wire_format_scrubbed",
json!({
"resource_spans": [{
"resource": {
"attributes": [
{"key": "service.name", "value": {"string_value": "checkout"}},
{"key": "telemetry.sdk.name", "value": {"string_value": "asupersync"}},
]
},
"scope_spans": [{
"scope": {
"name": "asupersync.observability.otel",
"version": "0.2.9",
},
"spans": [
otlp_span_wire_snapshot(&root),
otlp_span_wire_snapshot(&child),
],
}]
}],
"resource_metrics": [otlp_metrics_wire_snapshot(&metrics)],
"resource_logs": [{
"scope_logs": [{
"scope": {
"name": "asupersync.observability.otel",
"version": "0.2.9",
},
"log_records": [
otlp_log_record_snapshot(
"export started",
HashMap::from([
("component".to_string(), "otlp".to_string()),
("signal".to_string(), "traces".to_string()),
]),
),
otlp_log_record_snapshot(
"export retry scheduled",
HashMap::from([
("component".to_string(), "otlp".to_string()),
("retry_in_ms".to_string(), "250".to_string()),
]),
),
],
}]
}],
})
);
}
}
}
#[cfg(not(feature = "tracing-integration"))]
pub mod span_semantics {
    //! Fallback span-semantics API used when the `tracing-integration`
    //! feature is disabled. Mirrors the feature-enabled module's surface so
    //! callers compile against either configuration.

    /// Aggregated outcome of a span-conformance run.
    #[derive(Debug)]
    pub struct SpanConformanceResult {
        /// Total number of conformance checks executed.
        pub tests_run: usize,
        /// Number of checks that passed.
        pub tests_passed: usize,
        /// Number of checks that failed.
        pub tests_failed: usize,
        /// Human-readable description of each failure.
        pub failures: Vec<String>,
    }
    impl SpanConformanceResult {
        /// Returns `true` when no check has failed (including when nothing
        /// has run yet).
        #[must_use]
        pub fn is_success(&self) -> bool {
            self.tests_failed == 0
        }
        /// Percentage of checks that passed, in `0.0..=100.0`.
        ///
        /// Computed from the counters so the fallback agrees with the
        /// feature-enabled implementation (1 pass out of 2 runs yields
        /// 50.0). Previously this stub always returned 0.0, which would
        /// misreport a fully-passing result. Returns 0.0 when no checks
        /// have run.
        #[must_use]
        pub fn success_rate(&self) -> f64 {
            if self.tests_run == 0 {
                0.0
            } else {
                (self.tests_passed as f64 / self.tests_run as f64) * 100.0
            }
        }
    }
    /// Always fails in this configuration: span-semantics conformance
    /// testing requires the `tracing-integration` feature.
    ///
    /// # Errors
    /// Returns an error naming the missing feature.
    pub fn run_span_conformance_tests() -> Result<SpanConformanceResult, Box<dyn std::error::Error>>
    {
        Err("OpenTelemetry span semantics testing requires 'tracing-integration' feature".into())
    }
}