roboticus-api 0.11.3

HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
Documentation
#![cfg_attr(not(test), allow(dead_code))]

//! Structured trace annotations for pipeline stages.
//!
//! `PipelineTrace` records per-stage spans with key/value annotations.
//! Callers open a span with `begin_stage`, attach metadata with `annotate` or
//! `annotate_ns`, then close it with `end_stage`.
//!
//! The `ns` module provides canonical namespace constants so that callers
//! across retrieval, tool-search, MCP, and delegation subsystems use
//! consistent annotation keys.

use std::collections::HashMap;
use std::time::Instant;

use serde::Serialize;

// ── Namespace constants ────────────────────────────────────────────────────

/// Canonical annotation namespaces for pipeline trace keys.
pub mod ns {
    pub const RETRIEVAL: &str = "retrieval";
    pub const TOOL_SEARCH: &str = "tool_search";
    pub const MCP: &str = "mcp";
    pub const DELEGATION: &str = "delegation";
    pub const TASK_STATE: &str = "task_state";
    pub const RETRIEVAL_STRATEGY: &str = "retrieval_strategy";

    /// Build a namespaced key: `"<namespace>.<field>"`.
    pub fn key(namespace: &str, field: &str) -> String {
        format!("{namespace}.{field}")
    }
}

// ── Span outcome ───────────────────────────────────────────────────────────

/// Outcome recorded when a pipeline stage span is closed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum SpanOutcome {
    /// Stage completed successfully.
    Ok,
    /// Stage was skipped (e.g. short-circuited by cache hit).
    Skipped,
    /// Stage failed; the inner string carries a brief error label.
    Error(String),
}

// ── Span ───────────────────────────────────────────────────────────────────

/// A single timed stage within a `PipelineTrace`.
#[derive(Debug, Serialize)]
pub struct TraceSpan {
    /// Human-readable stage label (e.g. `"inference"`, `"retrieval"`).
    pub name: String,
    /// Duration of the span in milliseconds, set when `end_stage` is called.
    pub duration_ms: Option<u64>,
    /// Outcome of the span, set when `end_stage` is called.
    pub outcome: Option<SpanOutcome>,
    /// Key/value annotations attached to this span.
    pub annotations: HashMap<String, serde_json::Value>,
}

// ── PipelineTrace ──────────────────────────────────────────────────────────

/// Lightweight structured trace for a single pipeline turn.
///
/// Usage:
/// ```ignore
/// let mut trace = PipelineTrace::new("turn-abc", "api");
/// trace.begin_stage("retrieval");
/// trace.annotate_ns(ns::RETRIEVAL, "hit_count", json!(3));
/// trace.end_stage(SpanOutcome::Ok);
/// ```
pub struct PipelineTrace {
    /// Identifier of the turn being traced.
    pub turn_id: String,
    /// Source channel / connector label (e.g. `"api"`, `"telegram"`).
    pub channel: String,
    /// All recorded stage spans (in order).
    pub stages: Vec<TraceSpan>,
    /// Wall-clock start of the current open span; `None` when no span is open.
    pending_start: Option<Instant>,
    /// Name of the currently open span.
    pending_name: Option<String>,
    /// Annotations accumulated for the pending span.
    pending_annotations: HashMap<String, serde_json::Value>,
}

impl PipelineTrace {
    /// Create a new trace for `turn_id` received on `channel`.
    pub fn new(turn_id: impl Into<String>, channel: impl Into<String>) -> Self {
        Self {
            turn_id: turn_id.into(),
            channel: channel.into(),
            stages: Vec::new(),
            pending_start: None,
            pending_name: None,
            pending_annotations: HashMap::new(),
        }
    }

    /// Open a new stage span.  Any previously open span is discarded (use
    /// `end_stage` to close properly before beginning the next one).
    pub fn begin_stage(&mut self, name: impl Into<String>) {
        self.pending_start = Some(Instant::now());
        self.pending_name = Some(name.into());
        self.pending_annotations.clear();
    }

    /// Attach a raw annotation to the currently open span.
    ///
    /// No-op when no span is open.
    pub fn annotate(&mut self, key: impl Into<String>, value: serde_json::Value) {
        self.pending_annotations.insert(key.into(), value);
    }

    /// Attach an annotation using a `"<namespace>.<field>"` key.
    ///
    /// No-op when no span is open.
    pub fn annotate_ns(&mut self, namespace: &str, field: &str, value: serde_json::Value) {
        self.annotate(ns::key(namespace, field), value);
    }

