// Outside of test builds, silence `dead_code` warnings for everything in this
// scope; when compiling with `cfg(test)` the lint keeps its normal level.
// NOTE(review): presumably needed because these items are so far only
// exercised by the unit tests below — confirm, and remove once they have
// non-test callers.
#![cfg_attr(not(test), allow(dead_code))]
use std::collections::HashMap;
use std::time::Instant;
use serde::Serialize;
/// Namespace prefixes for trace annotation keys, plus the helper that joins a
/// prefix and a field name into a single dotted key.
pub mod ns {
    /// Prefix for `retrieval.*` annotation keys.
    pub const RETRIEVAL: &str = "retrieval";
    /// Prefix for `tool_search.*` annotation keys.
    pub const TOOL_SEARCH: &str = "tool_search";
    /// Prefix for `mcp.*` annotation keys.
    pub const MCP: &str = "mcp";
    /// Prefix for `delegation.*` annotation keys.
    pub const DELEGATION: &str = "delegation";
    /// Prefix for `task_state.*` annotation keys.
    pub const TASK_STATE: &str = "task_state";
    /// Prefix for `retrieval_strategy.*` annotation keys.
    pub const RETRIEVAL_STRATEGY: &str = "retrieval_strategy";

    /// Builds the annotation key `"<namespace>.<field>"`.
    ///
    /// Neither part is validated or escaped; they are joined with a single
    /// `.` exactly as given.
    pub fn key(namespace: &str, field: &str) -> String {
        [namespace, field].join(".")
    }
}
/// Outcome attached to a stage when it is closed with `PipelineTrace::end_stage`.
///
/// The code in this file only stores the value on the resulting `TraceSpan`;
/// it never branches on the variant, so when each one is reported is decided
/// by the caller.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum SpanOutcome {
/// The stage succeeded (meaning taken from the name; callers decide when to use it).
Ok,
/// The stage was skipped rather than run (meaning taken from the name; see callers).
Skipped,
/// The stage failed; the payload is a caller-supplied message, e.g. `"timeout"`.
Error(String),
}
/// One recorded pipeline stage: its name, how long it took, how it ended, and
/// the annotations collected while it was open.
///
/// Spans pushed by `PipelineTrace::end_stage` always have `duration_ms` and
/// `outcome` set to `Some`.
/// NOTE(review): `None` is presumably reserved for spans built elsewhere
/// (e.g. still-running or externally constructed ones) — confirm.
#[derive(Debug, Serialize)]
pub struct TraceSpan {
/// Stage name as passed to `PipelineTrace::begin_stage`.
pub name: String,
/// Time between `begin_stage` and `end_stage`, in whole milliseconds.
pub duration_ms: Option<u64>,
/// Outcome passed to `end_stage`.
pub outcome: Option<SpanOutcome>,
/// Annotations keyed by name; keys written through `annotate_ns` have the
/// form `"<namespace>.<field>"`.
pub annotations: HashMap<String, serde_json::Value>,
}
/// Collects the `TraceSpan`s of a single turn.
///
/// At most one stage is open at a time; its state lives in the private
/// `pending_*` fields until `end_stage` turns it into a span.
pub struct PipelineTrace {
/// Identifier of the turn being traced (caller-supplied, stored as given).
pub turn_id: String,
/// Channel label for the turn (caller-supplied, e.g. `"api"` or `"telegram"`).
pub channel: String,
/// Completed spans, in the order their stages were ended.
pub stages: Vec<TraceSpan>,
// Start time of the currently open stage; `None` when no stage is open.
pending_start: Option<Instant>,
// Name of the currently open stage; set and taken together with `pending_start`.
pending_name: Option<String>,
// Annotations accumulated since the last `begin_stage`; moved into the span
// by `end_stage`.
pending_annotations: HashMap<String, serde_json::Value>,
}
impl PipelineTrace {
    /// Creates an empty trace for the given turn and channel, with no stage open.
    pub fn new(turn_id: impl Into<String>, channel: impl Into<String>) -> Self {
        Self {
            turn_id: turn_id.into(),
            channel: channel.into(),
            stages: Vec::new(),
            pending_start: None,
            pending_name: None,
            pending_annotations: HashMap::new(),
        }
    }

    /// Opens a stage called `name` and starts its timer.
    ///
    /// Only one stage can be open at a time. If a stage is already open it is
    /// silently replaced: no span is recorded for it. Any annotations collected
    /// so far (for the replaced stage, or added while no stage was open) are
    /// cleared.
    pub fn begin_stage(&mut self, name: impl Into<String>) {
        self.pending_start = Some(Instant::now());
        self.pending_name = Some(name.into());
        self.pending_annotations.clear();
    }

    /// Attaches `value` under `key` to the currently open stage.
    ///
    /// Writing the same key twice keeps only the latest value. Annotations
    /// added while no stage is open are never recorded, because the next
    /// `begin_stage` clears them.
    pub fn annotate(&mut self, key: impl Into<String>, value: serde_json::Value) {
        self.pending_annotations.insert(key.into(), value);
    }

    /// Like [`annotate`](Self::annotate), with the key built as
    /// `"<namespace>.<field>"` via [`ns::key`].
    pub fn annotate_ns(&mut self, namespace: &str, field: &str, value: serde_json::Value) {
        self.annotate(ns::key(namespace, field), value);
    }

