use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fs;
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{Duration, Instant, SystemTime};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use super::model::{RunSummary, SpanEvent, SpanNode, SpanTree};
use super::otlp::{ExportTraceServiceRequest, key_values_to_map};
/// Number of traces kept in memory by a store built with `TraceStore::new`.
pub const DEFAULT_CAPACITY: usize = 1_000;
/// Maximum number of distinct spans stored per trace; once reached, spans with a new
/// span id are dropped on upsert (spans with an already-known id are still replaced).
pub(crate) const MAX_SPANS_PER_TRACE: usize = 10_000;
/// A single span in the store's normalized form.
///
/// This is also the shape serialized, one JSON object per line, into the per-trace
/// `.jsonl` persistence files and deserialized from them on load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanRecord {
/// Id of the trace this span belongs to; spans are grouped by this value.
pub trace_id: String,
/// Id of this span; unique key within its trace.
pub span_id: String,
/// Id of the parent span, or `None` when the incoming parent id was absent or empty.
pub parent_span_id: Option<String>,
/// Span name as received.
pub name: String,
/// `"error"` when the incoming span status reported an error, otherwise `"info"`.
pub level: String,
/// Name of the instrumentation scope that produced the span (empty when unnamed).
pub target: String,
/// Start time in nanoseconds, or `None` when missing or unparseable.
pub start_unix_nano: Option<i128>,
/// End time in nanoseconds, or `None` when missing or unparseable (treated as "still open").
pub end_unix_nano: Option<i128>,
/// Span attributes, plus `otel.*` keys synthesized during normalization.
pub fields: Map<String, Value>,
/// Resource attributes of the enclosing resource, rendered as strings.
pub resource: BTreeMap<String, String>,
/// Events attached to the span.
pub events: Vec<EventRecord>,
}
/// A span event in normalized form, stored inside its owning [`SpanRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventRecord {
/// Event name as received.
pub name: String,
/// Event time in nanoseconds, or `None` when missing or unparseable.
pub time_unix_nano: Option<i128>,
/// Name of the instrumentation scope of the owning span.
pub target: String,
/// Event attributes.
pub fields: Map<String, Value>,
}
/// Everything accumulated so far for a single trace.
#[derive(Debug)]
struct TraceAccum {
/// Spans keyed by span id; upserting an existing span id replaces the stored record.
spans: HashMap<String, SpanRecord>,
/// Merged resource attributes of the trace's spans; later values overwrite earlier ones.
resource: BTreeMap<String, String>,
/// Monotonic time of the most recent upsert for this trace; compared against the TTL.
last_seen: Instant,
}
impl TraceAccum {
fn new(now: Instant) -> Self {
Self {
spans: HashMap::new(),
resource: BTreeMap::new(),
last_seen: now,
}
}
}
/// Mutable state kept behind the store's mutex.
#[derive(Debug, Default)]
struct Inner {
/// Accumulated data per trace id.
traces: HashMap<String, TraceAccum>,
/// Trace ids in first-insertion order (front = oldest); drives capacity eviction and
/// the order in which runs are listed.
order: VecDeque<String>,
}
/// Capacity-bounded store of traces, with optional TTL expiry and optional persistence
/// of spans to per-trace `.jsonl` files.
#[derive(Debug)]
pub struct TraceStore {
/// Trace map and insertion order, guarded by a mutex.
inner: Mutex<Inner>,
/// Maximum number of traces held in memory (at least 1); the oldest-inserted trace is
/// evicted when a new trace would exceed it.
capacity: usize,
/// When set, traces not updated for at least this long are pruned.
ttl: Option<Duration>,
/// When set, directory holding one `<sanitized trace id>.jsonl` file per trace.
persist_dir: Option<PathBuf>,
}
impl TraceStore {
/// Creates a store holding up to [`DEFAULT_CAPACITY`] traces, with no TTL and no
/// persistence directory.
pub fn new() -> Self {
    let capacity = DEFAULT_CAPACITY;
    Self::with_capacity(capacity)
}
/// Locks the inner state, recovering the guard if the mutex was poisoned.
fn lock(&self) -> std::sync::MutexGuard<'_, Inner> {
    match self.inner.lock() {
        Ok(guard) => guard,
        // A panic in another holder poisons the mutex; keep using the data rather
        // than propagating the panic.
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Creates a store holding up to `capacity` traces, with no TTL and no persistence.
///
/// A `capacity` of zero is raised to one.
pub fn with_capacity(capacity: usize) -> Self {
    let capacity = if capacity == 0 { 1 } else { capacity };
    let inner = Mutex::new(Inner::default());
    Self {
        inner,
        capacity,
        ttl: None,
        persist_dir: None,
    }
}
/// Enables TTL expiry, using the current wall-clock time for any reload from disk.
///
/// See `with_ttl_at` for the reload behavior.
pub fn with_ttl(self, ttl: Duration) -> Self {
    let wall_clock = SystemTime::now();
    self.with_ttl_at(ttl, wall_clock)
}
/// Enables TTL expiry with an explicit wall-clock `now`.
///
/// If a persistence directory is already configured, the in-memory state is cleared
/// and reloaded from that directory so the new TTL is applied to the persisted files.
pub(crate) fn with_ttl_at(mut self, ttl: Duration, now: SystemTime) -> Self {
    self.ttl = Some(ttl);
    let Some(dir) = self.persist_dir.clone() else {
        return self;
    };
    {
        // Scope the guard so it is released before `load_from` locks again.
        let mut state = self.lock();
        *state = Inner::default();
    }
    self.load_from(&dir, now);
    self
}
/// Enables persistence under `dir`, using the current wall-clock time when loading
/// existing files.
pub fn with_persistence(self, dir: impl Into<PathBuf>) -> Self {
    let wall_clock = SystemTime::now();
    self.with_persistence_at(dir, wall_clock)
}
/// Enables persistence under `dir` with an explicit wall-clock `now`.
///
/// The directory is created if needed and any existing `.jsonl` files are loaded.
/// If the directory cannot be created a warning is logged and nothing is loaded, but
/// the directory is still recorded as the persistence target.
pub(crate) fn with_persistence_at(mut self, dir: impl Into<PathBuf>, now: SystemTime) -> Self {
    let dir: PathBuf = dir.into();
    match fs::create_dir_all(&dir) {
        Ok(()) => self.load_from(&dir, now),
        Err(err) => {
            tracing::warn!(error = %err, dir = %dir.display(), "otel: failed to create persistence dir");
        }
    }
    self.persist_dir = Some(dir);
    self
}
fn load_from(&self, dir: &Path, now: SystemTime) {
let entries = match fs::read_dir(dir) {
Ok(dir_entries) => dir_entries,
Err(err) => {
tracing::warn!(error = %err, "otel: failed to read persistence dir");
return;
}
};
let mut records = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
continue;
}
if let Some(ttl) = self.ttl
&& is_expired_file(&path, now, ttl)
{
if let Err(err) = fs::remove_file(&path) {
tracing::warn!(error = %err, path = %path.display(), "otel: failed to remove expired trace file on load");
}
continue;
}
let Ok(file) = fs::File::open(&path) else {
continue;
};
for line in BufReader::new(file).lines().map_while(Result::ok) {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<SpanRecord>(&line) {
Ok(rec) => records.push(rec),
Err(err) => {
tracing::warn!(error = %err, path = %path.display(), "otel: skipping malformed persisted span");
}
}
}
}
if !records.is_empty() {
let now = Instant::now();
let mut inner = self.lock();
for rec in records {
Self::upsert(&mut inner, rec, self.capacity, now);
}
tracing::info!(traces = inner.traces.len(), "otel: loaded persisted traces");
}
}
pub fn ingest_otlp_json(&self, body: &[u8]) -> Result<usize, serde_json::Error> {
Ok(self.ingest(serde_json::from_slice(body)?))
}
/// Ingests an already-decoded export request, timestamping it with the current
/// monotonic time. Returns the number of span records produced by normalization.
pub(crate) fn ingest(&self, request: ExportTraceServiceRequest) -> usize {
    let received_at = Instant::now();
    self.ingest_at(request, received_at)
}
/// Ingests `request` as of the monotonic instant `now`.
///
/// Steps, in order: normalize the request; append the records to disk when
/// persistence is enabled; then, under the lock, prune TTL-expired traces and upsert
/// the new records. Persisted files of traces that expired and were not re-created by
/// this request are removed after the lock is released.
///
/// Returns the number of normalized records (zero for an empty request).
pub(crate) fn ingest_at(&self, request: ExportTraceServiceRequest, now: Instant) -> usize {
    let records = normalize(request);
    let accepted = records.len();
    if accepted == 0 {
        return 0;
    }
    if let Some(dir) = self.persist_dir.as_deref() {
        self.persist(dir, &records);
    }

    let mut inner = self.lock();
    let mut expired = self.prune_expired(&mut inner, now);
    for rec in records {
        Self::upsert(&mut inner, rec, self.capacity, now);
    }
    // A trace that expired but received spans in this batch is alive again; its file
    // (just appended to) must not be deleted.
    expired.retain(|id| !inner.traces.contains_key(id));
    drop(inner);

    self.remove_persisted(&expired);
    accepted
}
/// Removes traces whose idle time (`now - last_seen`) is at least the TTL and returns
/// their ids. Returns an empty list when no TTL is configured.
fn prune_expired(&self, inner: &mut Inner, now: Instant) -> Vec<String> {
    let mut expired = Vec::new();
    if let Some(ttl) = self.ttl {
        let Inner { traces, order } = inner;
        traces.retain(|id, acc| {
            let idle = now.saturating_duration_since(acc.last_seen);
            if idle >= ttl {
                expired.push(id.clone());
                false
            } else {
                true
            }
        });
        // Only rescan the order queue when something was actually dropped.
        if !expired.is_empty() {
            order.retain(|id| traces.contains_key(id));
        }
    }
    expired
}
/// Deletes the persisted file of each trace id in `evicted`.
///
/// Does nothing without a persistence directory. A missing file is not an error;
/// any other failure is logged as a warning.
fn remove_persisted(&self, evicted: &[String]) {
    let Some(dir) = self.persist_dir.as_ref() else {
        return;
    };
    for id in evicted {
        let file_name = format!("{}.jsonl", sanitize(id));
        match fs::remove_file(dir.join(file_name)) {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => {
                tracing::warn!(error = %err, trace_id = %id, "otel: failed to remove expired trace file");
            }
        }
    }
}
/// Appends `records` to their per-trace `.jsonl` files under `dir`.
///
/// Records are first encoded and grouped by trace id so each file is opened once and
/// written with a single `write_all`. Encoding, open and write failures are logged as
/// warnings and otherwise ignored.
fn persist(&self, dir: &Path, records: &[SpanRecord]) {
    let mut pending: HashMap<&str, String> = HashMap::new();
    for rec in records {
        match serde_json::to_string(rec) {
            Ok(encoded) => {
                let chunk = pending.entry(rec.trace_id.as_str()).or_default();
                chunk.push_str(&encoded);
                chunk.push('\n');
            }
            Err(err) => {
                tracing::warn!(error = %err, "otel: failed to encode span for persistence");
            }
        }
    }
    for (trace_id, chunk) in pending {
        let path = dir.join(format!("{}.jsonl", sanitize(trace_id)));
        let opened = fs::OpenOptions::new().create(true).append(true).open(&path);
        let mut file = match opened {
            Ok(file) => file,
            Err(err) => {
                tracing::warn!(error = %err, "otel: failed to open trace file for append");
                continue;
            }
        };
        if let Err(err) = file.write_all(chunk.as_bytes()) {
            tracing::warn!(error = %err, "otel: failed to append persisted spans");
        }
    }
}
/// Inserts or replaces `rec` in its trace, creating the trace if needed.
///
/// The trace's `last_seen` is refreshed to `now` even when the span is dropped.
/// A span with a previously unseen id is dropped once the trace already holds
/// [`MAX_SPANS_PER_TRACE`] spans. When the record creates a new trace, the oldest
/// traces are evicted until at most `capacity` remain.
fn upsert(inner: &mut Inner, rec: SpanRecord, capacity: usize, now: Instant) {
    use std::collections::hash_map::Entry;

    let trace_id = rec.trace_id.clone();
    let (accum, first_sighting) = match inner.traces.entry(trace_id.clone()) {
        Entry::Occupied(slot) => (slot.into_mut(), false),
        Entry::Vacant(slot) => (slot.insert(TraceAccum::new(now)), true),
    };
    accum.last_seen = now;

    let at_cap = accum.spans.len() >= MAX_SPANS_PER_TRACE;
    if at_cap && !accum.spans.contains_key(&rec.span_id) {
        tracing::debug!(
            trace_id = %rec.trace_id,
            span_id = %rec.span_id,
            cap = MAX_SPANS_PER_TRACE,
            "otel: per-trace span cap reached, dropping span"
        );
        return;
    }

    accum
        .resource
        .extend(rec.resource.iter().map(|(k, v)| (k.clone(), v.clone())));
    accum.spans.insert(rec.span_id.clone(), rec);

    if !first_sighting {
        return;
    }
    inner.order.push_back(trace_id);
    while inner.order.len() > capacity {
        let Some(oldest) = inner.order.pop_front() else {
            break;
        };
        inner.traces.remove(&oldest);
    }
}
/// Returns a summary of every stored trace as of the current monotonic time.
pub fn list_runs(&self) -> Vec<RunSummary> {
    let now = Instant::now();
    self.list_runs_at(now)
}
/// Returns a summary of every stored trace as of `now`, most recently inserted first.
///
/// TTL-expired traces are pruned before listing, and their persisted files are
/// removed after the lock is released.
pub(crate) fn list_runs_at(&self, now: Instant) -> Vec<RunSummary> {
    let mut inner = self.lock();
    let expired = self.prune_expired(&mut inner, now);

    let mut runs = Vec::new();
    for id in inner.order.iter().rev() {
        if let Some(acc) = inner.traces.get(id) {
            runs.push(summarize(id, acc));
        }
    }
    drop(inner);

    self.remove_persisted(&expired);
    runs
}
/// Returns the span tree of trace `run_id` as of the current monotonic time, or
/// `None` when the trace is not stored.
pub fn run_tree(&self, run_id: &str) -> Option<SpanTree> {
    let now = Instant::now();
    self.run_tree_at(run_id, now)
}
/// Returns the span tree of trace `run_id` as of `now`, or `None` when the trace is
/// not stored (including when it has just expired).
///
/// TTL-expired traces are pruned first, and their persisted files are removed after
/// the lock is released.
pub(crate) fn run_tree_at(&self, run_id: &str, now: Instant) -> Option<SpanTree> {
    let mut inner = self.lock();
    let expired = self.prune_expired(&mut inner, now);
    let tree = match inner.traces.get(run_id) {
        Some(acc) => Some(build_tree(run_id, acc)),
        None => None,
    };
    drop(inner);

    self.remove_persisted(&expired);
    tree
}
}
impl Default for TraceStore {
fn default() -> Self {
Self::new()
}
}
/// Flattens an export request into one [`SpanRecord`] per span.
///
/// - Resource attributes with an empty key are ignored; the rest are stringified and
///   copied onto every span of that resource.
/// - Spans with an empty trace id or span id are skipped.
/// - The scope name and version are added as `otel.scope.name` / `otel.scope.version`
///   unless the span already carries those attributes.
/// - For spans whose status is an error, `otel.status_code` is set to `2` and the
///   status message (if any) is added as `otel.status_message` unless already present.
fn normalize(request: ExportTraceServiceRequest) -> Vec<SpanRecord> {
    let mut records = Vec::new();
    for resource_spans in request.resource_spans {
        let mut resource: BTreeMap<String, String> = BTreeMap::new();
        for kv in resource_spans.resource.attributes.iter() {
            if !kv.key.is_empty() {
                resource.insert(kv.key.clone(), kv.value.to_label_string());
            }
        }

        for scope_spans in resource_spans.scope_spans {
            let scope_name = scope_spans.scope.name.clone().unwrap_or_default();

            for span in scope_spans.spans {
                if span.trace_id.is_empty() || span.span_id.is_empty() {
                    continue;
                }

                let mut fields = key_values_to_map(&span.attributes);
                if !scope_name.is_empty() && !fields.contains_key("otel.scope.name") {
                    fields.insert(
                        "otel.scope.name".to_string(),
                        Value::String(scope_name.clone()),
                    );
                }
                if let Some(version) = &scope_spans.scope.version {
                    if !fields.contains_key("otel.scope.version") {
                        fields.insert(
                            "otel.scope.version".to_string(),
                            Value::String(version.clone()),
                        );
                    }
                }

                let is_error = span.status.is_error();
                if is_error {
                    // Always overwrite the status code for error spans.
                    fields.insert("otel.status_code".to_string(), Value::from(2));
                    if let Some(msg) = &span.status.message {
                        if !fields.contains_key("otel.status_message") {
                            fields.insert(
                                "otel.status_message".to_string(),
                                Value::String(msg.clone()),
                            );
                        }
                    }
                }

                let mut events = Vec::new();
                for ev in span.events {
                    events.push(EventRecord {
                        name: ev.name,
                        time_unix_nano: value_to_nanos(ev.time_unix_nano.as_ref()),
                        target: scope_name.clone(),
                        fields: key_values_to_map(&ev.attributes),
                    });
                }

                let level = if is_error { "error" } else { "info" };
                records.push(SpanRecord {
                    trace_id: span.trace_id,
                    span_id: span.span_id,
                    parent_span_id: span.parent_span_id.filter(|p| !p.is_empty()),
                    name: span.name,
                    level: level.to_string(),
                    target: scope_name.clone(),
                    start_unix_nano: value_to_nanos(span.start_time_unix_nano.as_ref()),
                    end_unix_nano: value_to_nanos(span.end_time_unix_nano.as_ref()),
                    fields,
                    resource: resource.clone(),
                    events,
                });
            }
        }
    }
    records
}
/// Aggregates a trace into a [`RunSummary`].
///
/// - `started_at`: earliest span start time, if any span has one.
/// - token counts: saturating sums of `gen_ai.usage.input_tokens` / `output_tokens`.
/// - `cost_usd`: sum of the per-span costs found by `span_cost`.
/// - `duration_ms`: duration of the root span, falling back to the overall
///   earliest-start-to-latest-end extent of the trace.
/// - `outcome`: `"error"` if any span has level `"error"`; otherwise `None` while any
///   span lacks an end time; otherwise `"success"`.
/// - `agent_name` is always `None`.
fn summarize(run_id: &str, acc: &TraceAccum) -> RunSummary {
    let mut started: Option<i128> = None;
    let mut input_tokens: u64 = 0;
    let mut output_tokens: u64 = 0;
    let mut cost_usd: f64 = 0.0;
    let mut any_error = false;
    let mut any_open = false;

    for span in acc.spans.values() {
        started = match (started, span.start_unix_nano) {
            (Some(cur), Some(s)) => Some(cur.min(s)),
            (cur, s) => cur.or(s),
        };
        any_open |= span.end_unix_nano.is_none();
        any_error |= span.level == "error";

        let tokens_in = read_u64(&span.fields, "gen_ai.usage.input_tokens");
        let tokens_out = read_u64(&span.fields, "gen_ai.usage.output_tokens");
        input_tokens = input_tokens.saturating_add(tokens_in);
        output_tokens = output_tokens.saturating_add(tokens_out);

        if let Some(c) = span_cost(&span.fields) {
            cost_usd += c;
        }
    }

    let duration_ms = match find_root(acc).and_then(span_duration_ms) {
        Some(ms) => Some(ms),
        None => trace_span_ms(acc),
    };

    let outcome = match (any_error, any_open) {
        (true, _) => Some("error".to_string()),
        (false, true) => None,
        (false, false) => Some("success".to_string()),
    };

    RunSummary {
        run_id: run_id.to_string(),
        agent_name: None,
        started_at: started.map(nanos_to_iso8601),
        duration_ms,
        outcome,
        input_tokens,
        output_tokens,
        cost_usd,
        labels: resource_to_labels(&acc.resource),
    }
}
fn build_tree(run_id: &str, acc: &TraceAccum) -> SpanTree {
let mut children_of: HashMap<String, Vec<&SpanRecord>> = HashMap::new();
let mut roots: Vec<&SpanRecord> = Vec::new();
let mut orphans: Vec<&SpanRecord> = Vec::new();
for span in acc.spans.values() {
match &span.parent_span_id {
None => roots.push(span),
Some(parent) if acc.spans.contains_key(parent) => {
children_of.entry(parent.clone()).or_default().push(span);
}
Some(_) => orphans.push(span),
}
}
let mut root_nodes: Vec<SpanNode> = roots
.iter()
.map(|s| node_with_children(s, &children_of))
.collect();
sort_nodes(&mut root_nodes);
let mut orphan_nodes: Vec<SpanNode> = orphans
.iter()
.map(|s| node_with_children(s, &children_of))
.collect();
sort_nodes(&mut orphan_nodes);
let summary = summarize(run_id, acc);
SpanTree {
run_id: run_id.to_string(),
agent_name: summary.agent_name,
started_at: summary.started_at,
duration_ms: summary.duration_ms,
outcome: summary.outcome,
labels: summary.labels,
roots: root_nodes,
orphans: orphan_nodes,
}
}
fn node_with_children(
rec: &SpanRecord,
children_of: &HashMap<String, Vec<&SpanRecord>>,
) -> SpanNode {
let mut children: Vec<SpanNode> = children_of
.get(&rec.span_id)
.map(|kids| {
kids.iter()
.map(|c| node_with_children(c, children_of))
.collect()
})
.unwrap_or_default();
sort_nodes(&mut children);
SpanNode {
span_id: rec.span_id.clone(),
parent_span_id: rec.parent_span_id.clone(),
name: rec.name.clone(),
level: rec.level.clone(),
target: rec.target.clone(),
started_at: rec.start_unix_nano.map(nanos_to_iso8601),
closed_at: rec.end_unix_nano.map(nanos_to_iso8601),
duration_ms: span_duration_ms(rec),
fields: rec.fields.clone(),
events: rec
.events
.iter()
.map(|ev| SpanEvent {
ts: ev.time_unix_nano.map(nanos_to_iso8601).unwrap_or_default(),
level: "info".to_string(),
target: ev.target.clone(),
name: ev.name.clone(),
message: None,
fields: ev.fields.clone(),
})
.collect(),
children,
}
}
/// Orders sibling nodes by start timestamp string (missing start sorts first), then
/// by the numeric index found by `field_index`, then by span id.
fn sort_nodes(nodes: &mut [SpanNode]) {
    nodes.sort_by(|lhs, rhs| {
        let lhs_start = lhs.started_at.as_deref().unwrap_or("");
        let rhs_start = rhs.started_at.as_deref().unwrap_or("");
        let by_start = lhs_start.cmp(rhs_start);
        if by_start.is_ne() {
            return by_start;
        }
        let by_index = field_index(&lhs.fields).cmp(&field_index(&rhs.fields));
        if by_index.is_ne() {
            return by_index;
        }
        lhs.span_id.cmp(&rhs.span_id)
    });
}
fn field_index(fields: &Map<String, Value>) -> i64 {
for (k, v) in fields {
if k.ends_with(".index") || k == "index" {
if let Some(n) = v.as_i64() {
return n;
}
if let Some(s) = v.as_str()
&& let Ok(n) = s.parse::<i64>()
{
return n;
}
}
}
i64::MAX
}
/// Picks the span that represents the whole trace.
///
/// Spans with no parent id are preferred; if there are none, spans whose parent id is
/// not present in the trace are considered instead. Among several candidates the one
/// with the earliest start time wins (spans without a start time rank last), with the
/// span id as final tie-break, so the choice does not depend on `HashMap` iteration
/// order. Returns `None` when the trace has no candidate.
fn find_root(acc: &TraceAccum) -> Option<&SpanRecord> {
    /// Deterministically selects the earliest-starting span from `candidates`.
    fn earliest<'a>(
        candidates: impl Iterator<Item = &'a SpanRecord>,
    ) -> Option<&'a SpanRecord> {
        candidates.min_by(|a, b| {
            let key_a = (a.start_unix_nano.is_none(), a.start_unix_nano, &a.span_id);
            let key_b = (b.start_unix_nano.is_none(), b.start_unix_nano, &b.span_id);
            key_a.cmp(&key_b)
        })
    }

    earliest(acc.spans.values().filter(|s| s.parent_span_id.is_none())).or_else(|| {
        earliest(acc.spans.values().filter(|s| {
            s.parent_span_id
                .as_ref()
                .is_some_and(|p| !acc.spans.contains_key(p))
        }))
    })
}
/// Returns the span's duration in milliseconds.
///
/// Returns `None` when either timestamp is missing, when the end precedes the start,
/// or when the difference does not fit in an `i128` (timestamps are parsed from
/// external input and may be arbitrarily large).
fn span_duration_ms(rec: &SpanRecord) -> Option<f64> {
    let start = rec.start_unix_nano?;
    let end = rec.end_unix_nano?;
    if end < start {
        return None;
    }
    // `checked_sub` avoids an overflow panic/wrap for extreme timestamp pairs.
    end.checked_sub(start)
        .map(|nanos| nanos as f64 / 1_000_000.0)
}
/// Returns the extent of the trace in milliseconds: latest span end minus earliest
/// span start.
///
/// Returns `None` when no span has a start or no span has an end, when the latest end
/// precedes the earliest start, or when the difference does not fit in an `i128`
/// (timestamps are parsed from external input and may be arbitrarily large).
fn trace_span_ms(acc: &TraceAccum) -> Option<f64> {
    let earliest_start = acc.spans.values().filter_map(|s| s.start_unix_nano).min()?;
    let latest_end = acc.spans.values().filter_map(|s| s.end_unix_nano).max()?;
    if latest_end < earliest_start {
        return None;
    }
    // `checked_sub` avoids an overflow panic/wrap for extreme timestamp pairs.
    latest_end
        .checked_sub(earliest_start)
        .map(|nanos| nanos as f64 / 1_000_000.0)
}
/// Extracts a span's cost in USD from its attributes.
///
/// Lookup order:
/// 1. `gen_ai.usage.cost_usd` — if the key exists it is authoritative, even when its
///    value turns out to be unusable (no fallback is attempted).
/// 2. The first key (case-insensitively) ending in `.cost.total_usd` or
///    `cost_total_usd`.
/// 3. The first other key ending in `_usd` and containing `cost`.
///
/// Only finite, strictly positive values are returned; string values such as `"inf"`
/// or `"NaN"` parse as `f64` but are rejected so they cannot poison a summed total.
fn span_cost(fields: &Map<String, Value>) -> Option<f64> {
    /// A cost is usable only when it is a finite, strictly positive number.
    fn usable(cost: &f64) -> bool {
        cost.is_finite() && *cost > 0.0
    }

    if let Some(v) = fields.get("gen_ai.usage.cost_usd") {
        return read_f64(v).filter(usable);
    }
    let mut total: Option<&Value> = None;
    let mut loose: Option<&Value> = None;
    for (k, v) in fields {
        let lk = k.to_ascii_lowercase();
        if lk.ends_with(".cost.total_usd") || lk.ends_with("cost_total_usd") {
            total.get_or_insert(v);
        } else if lk.ends_with("_usd") && lk.contains("cost") {
            loose.get_or_insert(v);
        }
    }
    total.or(loose).and_then(read_f64).filter(usable)
}
/// Reads `key` from `fields` as a `u64`, returning 0 when it is absent or unusable.
///
/// Accepts JSON numbers and numeric strings. Values that are not unsigned integers
/// are read as `f64` and converted with a saturating `as u64` cast.
fn read_u64(fields: &Map<String, Value>, key: &str) -> u64 {
    let Some(value) = fields.get(key) else {
        return 0;
    };
    let parsed = match value {
        Value::Number(num) => num
            .as_u64()
            .or_else(|| num.as_f64().map(|f| f as u64)),
        Value::String(text) => text
            .parse::<u64>()
            .ok()
            .or_else(|| text.parse::<f64>().ok().map(|f| f as u64)),
        _ => None,
    };
    parsed.unwrap_or(0)
}
fn read_f64(value: &Value) -> Option<f64> {
match value {
Value::Number(n) => n.as_f64(),
Value::String(s) => s.parse::<f64>().ok(),
_ => None,
}
}
/// Converts resource attributes into a JSON object whose values are all strings.
fn resource_to_labels(resource: &BTreeMap<String, String>) -> Map<String, Value> {
    resource
        .iter()
        .map(|(key, val)| (key.clone(), Value::String(val.clone())))
        .collect()
}
/// Converts an optional JSON timestamp into nanoseconds.
///
/// Accepts a decimal string or a JSON number. Numbers are read exactly as `i64` or
/// `u64` when possible; only values that are not integers in either range fall back
/// to the lossy `f64` conversion. Returns `None` for a missing value, an unparseable
/// string, or any other JSON type.
fn value_to_nanos(value: Option<&Value>) -> Option<i128> {
    match value? {
        Value::String(s) => s.parse::<i128>().ok(),
        Value::Number(n) => n
            .as_i64()
            .map(i128::from)
            // Values above `i64::MAX` still fit in `u64`; read them exactly instead of
            // rounding through `f64`.
            .or_else(|| n.as_u64().map(i128::from))
            .or_else(|| n.as_f64().map(|f| f as i128)),
        _ => None,
    }
}
/// Returns `true` when the file's modification time is at least `ttl` before `now`.
///
/// Returns `false` when the metadata or modification time cannot be read, or when the
/// modification time is later than `now`.
fn is_expired_file(path: &Path, now: SystemTime, ttl: Duration) -> bool {
    let Ok(meta) = fs::metadata(path) else {
        return false;
    };
    let Ok(mtime) = meta.modified() else {
        return false;
    };
    match now.duration_since(mtime) {
        Ok(age) => age >= ttl,
        Err(_) => false,
    }
}
/// Turns a trace id into a file-name-safe stem.
///
/// Only the first 128 characters are used; every character other than an ASCII letter,
/// ASCII digit, `-` or `_` is replaced by `_`.
fn sanitize(trace_id: &str) -> String {
    let mut safe = String::new();
    for ch in trace_id.chars().take(128) {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        safe.push(if keep { ch } else { '_' });
    }
    safe
}
/// Formats nanoseconds since the Unix epoch as a UTC timestamp of the form
/// `YYYY-MM-DDTHH:MM:SS.mmmZ`.
///
/// Sub-millisecond precision is truncated. Negative inputs (instants before 1970) use
/// Euclidean division so the time-of-day components stay non-negative. A day count
/// that does not fit in an `i64` saturates to the nearest representable day instead
/// of being bit-truncated.
pub(crate) fn nanos_to_iso8601(nanos: i128) -> String {
    const NANOS_PER_SEC: i128 = 1_000_000_000;
    const SECS_PER_DAY: i128 = 86_400;

    let secs = nanos.div_euclid(NANOS_PER_SEC);
    let ms = nanos.rem_euclid(NANOS_PER_SEC) / 1_000_000;
    let days = secs.div_euclid(SECS_PER_DAY);
    let sod = secs.rem_euclid(SECS_PER_DAY);

    // Timestamps are parsed from external input; saturate rather than wrap.
    let days = i64::try_from(days).unwrap_or(if days < 0 { i64::MIN } else { i64::MAX });
    let (y, m, d) = civil_from_days(days);

    let hh = sod / 3600;
    let mm = (sod % 3600) / 60;
    let ss = sod % 60;
    format!("{y:04}-{m:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}.{ms:03}Z")
}
/// Converts a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)`, with `month` in `1..=12` and `day` in `1..=31`.
///
/// The arithmetic is done in `i128`, so every `i64` input is handled without overflow.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    // Shift the epoch to 0000-03-01 so leap days fall at the end of the shifted year.
    let z = i128::from(z) + 719_468;
    // 146_097 days make one 400-year Gregorian era.
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era, 0..=146_096
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; // 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // 0..=365, March-based
    let mp = (5 * doy + 2) / 153; // 0 = March .. 11 = February
    // `d` and `m` are bounded by the ranges above, so the narrowing casts are lossless.
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let shifted_year = yoe + era * 400;
    // January and February belong to the next civil year in the March-based scheme.
    let year = if m <= 2 { shifted_year + 1 } else { shifted_year };
    // |year| is at most about |z| / 365, which always fits in an i64.
    (year as i64, m, d)
}