    /// Close the currently open span with the given outcome.
    ///
    /// No-op when no span is open.
    pub fn end_stage(&mut self, outcome: SpanOutcome) {
        let (Some(start), Some(name)) = (self.pending_start.take(), self.pending_name.take())
        else {
            return;
        };
        let duration_ms = start.elapsed().as_millis() as u64;
        let annotations = std::mem::take(&mut self.pending_annotations);
        self.stages.push(TraceSpan {
            name,
            duration_ms: Some(duration_ms),
            outcome: Some(outcome),
            annotations,
        });
    }
}

// ── Tests ──────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn annotation_namespaces_are_consistent() {
        assert_eq!(ns::RETRIEVAL, "retrieval");
        assert_eq!(ns::TOOL_SEARCH, "tool_search");
        assert_eq!(ns::MCP, "mcp");
        assert_eq!(ns::DELEGATION, "delegation");
    }

    #[test]
    fn namespaced_key_formatting() {
        assert_eq!(ns::key(ns::RETRIEVAL, "hit_count"), "retrieval.hit_count");
        assert_eq!(ns::key(ns::MCP, "server"), "mcp.server");
    }

    #[test]
    fn typed_annotation_helper() {
        let mut trace = PipelineTrace::new("turn-1", "api");
        trace.begin_stage("inference");
        trace.annotate_ns(ns::RETRIEVAL, "avg_similarity", serde_json::json!(0.82));
        trace.end_stage(SpanOutcome::Ok);
        let span = &trace.stages[0];
        assert_eq!(
            span.annotations.get("retrieval.avg_similarity"),
            Some(&serde_json::json!(0.82))
        );
    }

    #[test]
    fn begin_and_end_stage_records_span() {
        let mut trace = PipelineTrace::new("turn-2", "telegram");
        assert_eq!(trace.turn_id, "turn-2");
        assert_eq!(trace.channel, "telegram");
        trace.begin_stage("retrieval");
        trace.end_stage(SpanOutcome::Ok);
        assert_eq!(trace.stages.len(), 1);
        assert_eq!(trace.stages[0].name, "retrieval");
        assert_eq!(trace.stages[0].outcome, Some(SpanOutcome::Ok));
        assert!(trace.stages[0].duration_ms.is_some());
    }

    #[test]
    fn skipped_outcome_is_recorded() {
        let mut trace = PipelineTrace::new("turn-skip", "api");
        trace.begin_stage("cache");
        trace.end_stage(SpanOutcome::Skipped);
        assert_eq!(trace.stages[0].outcome, Some(SpanOutcome::Skipped));
    }

    #[test]
    fn delegation_trace_annotations_use_namespace() {
        let mut trace = PipelineTrace::new("turn-delegation", "api");
        trace.begin_stage("delegated_execution");
        trace.annotate_ns(ns::DELEGATION, "subtask_count", serde_json::json!(3));
        trace.annotate_ns(ns::DELEGATION, "pattern", serde_json::json!("fan-out"));
        trace.end_stage(SpanOutcome::Ok);

        let span = &trace.stages[0];
        assert_eq!(
            span.annotations.get("delegation.subtask_count"),
            Some(&serde_json::json!(3))
        );
        assert_eq!(
            span.annotations.get("delegation.pattern"),
            Some(&serde_json::json!("fan-out"))
        );
    }

    #[test]
    fn end_stage_without_begin_is_noop() {
        let mut trace = PipelineTrace::new("turn-3", "api");
        trace.end_stage(SpanOutcome::Ok); // should not panic
        assert!(trace.stages.is_empty());
    }

    #[test]
    fn multiple_stages_recorded_in_order() {
        let mut trace = PipelineTrace::new("turn-4", "api");
        for name in ["retrieval", "inference", "delegation"] {
            trace.begin_stage(name);
            trace.end_stage(SpanOutcome::Ok);
        }
        assert_eq!(trace.stages.len(), 3);
        assert_eq!(trace.stages[0].name, "retrieval");
        assert_eq!(trace.stages[1].name, "inference");
        assert_eq!(trace.stages[2].name, "delegation");
    }

    #[test]
    fn error_outcome_preserved() {
        let mut trace = PipelineTrace::new("turn-5", "api");
        trace.begin_stage("tool_search");
        trace.end_stage(SpanOutcome::Error("timeout".into()));
        assert_eq!(
            trace.stages[0].outcome,
            Some(SpanOutcome::Error("timeout".into()))
        );
    }
}