    /// Closes the open stage and appends it to `stages` with the given outcome.
    ///
    /// The span receives the elapsed time since `begin_stage` in whole
    /// milliseconds and all annotations collected for the stage. If no stage
    /// is open this is a no-op and `outcome` is dropped.
    pub fn end_stage(&mut self, outcome: SpanOutcome) {
        let (Some(start), Some(name)) = (self.pending_start.take(), self.pending_name.take())
        else {
            return;
        };

        // Assemble the millisecond count from the `u64` parts of the duration
        // with saturating arithmetic. This yields the same value as
        // `as_millis()` but clamps to `u64::MAX` instead of relying on a lossy
        // `u128 as u64` cast that would silently wrap.
        let elapsed = start.elapsed();
        let duration_ms = elapsed
            .as_secs()
            .saturating_mul(1_000)
            .saturating_add(u64::from(elapsed.subsec_millis()));

        // Move the annotations out, leaving an empty map for the next stage.
        let annotations = std::mem::take(&mut self.pending_annotations);
        self.stages.push(TraceSpan {
            name,
            duration_ms: Some(duration_ms),
            outcome: Some(outcome),
            annotations,
        });
    }
}
// Unit tests for the `ns` key helpers and the `PipelineTrace` stage lifecycle.
#[cfg(test)]
mod tests {
    use super::*;

    // Pins the literal value of every namespace constant, since these strings
    // end up inside serialized annotation keys.
    #[test]
    fn annotation_namespaces_are_consistent() {
        assert_eq!(ns::RETRIEVAL, "retrieval");
        assert_eq!(ns::TOOL_SEARCH, "tool_search");
        assert_eq!(ns::MCP, "mcp");
        assert_eq!(ns::DELEGATION, "delegation");
        assert_eq!(ns::TASK_STATE, "task_state");
        assert_eq!(ns::RETRIEVAL_STRATEGY, "retrieval_strategy");
    }

    #[test]
    fn namespaced_key_formatting() {
        assert_eq!(ns::key(ns::RETRIEVAL, "hit_count"), "retrieval.hit_count");
        assert_eq!(ns::key(ns::MCP, "server"), "mcp.server");
    }

    #[test]
    fn typed_annotation_helper() {
        let mut trace = PipelineTrace::new("turn-1", "api");
        trace.begin_stage("inference");
        trace.annotate_ns(ns::RETRIEVAL, "avg_similarity", serde_json::json!(0.82));
        trace.end_stage(SpanOutcome::Ok);
        let span = &trace.stages[0];
        assert_eq!(
            span.annotations.get("retrieval.avg_similarity"),
            Some(&serde_json::json!(0.82))
        );
    }

    #[test]
    fn begin_and_end_stage_records_span() {
        let mut trace = PipelineTrace::new("turn-2", "telegram");
        assert_eq!(trace.turn_id, "turn-2");
        assert_eq!(trace.channel, "telegram");
        trace.begin_stage("retrieval");
        trace.end_stage(SpanOutcome::Ok);
        assert_eq!(trace.stages.len(), 1);
        assert_eq!(trace.stages[0].name, "retrieval");
        assert_eq!(trace.stages[0].outcome, Some(SpanOutcome::Ok));
        assert!(trace.stages[0].duration_ms.is_some());
    }

    #[test]
    fn skipped_outcome_is_recorded() {
        let mut trace = PipelineTrace::new("turn-skip", "api");
        trace.begin_stage("cache");
        trace.end_stage(SpanOutcome::Skipped);
        assert_eq!(trace.stages[0].outcome, Some(SpanOutcome::Skipped));
    }

    #[test]
    fn delegation_trace_annotations_use_namespace() {
        let mut trace = PipelineTrace::new("turn-delegation", "api");
        trace.begin_stage("delegated_execution");
        trace.annotate_ns(ns::DELEGATION, "subtask_count", serde_json::json!(3));
        trace.annotate_ns(ns::DELEGATION, "pattern", serde_json::json!("fan-out"));
        trace.end_stage(SpanOutcome::Ok);
        let span = &trace.stages[0];
        assert_eq!(
            span.annotations.get("delegation.subtask_count"),
            Some(&serde_json::json!(3))
        );
        assert_eq!(
            span.annotations.get("delegation.pattern"),
            Some(&serde_json::json!("fan-out"))
        );
    }

    #[test]
    fn end_stage_without_begin_is_noop() {
        let mut trace = PipelineTrace::new("turn-3", "api");
        trace.end_stage(SpanOutcome::Ok);
        assert!(trace.stages.is_empty());
    }

    #[test]
    fn multiple_stages_recorded_in_order() {
        let mut trace = PipelineTrace::new("turn-4", "api");
        for name in ["retrieval", "inference", "delegation"] {
            trace.begin_stage(name);
            trace.end_stage(SpanOutcome::Ok);
        }
        assert_eq!(trace.stages.len(), 3);
        assert_eq!(trace.stages[0].name, "retrieval");
        assert_eq!(trace.stages[1].name, "inference");
        assert_eq!(trace.stages[2].name, "delegation");
    }

    #[test]
    fn error_outcome_preserved() {
        let mut trace = PipelineTrace::new("turn-5", "api");
        trace.begin_stage("tool_search");
        trace.end_stage(SpanOutcome::Error("timeout".into()));
        assert_eq!(
            trace.stages[0].outcome,
            Some(SpanOutcome::Error("timeout".into()))
        );
    }

    // Annotations belong to the stage that was open when they were added; a
    // later stage must start with an empty annotation map.
    #[test]
    fn annotations_do_not_leak_between_stages() {
        let mut trace = PipelineTrace::new("turn-6", "api");
        trace.begin_stage("retrieval");
        trace.annotate_ns(ns::RETRIEVAL, "hit_count", serde_json::json!(4));
        trace.end_stage(SpanOutcome::Ok);
        trace.begin_stage("inference");
        trace.end_stage(SpanOutcome::Ok);
        assert_eq!(trace.stages.len(), 2);
        assert_eq!(
            trace.stages[0].annotations.get("retrieval.hit_count"),
            Some(&serde_json::json!(4))
        );
        assert!(trace.stages[1].annotations.is_empty());
    }
}