use super::context::RegionTag;
use super::id::TraceId;
use super::span::{SymbolSpan, SymbolSpanKind, SymbolSpanStatus};
use crate::types::Time;
use crate::types::symbol::ObjectId;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct TraceRecord {
pub trace_id: TraceId,
pub object_id: Option<ObjectId>,
pub first_seen: Time,
pub last_updated: Time,
pub spans: Vec<SymbolSpan>,
pub regions: Vec<RegionTag>,
pub is_complete: bool,
}
#[derive(Clone, Debug)]
pub struct TraceSummary {
pub trace_id: TraceId,
pub object_id: Option<ObjectId>,
pub span_count: usize,
pub symbols_encoded: u32,
pub symbols_transmitted: u32,
pub symbols_received: u32,
pub symbols_dropped: u32,
pub end_to_end_latency: Option<Duration>,
pub encode_duration: Option<Duration>,
pub transmit_duration_median: Option<Duration>,
pub decode_duration: Option<Duration>,
pub regions: Vec<String>,
pub success: bool,
pub error: Option<String>,
}
pub struct SymbolTraceCollector {
traces: RwLock<HashMap<TraceId, TraceRecord>>,
max_traces: usize,
max_age: Duration,
clock_skew_tolerance: Duration,
local_region: RegionTag,
}
impl SymbolTraceCollector {
#[must_use]
pub fn new(local_region: RegionTag) -> Self {
Self {
traces: RwLock::new(HashMap::new()),
max_traces: 10_000,
max_age: Duration::from_hours(1),
clock_skew_tolerance: Duration::from_millis(100),
local_region,
}
}
#[must_use]
pub fn local_region(&self) -> &RegionTag {
&self.local_region
}
#[must_use]
pub fn with_max_traces(mut self, max: usize) -> Self {
self.max_traces = max;
self
}
#[must_use]
pub fn with_max_age(mut self, age: Duration) -> Self {
self.max_age = age;
self
}
#[must_use]
pub fn with_clock_skew_tolerance(mut self, tolerance: Duration) -> Self {
self.clock_skew_tolerance = tolerance;
self
}
#[must_use]
pub const fn clock_skew_tolerance(&self) -> Duration {
self.clock_skew_tolerance
}
pub fn record_span(&self, span: &SymbolSpan, now: Time) {
let trace_id = span.context().trace_id();
let mut traces = self.traces.write();
let record = traces.entry(trace_id).or_insert_with(|| TraceRecord {
trace_id,
object_id: span.object_id(),
first_seen: now,
last_updated: now,
spans: Vec::new(),
regions: Vec::new(),
is_complete: false,
});
record.last_updated = now;
if record.object_id.is_none() {
record.object_id = span.object_id();
}
record.spans.push(span.clone());
let region = span.context().origin_region().clone();
if !record.regions.contains(®ion) {
record.regions.push(region);
}
if span.kind() == SymbolSpanKind::Decode
&& matches!(
span.status(),
SymbolSpanStatus::Ok | SymbolSpanStatus::Error
)
{
record.is_complete = true;
}
if traces.len() > self.max_traces {
self.evict_oldest(&mut traces, now);
}
}
#[must_use]
pub fn get_trace(&self, trace_id: TraceId) -> Option<TraceRecord> {
self.traces.read().get(&trace_id).cloned()
}
#[must_use]
pub fn get_summary(&self, trace_id: TraceId) -> Option<TraceSummary> {
let record = {
let traces = self.traces.read();
traces.get(&trace_id)?.clone()
};
let mut symbols_encoded = 0u32;
let mut symbols_transmitted = 0u32;
let mut symbols_received = 0u32;
let mut symbols_dropped = 0u32;
let mut encode_duration = None;
let mut decode_duration = None;
let mut transmit_durations = Vec::new();
let mut first_encode_time: Option<Time> = None;
let mut decode_complete_time: Option<Time> = None;
let mut error = None;
for span in &record.spans {
match span.kind() {
SymbolSpanKind::Encode => {
if let Some(count) = span.symbol_count() {
symbols_encoded = symbols_encoded.saturating_add(count);
}
if encode_duration.is_none() {
encode_duration =
span.duration().map(|t| Duration::from_nanos(t.as_nanos()));
}
if first_encode_time.is_none() {
first_encode_time = Some(span.start_time());
}
}
SymbolSpanKind::Transmit => {
symbols_transmitted = symbols_transmitted.saturating_add(1);
if let Some(d) = span.duration() {
transmit_durations.push(Duration::from_nanos(d.as_nanos()));
}
if span.status() == SymbolSpanStatus::Dropped {
symbols_dropped = symbols_dropped.saturating_add(1);
}
}
SymbolSpanKind::Receive => {
symbols_received = symbols_received.saturating_add(1);
}
SymbolSpanKind::Decode => {
decode_duration = span.duration().map(|t| Duration::from_nanos(t.as_nanos()));
if let Some(end) = span.end_time() {
decode_complete_time = Some(end);
}
if span.status() == SymbolSpanStatus::Error {
error = span.error_message().map(ToString::to_string);
}
}
_ => {}
}
}
let end_to_end_latency = match (first_encode_time, decode_complete_time) {
(Some(start), Some(end)) => Some(Duration::from_nanos(end.duration_since(start))),
_ => None,
};
let transmit_duration_median = if transmit_durations.is_empty() {
None
} else {
transmit_durations.sort_by_key(Duration::as_nanos);
let mid = transmit_durations.len() / 2;
Some(transmit_durations[mid])
};
Some(TraceSummary {
trace_id,
object_id: record.object_id,
span_count: record.spans.len(),
symbols_encoded,
symbols_transmitted,
symbols_received,
symbols_dropped,
end_to_end_latency,
encode_duration,
transmit_duration_median,
decode_duration,
regions: record
.regions
.iter()
.map(|r| r.as_str().to_string())
.collect(),
success: record.is_complete && error.is_none(),
error,
})
}
#[must_use]
pub fn active_traces(&self) -> Vec<TraceId> {
self.traces
.read()
.iter()
.filter(|(_, r)| !r.is_complete)
.map(|(id, _)| *id)
.collect()
}
#[must_use]
pub fn complete_traces(&self) -> Vec<TraceId> {
self.traces
.read()
.iter()
.filter(|(_, r)| r.is_complete)
.map(|(id, _)| *id)
.collect()
}
fn evict_oldest(&self, traces: &mut HashMap<TraceId, TraceRecord>, now: Time) {
let mut to_remove: Vec<_> = traces
.iter()
.filter(|(_, r)| r.is_complete)
.map(|(id, r)| (*id, r.last_updated))
.collect();
to_remove.sort_by_key(|(_, updated)| *updated);
let remove_count = (traces.len() / 10).max(1);
for (id, _) in to_remove.into_iter().take(remove_count) {
traces.remove(&id);
}
if traces.len() > self.max_traces {
let mut incomplete: Vec<_> = traces
.iter()
.filter(|(_, r)| !r.is_complete)
.map(|(id, r)| (*id, r.last_updated))
.collect();
incomplete.sort_by_key(|(_, updated)| *updated);
let excess = traces.len() - self.max_traces;
for (id, _) in incomplete.into_iter().take(excess) {
traces.remove(&id);
}
}
let max_age_nanos = self.max_age.as_nanos().min(u128::from(u64::MAX)) as u64;
let cutoff = now.saturating_sub_nanos(max_age_nanos);
traces.retain(|_, r| r.last_updated >= cutoff);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::trace::distributed::context::{RegionTag, SymbolTraceContext};
use crate::trace::distributed::id::{SymbolSpanId, TraceId};
use crate::trace::distributed::span::SymbolSpan;
use crate::types::symbol::SymbolId;
use crate::util::DetRng;
#[test]
fn collector_records_spans() {
let collector = SymbolTraceCollector::new(RegionTag::new("test"));
let mut rng = DetRng::new(42);
let trace_id = TraceId::new_for_test(1);
let ctx = SymbolTraceContext::new_for_encoding(
trace_id,
SymbolSpanId::NIL,
RegionTag::new("us-east-1"),
&mut rng,
);
let span = SymbolSpan::new_encode(ctx, ObjectId::new_for_test(1), Time::from_millis(0));
collector.record_span(&span, Time::from_millis(0));
let record = collector.get_trace(trace_id).expect("trace should exist");
assert_eq!(record.spans.len(), 1);
assert_eq!(record.regions.len(), 1);
}
#[test]
fn collector_detects_completion() {
let collector = SymbolTraceCollector::new(RegionTag::new("test"));
let mut rng = DetRng::new(7);
let trace_id = TraceId::new_for_test(2);
let ctx = SymbolTraceContext::new_for_encoding(
trace_id,
SymbolSpanId::NIL,
RegionTag::new("sender"),
&mut rng,
);
let mut decode_span =
SymbolSpan::new_decode(ctx, ObjectId::new_for_test(2), 4, Time::from_millis(100));
decode_span.complete_ok(Time::from_millis(120));
collector.record_span(&decode_span, Time::from_millis(120));
let record = collector.get_trace(trace_id).expect("trace should exist");
assert!(record.is_complete);
assert_eq!(collector.complete_traces(), vec![trace_id]);
}
#[test]
fn trace_summary_calculations() {
let collector = SymbolTraceCollector::new(RegionTag::new("test"));
let mut rng = DetRng::new(42);
let trace_id = TraceId::new_for_test(3);
let object_id = ObjectId::new_for_test(3);
let ctx = SymbolTraceContext::new_for_encoding(
trace_id,
SymbolSpanId::NIL,
RegionTag::new("sender"),
&mut rng,
);
let mut encode_span = SymbolSpan::new_encode(ctx.clone(), object_id, Time::from_millis(0));
encode_span.set_symbol_count(10);
encode_span.complete_ok(Time::from_millis(100));
collector.record_span(&encode_span, Time::from_millis(100));
for i in 0..10 {
let mut tx_span = SymbolSpan::new_transmit(
ctx.child(&mut rng),
SymbolId::new_for_test(3, 0, i),
Time::from_millis(100 + u64::from(i) * 10),
);
tx_span.complete_ok(Time::from_millis(150 + u64::from(i) * 10));
collector.record_span(&tx_span, Time::from_millis(150 + u64::from(i) * 10));
}
let mut decode_span =
SymbolSpan::new_decode(ctx.child(&mut rng), object_id, 10, Time::from_millis(300));
decode_span.complete_ok(Time::from_millis(400));
collector.record_span(&decode_span, Time::from_millis(400));
let summary = collector
.get_summary(trace_id)
.expect("summary should exist");
assert_eq!(summary.symbols_encoded, 10);
assert_eq!(summary.symbols_transmitted, 10);
assert!(summary.success);
assert!(summary.end_to_end_latency.is_some());
}
#[test]
fn trace_record_debug_clone() {
let collector = SymbolTraceCollector::new(RegionTag::new("test"));
let mut rng = DetRng::new(42);
let trace_id = TraceId::new_for_test(10);
let ctx = SymbolTraceContext::new_for_encoding(
trace_id,
SymbolSpanId::NIL,
RegionTag::new("region-a"),
&mut rng,
);
let span = SymbolSpan::new_encode(ctx, ObjectId::new_for_test(1), Time::from_millis(0));
collector.record_span(&span, Time::from_millis(0));
let record = collector.get_trace(trace_id).unwrap();
let record2 = record;
assert_eq!(record2.trace_id, trace_id);
assert!(!record2.is_complete);
assert!(format!("{record2:?}").contains("TraceRecord"));
}
#[test]
fn trace_summary_debug_clone() {
let collector = SymbolTraceCollector::new(RegionTag::new("test"));
let mut rng = DetRng::new(42);
let trace_id = TraceId::new_for_test(20);
let ctx = SymbolTraceContext::new_for_encoding(
trace_id,
SymbolSpanId::NIL,
RegionTag::new("r"),
&mut rng,
);
let mut span = SymbolSpan::new_encode(ctx, ObjectId::new_for_test(1), Time::from_millis(0));
span.set_symbol_count(5);
span.complete_ok(Time::from_millis(100));
collector.record_span(&span, Time::from_millis(100));
let summary = collector.get_summary(trace_id).unwrap();
let summary2 = summary;
assert_eq!(summary2.symbols_encoded, 5);
assert!(format!("{summary2:?}").contains("TraceSummary"));
}
#[test]
fn collector_builder_methods() {
let collector = SymbolTraceCollector::new(RegionTag::new("us-west"))
.with_max_traces(100)
.with_max_age(Duration::from_secs(120))
.with_clock_skew_tolerance(Duration::from_millis(50));
assert_eq!(collector.local_region(), &RegionTag::new("us-west"));
assert_eq!(collector.clock_skew_tolerance(), Duration::from_millis(50));
}
#[test]
fn collector_get_nonexistent_trace() {
let collector = SymbolTraceCollector::new(RegionTag::new("test"));
assert!(collector.get_trace(TraceId::new_for_test(999)).is_none());
assert!(collector.get_summary(TraceId::new_for_test(999)).is_none());
}
}