Skip to main content

zeph_core/debug_dump/
trace.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! OpenTelemetry-compatible trace collector for debug sessions.
5//!
6//! Collects span data during an agent session and serializes to OTLP JSON format
7//! at session end. All text-bearing attributes are redacted via `crate::redact::scrub_content`
8//! before storage (C-01).
9//!
10//! Design notes:
11//! - Uses explicit `begin_X` / `end_X` methods with owned `SpanGuard` — safe
12//!   across async `.await` boundaries because no borrow to `TracingCollector` is held (C-02).
13//! - A `HashMap<usize, IterationEntry>` tracks concurrent iterations (I-03).
14//! - `Drop` on `TracingCollector` flushes partial traces on error/panic paths (C-04).
15//! - When the `otel` feature is enabled an `mpsc` channel forwards completed spans to
16//!   the OTLP exporter in `tracing_init.rs` (C-05).
17
18use std::collections::{HashMap, VecDeque};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use rand::RngExt as _;
24use serde::{Deserialize, Serialize};
25
26use crate::redact::scrub_content;
27
28// ─── Span ID generation ───────────────────────────────────────────────────────
29
30static SPAN_COUNTER: AtomicU64 = AtomicU64::new(0);
31
32#[must_use]
33fn new_trace_id() -> [u8; 16] {
34    rand::rng().random()
35}
36
37#[must_use]
38fn new_span_id() -> [u8; 8] {
39    let mut id: [u8; 8] = rand::rng().random();
40    // XOR low byte with counter to guarantee distinct IDs even under high concurrency.
41    id[0] ^= (SPAN_COUNTER.fetch_add(1, Ordering::Relaxed) & 0xFF) as u8;
42    id
43}
44
/// Render a 16-byte trace ID as 32 lowercase hexadecimal characters.
#[must_use]
fn hex16(b: &[u8; 16]) -> String {
    use std::fmt::Write as _;
    b.iter().fold(String::with_capacity(32), |mut out, byte| {
        // Writing into a `String` cannot fail; the result is ignored deliberately.
        let _ = write!(out, "{byte:02x}");
        out
    })
}
54
/// Render an 8-byte span ID as 16 lowercase hexadecimal characters.
#[must_use]
fn hex8(b: [u8; 8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(16);
    for byte in b {
        // High nibble first, then low nibble — same order as `{:02x}`.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
64
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and saturates at
/// `u64::MAX` when the nanosecond count does not fit in a `u64`.
#[must_use]
fn now_unix_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
71
72// ─── Public types ─────────────────────────────────────────────────────────────
73
74#[non_exhaustive]
75/// Span status code (matches OTLP `StatusCode` values).
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub enum SpanStatus {
78    Ok,
79    Error { message: String },
80    Unset,
81}
82
83/// A completed span ready for OTLP serialization.
84#[derive(Debug, Clone)]
85pub struct SpanData {
86    pub trace_id: [u8; 16],
87    pub span_id: [u8; 8],
88    pub parent_span_id: Option<[u8; 8]>,
89    pub name: String,
90    pub start_time_unix_nanos: u64,
91    pub end_time_unix_nanos: u64,
92    pub attributes: Vec<(String, String)>,
93    pub status: SpanStatus,
94}
95
/// Owned guard returned by `begin_*` methods. Pass back to `end_*` to close the span.
///
/// Does NOT hold a reference to `TracingCollector` — safe across async `.await` boundaries (C-02).
/// A guard that is never passed to the matching `end_*` method produces no span.
///
/// Derives `Debug` (public types should be debuggable); it holds only IDs, a sanitized
/// span name, and a timestamp, so no user text is exposed.
#[derive(Debug)]
pub struct SpanGuard {
    /// ID of the span this guard represents.
    pub span_id: [u8; 8],
    /// ID of the parent span.
    pub parent_span_id: [u8; 8],
    /// Span name, e.g. `llm.request` or `tool.<name>`.
    pub name: String,
    /// Start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
}
105
/// Attributes for a completed LLM request span.
///
/// Consumed by `TracingCollector::end_llm_request`, which redacts `model` (when redaction
/// is enabled) and records every field as a string attribute.
pub struct LlmAttributes {
    /// Model name.
    pub model: String,
    /// Prompt token count.
    pub prompt_tokens: u64,
    /// Completion token count.
    pub completion_tokens: u64,
    /// Request latency in milliseconds.
    pub latency_ms: u64,
    /// Whether the response was streamed.
    pub streaming: bool,
    /// Whether the request was a cache hit.
    pub cache_hit: bool,
}
115
/// Attributes for a completed tool call span.
///
/// Consumed by `TracingCollector::end_tool_call`: `error_kind`, when present, is redacted
/// (if enabled) before storage, and `is_error` selects an `Error` span status.
pub struct ToolAttributes {
    /// Tool call latency in milliseconds.
    pub latency_ms: u64,
    /// Whether the tool call failed.
    pub is_error: bool,
    /// Optional error description; may contain sensitive text (IMP-04).
    pub error_kind: Option<String>,
}
122
/// Attributes for a completed memory search span.
///
/// Consumed by `TracingCollector::end_memory_search`, which redacts `query_preview`
/// (when enabled) and keeps at most its first 100 characters.
pub struct MemorySearchAttributes {
    /// Search query text (raw; redaction happens in the collector).
    pub query_preview: String,
    /// Number of results returned.
    pub result_count: usize,
    /// Search latency in milliseconds.
    pub latency_ms: u64,
}
129
130// ─── Bridge event for OTLP export (C-05) ─────────────────────────────────────
131
132/// Event sent over the mpsc channel to the OTLP exporter.
133///
134/// The root crate wires the sender when the `otel` feature is enabled.
135/// `zeph-core` compiles this unconditionally so the struct is always available.
136#[derive(Debug)]
137pub struct TraceEvent {
138    pub trace_id: [u8; 16],
139    pub spans: Vec<SpanData>,
140}
141
142// ─── Internal ─────────────────────────────────────────────────────────────────
143
144/// Internal entry for a still-open iteration.
145struct IterationEntry {
146    guard: SpanGuard,
147    user_msg_preview: String,
148}
149
150// ─── TracingCollector ─────────────────────────────────────────────────────────
151
152/// Collects OTel-compatible spans for a single agent session.
153///
154/// All methods take `&mut self`. The agent loop is single-threaded within a session.
155/// For concurrent iteration support (I-03), a `HashMap<usize, IterationEntry>` is used.
156/// Default cap on collected spans per session (SEC-02).
157const DEFAULT_MAX_SPANS: usize = 10_000;
158
159pub struct TracingCollector {
160    trace_id: [u8; 16],
161    session_span_id: [u8; 8],
162    session_start: u64,
163    service_name: String,
164    /// User-defined resource attributes from `telemetry.trace_metadata`, included in
165    /// `resourceSpans[].resource.attributes` when serializing to OTLP JSON.
166    trace_metadata: HashMap<String, String>,
167    output_dir: PathBuf,
168    /// Active (open) iterations keyed by iteration index (I-03).
169    active_iterations: HashMap<usize, IterationEntry>,
170    completed_spans: VecDeque<SpanData>,
171    /// Hard cap on `completed_spans` length. Oldest span dropped when exceeded (SEC-02).
172    max_spans: usize,
173    /// Whether to redact text attributes. Defaults to `true` (C-01).
174    redact: bool,
175    /// Guards against double-write on explicit `finish()` followed by `Drop`.
176    flushed: bool,
177    /// Optional channel to forward completed spans to the OTLP exporter.
178    /// Wired by the root crate when the `otel` feature is enabled (C-05).
179    trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
180}
181
182impl TracingCollector {
183    /// Create a new collector.
184    ///
185    /// `trace_metadata` is included as additional resource attributes in the OTLP JSON output.
186    /// The reserved key `service.name` in `trace_metadata` is silently skipped.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if `output_dir` cannot be created.
191    pub fn new(
192        output_dir: &Path,
193        service_name: impl Into<String>,
194        trace_metadata: HashMap<String, String>,
195        redact: bool,
196        trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
197    ) -> std::io::Result<Self> {
198        std::fs::create_dir_all(output_dir)?;
199        Ok(Self {
200            trace_id: new_trace_id(),
201            session_span_id: new_span_id(),
202            session_start: now_unix_nanos(),
203            service_name: service_name.into(),
204            trace_metadata,
205            output_dir: output_dir.to_owned(),
206            active_iterations: HashMap::new(),
207            completed_spans: VecDeque::new(),
208            max_spans: DEFAULT_MAX_SPANS,
209            redact,
210            flushed: false,
211            trace_tx,
212        })
213    }
214
215    fn maybe_redact<'a>(&self, text: &'a str) -> std::borrow::Cow<'a, str> {
216        if self.redact {
217            scrub_content(text)
218        } else {
219            std::borrow::Cow::Borrowed(text)
220        }
221    }
222
223    /// Append a span, dropping the oldest when `max_spans` is exceeded (SEC-02).
224    fn push_span(&mut self, span: SpanData) {
225        if self.completed_spans.len() >= self.max_spans {
226            tracing::warn!(
227                max_spans = self.max_spans,
228                "trace span cap reached, dropping oldest span"
229            );
230            self.completed_spans.pop_front();
231        }
232        self.completed_spans.push_back(span);
233    }
234
235    // ── Iteration spans ───────────────────────────────────────────────────────
236
237    /// Open an iteration span. Call at the start of `process_user_message`.
238    pub fn begin_iteration(&mut self, index: usize, user_msg_preview: &str) {
239        let preview = self
240            .maybe_redact(user_msg_preview)
241            .chars()
242            .take(100)
243            .collect::<String>();
244        let entry = IterationEntry {
245            guard: SpanGuard {
246                span_id: new_span_id(),
247                parent_span_id: self.session_span_id,
248                name: format!("iteration.{index}"),
249                start_time_unix_nanos: now_unix_nanos(),
250            },
251            user_msg_preview: preview,
252        };
253        self.active_iterations.insert(index, entry);
254    }
255
256    /// Close an iteration span.
257    pub fn end_iteration(&mut self, index: usize, status: SpanStatus) {
258        let end_time = now_unix_nanos();
259        if let Some(entry) = self.active_iterations.remove(&index) {
260            let span = SpanData {
261                trace_id: self.trace_id,
262                span_id: entry.guard.span_id,
263                parent_span_id: Some(entry.guard.parent_span_id),
264                name: entry.guard.name,
265                start_time_unix_nanos: entry.guard.start_time_unix_nanos,
266                end_time_unix_nanos: end_time,
267                attributes: vec![(
268                    "zeph.iteration.user_message_preview".to_owned(),
269                    entry.user_msg_preview,
270                )],
271                status,
272            };
273            self.push_span(span);
274        } else {
275            tracing::warn!(index, "end_iteration without matching begin_iteration");
276        }
277    }
278
279    // ── LLM request spans ─────────────────────────────────────────────────────
280
281    /// Open an LLM request span. Returns an owned `SpanGuard` safe to hold across `.await`.
282    #[must_use]
283    pub fn begin_llm_request(&self, iteration_span_id: [u8; 8]) -> SpanGuard {
284        SpanGuard {
285            span_id: new_span_id(),
286            parent_span_id: iteration_span_id,
287            name: "llm.request".to_owned(),
288            start_time_unix_nanos: now_unix_nanos(),
289        }
290    }
291
292    /// Close an LLM request span.
293    pub fn end_llm_request(&mut self, guard: SpanGuard, attrs: &LlmAttributes) {
294        let end_time = now_unix_nanos();
295        let model_clean = self.maybe_redact(&attrs.model).into_owned();
296        self.push_span(SpanData {
297            trace_id: self.trace_id,
298            span_id: guard.span_id,
299            parent_span_id: Some(guard.parent_span_id),
300            name: guard.name,
301            start_time_unix_nanos: guard.start_time_unix_nanos,
302            end_time_unix_nanos: end_time,
303            attributes: vec![
304                ("zeph.llm.model".to_owned(), model_clean),
305                (
306                    "zeph.llm.prompt_tokens".to_owned(),
307                    attrs.prompt_tokens.to_string(),
308                ),
309                (
310                    "zeph.llm.completion_tokens".to_owned(),
311                    attrs.completion_tokens.to_string(),
312                ),
313                (
314                    "zeph.llm.latency_ms".to_owned(),
315                    attrs.latency_ms.to_string(),
316                ),
317                ("zeph.llm.streaming".to_owned(), attrs.streaming.to_string()),
318                ("zeph.llm.cache_hit".to_owned(), attrs.cache_hit.to_string()),
319            ],
320            status: SpanStatus::Ok,
321        });
322    }
323
324    // ── Tool call spans ───────────────────────────────────────────────────────
325
326    /// Open a tool call span, recording the start time as now.
327    #[must_use]
328    pub fn begin_tool_call(&self, tool_name: &str, iteration_span_id: [u8; 8]) -> SpanGuard {
329        self.begin_tool_call_at(tool_name, iteration_span_id, &std::time::Instant::now())
330    }
331
332    /// Open a tool call span with a pre-recorded start time.
333    ///
334    /// Use this variant when the tool has already executed (post-hoc assembly pattern) and
335    /// `started_at` was captured *before* the call. The Unix start timestamp is back-computed
336    /// from `started_at.elapsed()` so the span is correctly positioned on the timeline.
337    #[must_use]
338    pub fn begin_tool_call_at(
339        &self,
340        tool_name: &str,
341        iteration_span_id: [u8; 8],
342        started_at: &std::time::Instant,
343    ) -> SpanGuard {
344        let elapsed_nanos = u64::try_from(started_at.elapsed().as_nanos()).unwrap_or(u64::MAX);
345        let start_time_unix_nanos = now_unix_nanos().saturating_sub(elapsed_nanos);
346        SpanGuard {
347            span_id: new_span_id(),
348            parent_span_id: iteration_span_id,
349            name: format!("tool.{}", sanitize_name(tool_name)),
350            start_time_unix_nanos,
351        }
352    }
353
354    /// Close a tool call span.
355    pub fn end_tool_call(&mut self, guard: SpanGuard, tool_name: &str, attrs: ToolAttributes) {
356        let end_time = now_unix_nanos();
357        let tool_clean = sanitize_name(tool_name);
358        let mut attributes = vec![
359            ("zeph.tool.name".to_owned(), tool_clean),
360            (
361                "zeph.tool.latency_ms".to_owned(),
362                attrs.latency_ms.to_string(),
363            ),
364            ("zeph.tool.is_error".to_owned(), attrs.is_error.to_string()),
365        ];
366        if let Some(kind) = attrs.error_kind {
367            // IMP-04: apply redaction to error messages (may contain secret data).
368            let kind_clean = self.maybe_redact(&kind).into_owned();
369            attributes.push(("zeph.tool.error_kind".to_owned(), kind_clean));
370        }
371        let status = if attrs.is_error {
372            SpanStatus::Error {
373                message: "tool call failed".to_owned(),
374            }
375        } else {
376            SpanStatus::Ok
377        };
378        self.push_span(SpanData {
379            trace_id: self.trace_id,
380            span_id: guard.span_id,
381            parent_span_id: Some(guard.parent_span_id),
382            name: guard.name,
383            start_time_unix_nanos: guard.start_time_unix_nanos,
384            end_time_unix_nanos: end_time,
385            attributes,
386            status,
387        });
388    }
389
390    // ── Memory search spans ───────────────────────────────────────────────────
391
392    /// Open a memory search span.
393    #[must_use]
394    pub fn begin_memory_search(&self, parent_span_id: [u8; 8]) -> SpanGuard {
395        SpanGuard {
396            span_id: new_span_id(),
397            parent_span_id,
398            name: "memory.search".to_owned(),
399            start_time_unix_nanos: now_unix_nanos(),
400        }
401    }
402
403    /// Close a memory search span.
404    pub fn end_memory_search(&mut self, guard: SpanGuard, attrs: &MemorySearchAttributes) {
405        let end_time = now_unix_nanos();
406        let query_clean = self
407            .maybe_redact(&attrs.query_preview)
408            .chars()
409            .take(100)
410            .collect::<String>();
411        self.push_span(SpanData {
412            trace_id: self.trace_id,
413            span_id: guard.span_id,
414            parent_span_id: Some(guard.parent_span_id),
415            name: guard.name,
416            start_time_unix_nanos: guard.start_time_unix_nanos,
417            end_time_unix_nanos: end_time,
418            attributes: vec![
419                ("zeph.memory.query_preview".to_owned(), query_clean),
420                (
421                    "zeph.memory.result_count".to_owned(),
422                    attrs.result_count.to_string(),
423                ),
424                (
425                    "zeph.memory.latency_ms".to_owned(),
426                    attrs.latency_ms.to_string(),
427                ),
428            ],
429            status: SpanStatus::Ok,
430        });
431    }
432
433    // ── Accessors ─────────────────────────────────────────────────────────────
434
435    /// Return the path to the `trace.json` file that will be written on `finish()`.
436    #[must_use]
437    pub fn trace_json_path(&self) -> PathBuf {
438        self.output_dir.join("trace.json")
439    }
440
441    /// Return the span ID of the currently active iteration, if any.
442    #[must_use]
443    pub fn current_iteration_span_id(&self, index: usize) -> Option<[u8; 8]> {
444        self.active_iterations.get(&index).map(|e| e.guard.span_id)
445    }
446
447    /// Return the session root span ID (fallback parent when no iteration is active).
448    #[must_use]
449    pub fn session_span_id(&self) -> [u8; 8] {
450        self.session_span_id
451    }
452
453    /// Return the trace ID for this session.
454    #[must_use]
455    pub fn trace_id(&self) -> [u8; 16] {
456        self.trace_id
457    }
458
459    // ── Flush ─────────────────────────────────────────────────────────────────
460
461    /// Finalize the session span and write `trace.json`.
462    ///
463    /// Safe to call multiple times — subsequent calls after the first are no-ops.
464    /// Also sends spans over the `OTel` channel when the `otel` feature is enabled (C-05).
465    pub fn finish(&mut self) {
466        if self.flushed {
467            return;
468        }
469        self.flushed = true;
470
471        // Close any still-open iteration spans with `Unset` status (partial trace on error/cancel).
472        let open_keys: Vec<usize> = self.active_iterations.keys().copied().collect();
473        let end_time = now_unix_nanos();
474        for index in open_keys {
475            if let Some(entry) = self.active_iterations.remove(&index) {
476                self.push_span(SpanData {
477                    trace_id: self.trace_id,
478                    span_id: entry.guard.span_id,
479                    parent_span_id: Some(entry.guard.parent_span_id),
480                    name: entry.guard.name,
481                    start_time_unix_nanos: entry.guard.start_time_unix_nanos,
482                    end_time_unix_nanos: end_time,
483                    attributes: vec![(
484                        "zeph.iteration.user_message_preview".to_owned(),
485                        entry.user_msg_preview,
486                    )],
487                    status: SpanStatus::Unset,
488                });
489            }
490        }
491
492        let session_span = SpanData {
493            trace_id: self.trace_id,
494            span_id: self.session_span_id,
495            parent_span_id: None,
496            name: "session".to_owned(),
497            start_time_unix_nanos: self.session_start,
498            end_time_unix_nanos: end_time,
499            attributes: vec![
500                ("service.name".to_owned(), self.service_name.clone()),
501                ("zeph.session.trace_id".to_owned(), hex16(&self.trace_id)),
502            ],
503            status: SpanStatus::Ok,
504        };
505
506        let mut all_spans = vec![session_span];
507        all_spans.extend(self.completed_spans.drain(..));
508
509        let json = serialize_otlp_json(&all_spans, &self.service_name, &self.trace_metadata);
510        let path = self.output_dir.join("trace.json");
511        if let Err(e) = write_trace_file(&path, json.as_bytes()) {
512            tracing::warn!(path = %path.display(), error = %e, "trace.json write failed");
513        } else {
514            tracing::info!(path = %path.display(), "OTel trace written");
515        }
516
517        // C-05: forward spans to OTLP exporter when the root crate has wired the channel.
518        if let Some(ref tx) = self.trace_tx {
519            let event = TraceEvent {
520                trace_id: self.trace_id,
521                spans: all_spans,
522            };
523            if tx.send(event).is_err() {
524                tracing::debug!("OTLP trace channel closed, skipping export");
525            }
526        }
527    }
528}
529
530// C-04: Drop flushes partial traces on error/panic/cancellation.
531impl Drop for TracingCollector {
532    fn drop(&mut self) {
533        self.finish();
534    }
535}
536
537// ─── OTLP JSON serialization (I-04) ───────────────────────────────────────────
538
539fn span_status_code(status: &SpanStatus) -> u8 {
540    match status {
541        SpanStatus::Unset => 0,
542        SpanStatus::Ok => 1,
543        SpanStatus::Error { .. } => 2,
544    }
545}
546
547/// Serialize spans to OTLP JSON Protobuf encoding.
548///
549/// `trace_metadata` entries are appended as additional resource attributes after `service.name`.
550/// The reserved key `service.name` in `trace_metadata` is skipped to avoid duplication.
551///
552/// Format: <https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding>
553#[must_use]
554pub fn serialize_otlp_json<S: std::hash::BuildHasher>(
555    spans: &[SpanData],
556    service_name: &str,
557    trace_metadata: &HashMap<String, String, S>,
558) -> String {
559    let otlp_spans: Vec<serde_json::Value> = spans
560        .iter()
561        .map(|s| {
562            let attrs: Vec<serde_json::Value> = s
563                .attributes
564                .iter()
565                .map(|(k, v)| {
566                    serde_json::json!({
567                        "key": k,
568                        "value": { "stringValue": v }
569                    })
570                })
571                .collect();
572
573            let mut obj = serde_json::json!({
574                "traceId": hex16(&s.trace_id),
575                "spanId": hex8(s.span_id),
576                "name": s.name,
577                // OTLP JSON spec requires int64 fields as strings.
578                "startTimeUnixNano": s.start_time_unix_nanos.to_string(),
579                "endTimeUnixNano": s.end_time_unix_nanos.to_string(),
580                "attributes": attrs,
581                "status": {
582                    "code": span_status_code(&s.status)
583                }
584            });
585
586            if let Some(parent) = s.parent_span_id {
587                obj["parentSpanId"] = serde_json::json!(hex8(parent));
588            }
589
590            if let SpanStatus::Error { message } = &s.status {
591                obj["status"]["message"] = serde_json::json!(message);
592            }
593
594            obj
595        })
596        .collect();
597
598    let mut resource_attrs = vec![serde_json::json!({
599        "key": "service.name",
600        "value": { "stringValue": service_name }
601    })];
602    for (k, v) in trace_metadata {
603        if k == "service.name" {
604            continue;
605        }
606        resource_attrs.push(serde_json::json!({
607            "key": k,
608            "value": { "stringValue": v }
609        }));
610    }
611
612    let payload = serde_json::json!({
613        "resourceSpans": [{
614            "resource": {
615                "attributes": resource_attrs
616            },
617            "scopeSpans": [{
618                "scope": {
619                    "name": "zeph",
620                    "version": env!("CARGO_PKG_VERSION")
621                },
622                "spans": otlp_spans
623            }]
624        }]
625    });
626
627    serde_json::to_string_pretty(&payload)
628        .unwrap_or_else(|e| format!("{{\"error\": \"serialization failed: {e}\"}}"))
629}
630
631// ─── Helpers ──────────────────────────────────────────────────────────────────
632
/// Write `data` to `path`, creating the file or truncating an existing one (SEC-01).
///
/// On Unix the file ends up with mode `0o600` whether or not it already existed.
/// `OpenOptionsExt::mode` only applies when the file is *created*, so a pre-existing,
/// more permissive `trace.json` would otherwise keep its old permissions. Permissions
/// are therefore reset explicitly before any data is written.
/// On non-Unix platforms this falls back to `std::fs::write`.
///
/// # Errors
///
/// Returns any I/O error from opening the file, changing its permissions, or writing it.
fn write_trace_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        use std::io::Write as _;
        use std::os::unix::fs::{OpenOptionsExt as _, PermissionsExt};
        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o600)
            .open(path)?;
        f.set_permissions(std::fs::Permissions::from_mode(0o600))?;
        f.write_all(data)
    }
    #[cfg(not(unix))]
    {
        std::fs::write(path, data)
    }
}
653
/// Replace every character that is not alphanumeric, `-`, `_`, or `.` with `_`, making a
/// tool name safe to embed in a span name or attribute.
fn sanitize_name(name: &str) -> String {
    let allowed = |c: char| c.is_alphanumeric() || matches!(c, '-' | '_' | '.');
    name.chars()
        .map(|c| if allowed(c) { c } else { '_' })
        .collect()
}
665
666// ─── Tests ────────────────────────────────────────────────────────────────────
667
668#[cfg(test)]
669mod tests {
670    use super::*;
671    use tempfile::tempdir;
672
673    fn make_collector(dir: &Path) -> TracingCollector {
674        TracingCollector::new(dir, "zeph-test", HashMap::new(), false, None).unwrap()
675    }
676
677    #[test]
678    fn span_id_generation_is_unique() {
679        let ids: Vec<[u8; 8]> = (0..100).map(|_| new_span_id()).collect();
680        let unique: std::collections::HashSet<[u8; 8]> = ids.into_iter().collect();
681        assert_eq!(unique.len(), 100);
682    }
683
684    #[test]
685    fn hex_lengths_correct() {
686        assert_eq!(hex16(&new_trace_id()).len(), 32);
687        assert_eq!(hex8(new_span_id()).len(), 16);
688    }
689
690    #[test]
691    fn collector_creates_output_dir() {
692        let tmp = tempdir().unwrap();
693        let sub = tmp.path().join("traces");
694        make_collector(&sub);
695        assert!(sub.exists());
696    }
697
698    #[test]
699    fn finish_writes_trace_json() {
700        let tmp = tempdir().unwrap();
701        let mut c = make_collector(tmp.path());
702        c.begin_iteration(0, "hello world");
703        c.end_iteration(0, SpanStatus::Ok);
704        c.finish();
705
706        let path = tmp.path().join("trace.json");
707        assert!(path.exists(), "trace.json must be written");
708
709        let v: serde_json::Value =
710            serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
711        assert!(v["resourceSpans"].is_array());
712        // session + iteration = 2 spans.
713        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
714            .as_array()
715            .unwrap();
716        assert_eq!(spans.len(), 2);
717    }
718
719    #[test]
720    fn span_hierarchy_parent_child_correct() {
721        let tmp = tempdir().unwrap();
722        let mut c = make_collector(tmp.path());
723        c.begin_iteration(0, "test");
724        let iter_id = c.current_iteration_span_id(0).unwrap();
725        let guard = c.begin_llm_request(iter_id);
726        c.end_llm_request(
727            guard,
728            &LlmAttributes {
729                model: "test-model".to_owned(),
730                prompt_tokens: 100,
731                completion_tokens: 50,
732                latency_ms: 200,
733                streaming: false,
734                cache_hit: false,
735            },
736        );
737        c.end_iteration(0, SpanStatus::Ok);
738        c.finish();
739
740        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
741        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
742        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
743            .as_array()
744            .unwrap();
745        assert_eq!(spans.len(), 3, "session + iteration + llm = 3 spans");
746
747        let llm_span = spans
748            .iter()
749            .find(|s| s["name"] == "llm.request")
750            .expect("llm.request span missing");
751        let iter_span = spans
752            .iter()
753            .find(|s| s["name"] == "iteration.0")
754            .expect("iteration.0 span missing");
755
756        assert_eq!(
757            llm_span["parentSpanId"], iter_span["spanId"],
758            "llm span parent must be iteration span"
759        );
760    }
761
762    #[test]
763    fn redaction_applied_to_text_attributes() {
764        let tmp = tempdir().unwrap();
765        let mut c = TracingCollector::new(tmp.path(), "test", HashMap::new(), true, None).unwrap();
766        let iter_id = c.session_span_id();
767        let guard = c.begin_memory_search(iter_id);
768        c.end_memory_search(
769            guard,
770            &MemorySearchAttributes {
771                query_preview: "search sk-secretkey123 here".to_owned(),
772                result_count: 3,
773                latency_ms: 10,
774            },
775        );
776        c.finish();
777
778        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
779        assert!(
780            !content.contains("sk-secretkey123"),
781            "raw secret must be redacted from trace"
782        );
783    }
784
785    #[test]
786    fn otlp_json_format_spec_compliant() {
787        let trace_id = [0xAB_u8; 16];
788        let span_id = [0xCD_u8; 8];
789        let spans = vec![SpanData {
790            trace_id,
791            span_id,
792            parent_span_id: None,
793            name: "session".to_owned(),
794            start_time_unix_nanos: 1_000_000,
795            end_time_unix_nanos: 2_000_000,
796            attributes: vec![("service.name".to_owned(), "zeph".to_owned())],
797            status: SpanStatus::Ok,
798        }];
799
800        let json = serialize_otlp_json(&spans, "zeph", &HashMap::new());
801        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
802
803        let span = &v["resourceSpans"][0]["scopeSpans"][0]["spans"][0];
804        assert_eq!(
805            span["traceId"],
806            "abababababababababababababababababab"[..32]
807        );
808        assert_eq!(span["spanId"], "cdcdcdcdcdcdcdcd");
809        assert_eq!(span["name"], "session");
810        // int64 must be serialized as string per OTLP JSON spec.
811        assert!(span["startTimeUnixNano"].is_string());
812        assert_eq!(span["status"]["code"], 1_u64);
813    }
814
815    #[test]
816    fn drop_flushes_trace() {
817        let tmp = tempdir().unwrap();
818        {
819            let mut c = make_collector(tmp.path());
820            c.begin_iteration(0, "hello");
821            // Drop without explicit finish.
822        }
823        assert!(
824            tmp.path().join("trace.json").exists(),
825            "Drop must flush trace.json"
826        );
827    }
828
829    #[test]
830    fn trace_metadata_included_in_resource_attributes() {
831        let tmp = tempdir().unwrap();
832        let mut meta = HashMap::new();
833        meta.insert("deployment.environment".to_owned(), "staging".to_owned());
834        meta.insert("service.name".to_owned(), "should-be-skipped".to_owned());
835        let mut c = TracingCollector::new(tmp.path(), "zeph-test", meta, false, None).unwrap();
836        c.finish();
837
838        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
839        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
840        let attrs = v["resourceSpans"][0]["resource"]["attributes"]
841            .as_array()
842            .unwrap();
843
844        // service.name must appear with the collector's service_name, not the skipped value.
845        let svc = attrs
846            .iter()
847            .find(|a| a["key"] == "service.name")
848            .expect("service.name must be in resource attributes");
849        assert_eq!(svc["value"]["stringValue"], "zeph-test");
850
851        // Custom metadata attribute must be present.
852        let env_attr = attrs.iter().find(|a| a["key"] == "deployment.environment");
853        assert!(
854            env_attr.is_some(),
855            "deployment.environment must be in resource attributes"
856        );
857        assert_eq!(env_attr.unwrap()["value"]["stringValue"], "staging");
858    }
859
860    #[test]
861    fn finish_is_idempotent() {
862        let tmp = tempdir().unwrap();
863        let mut c = make_collector(tmp.path());
864        c.finish();
865        c.finish();
866        assert!(tmp.path().join("trace.json").exists());
867    }
868
869    #[test]
870    fn concurrent_iterations_tracked_independently() {
871        let tmp = tempdir().unwrap();
872        let mut c = make_collector(tmp.path());
873        c.begin_iteration(0, "first");
874        c.begin_iteration(1, "second");
875        let id0 = c.current_iteration_span_id(0).unwrap();
876        let id1 = c.current_iteration_span_id(1).unwrap();
877        assert_ne!(
878            id0, id1,
879            "concurrent iterations must have distinct span IDs"
880        );
881        c.end_iteration(0, SpanStatus::Ok);
882        c.end_iteration(1, SpanStatus::Ok);
883        c.finish();
884
885        let v: serde_json::Value =
886            serde_json::from_str(&std::fs::read_to_string(tmp.path().join("trace.json")).unwrap())
887                .unwrap();
888        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
889            .as_array()
890            .unwrap();
891        // session + 2 iterations = 3.
892        assert_eq!(spans.len(), 3);
893    }
894
895    #[test]
896    fn trace_format_skips_legacy_numbered_files() {
897        use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
898
899        let tmp = tempdir().unwrap();
900        let d = DebugDumper::new(tmp.path(), DumpFormat::Trace).unwrap();
901        let session_dir = d.dir().to_owned();
902        let id = d.dump_request(&RequestDebugDump {
903            model_name: "test",
904            messages: &[],
905            tools: &[],
906            provider_request: serde_json::json!({}),
907            memcot_state: None,
908        });
909        d.dump_response(id, "resp");
910        d.dump_tool_output("shell", "output");
911
912        // No legacy numbered files should be written in Trace format.
913        let files: Vec<_> = std::fs::read_dir(&session_dir)
914            .unwrap()
915            .filter_map(std::result::Result::ok)
916            .filter(|e| {
917                e.file_name()
918                    .to_string_lossy()
919                    .chars()
920                    .next()
921                    .is_some_and(|c| c.is_ascii_digit())
922            })
923            .collect();
924        assert!(
925            files.is_empty(),
926            "no legacy numbered files in Trace format session dir"
927        );
928
929        // trace.json is written into the session subdir by TracingCollector when wired.
930        // Here we only verify the session dir itself exists (TracingCollector is not wired in this test).
931        assert!(session_dir.is_dir(), "session subdir must exist");
932    }
933
934    #[test]
935    fn tool_call_span_emitted() {
936        let tmp = tempdir().unwrap();
937        let mut c = make_collector(tmp.path());
938        c.begin_iteration(0, "test");
939        let iter_id = c.current_iteration_span_id(0).unwrap();
940        let guard = c.begin_tool_call("shell", iter_id);
941        c.end_tool_call(
942            guard,
943            "shell",
944            ToolAttributes {
945                latency_ms: 50,
946                is_error: false,
947                error_kind: None,
948            },
949        );
950        c.end_iteration(0, SpanStatus::Ok);
951        c.finish();
952
953        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
954        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
955        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
956            .as_array()
957            .unwrap();
958        assert!(
959            spans.iter().any(|s| s["name"] == "tool.shell"),
960            "tool.shell span must be emitted"
961        );
962    }
963
964    #[test]
965    fn tool_call_error_span_emitted() {
966        let tmp = tempdir().unwrap();
967        let mut c = make_collector(tmp.path());
968        c.begin_iteration(0, "test");
969        let iter_id = c.current_iteration_span_id(0).unwrap();
970        let guard = c.begin_tool_call("shell", iter_id);
971        c.end_tool_call(
972            guard,
973            "shell",
974            ToolAttributes {
975                latency_ms: 10,
976                is_error: true,
977                error_kind: Some("permission denied".to_owned()),
978            },
979        );
980        c.end_iteration(0, SpanStatus::Ok);
981        c.finish();
982
983        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
984        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
985        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
986            .as_array()
987            .unwrap();
988        let tool_span = spans
989            .iter()
990            .find(|s| s["name"] == "tool.shell")
991            .expect("tool.shell span missing");
992        assert_eq!(
993            tool_span["status"]["code"], 2_u64,
994            "error span must have status code 2"
995        );
996    }
997
998    #[test]
999    fn begin_tool_call_at_timestamps_precede_end_time() {
1000        let tmp = tempdir().unwrap();
1001        let mut c = make_collector(tmp.path());
1002        c.begin_iteration(0, "test");
1003        let iter_id = c.current_iteration_span_id(0).unwrap();
1004
1005        // Simulate post-hoc assembly: capture start before "execution", then call begin_tool_call_at.
1006        let started_at = std::time::Instant::now();
1007        std::thread::sleep(std::time::Duration::from_millis(2));
1008        let guard = c.begin_tool_call_at("shell", iter_id, &started_at);
1009        let span_start = guard.start_time_unix_nanos;
1010        c.end_tool_call(
1011            guard,
1012            "shell",
1013            ToolAttributes {
1014                latency_ms: 2,
1015                is_error: false,
1016                error_kind: None,
1017            },
1018        );
1019        c.end_iteration(0, SpanStatus::Ok);
1020        c.finish();
1021
1022        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
1023        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
1024        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
1025            .as_array()
1026            .unwrap();
1027        let tool_span = spans
1028            .iter()
1029            .find(|s| s["name"] == "tool.shell")
1030            .expect("tool.shell span missing");
1031        let recorded_start: u64 = tool_span["startTimeUnixNano"]
1032            .as_str()
1033            .unwrap()
1034            .parse()
1035            .unwrap();
1036        let recorded_end: u64 = tool_span["endTimeUnixNano"]
1037            .as_str()
1038            .unwrap()
1039            .parse()
1040            .unwrap();
1041        // The span start must be earlier than the end.
1042        assert!(
1043            recorded_start < recorded_end,
1044            "start ({recorded_start}) must precede end ({recorded_end})"
1045        );
1046        // The guard's start_time matches what was serialized.
1047        assert_eq!(
1048            span_start, recorded_start,
1049            "guard start must match serialized start"
1050        );
1051    }
1052
1053    #[test]
1054    fn session_to_iteration_parent_span_id() {
1055        let tmp = tempdir().unwrap();
1056        let mut c = make_collector(tmp.path());
1057        let session_id = c.session_span_id();
1058        c.begin_iteration(0, "test");
1059        c.end_iteration(0, SpanStatus::Ok);
1060        c.finish();
1061
1062        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
1063        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
1064        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
1065            .as_array()
1066            .unwrap();
1067        let iter_span = spans
1068            .iter()
1069            .find(|s| s["name"] == "iteration.0")
1070            .expect("iteration.0 span missing");
1071        assert_eq!(
1072            iter_span["parentSpanId"],
1073            serde_json::json!(hex8(session_id)),
1074            "iteration span parent must be session span"
1075        );
1076    }
1077
1078    #[test]
1079    fn json_and_raw_formats_still_write_files() {
1080        use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
1081
1082        let tmp = tempdir().unwrap();
1083        for fmt in [DumpFormat::Json, DumpFormat::Raw] {
1084            let d = DebugDumper::new(tmp.path(), fmt).unwrap();
1085            let id = d.dump_request(&RequestDebugDump {
1086                model_name: "test-model",
1087                messages: &[],
1088                tools: &[],
1089                provider_request: serde_json::json!({"model": "test-model", "max_tokens": 100}),
1090                memcot_state: None,
1091            });
1092            d.dump_response(id, "hello");
1093            let session_dir = std::fs::read_dir(tmp.path())
1094                .unwrap()
1095                .filter_map(std::result::Result::ok)
1096                .find(|e| e.file_type().is_ok_and(|t| t.is_dir()))
1097                .unwrap()
1098                .path();
1099            assert!(
1100                session_dir.join("0000-request.json").exists(),
1101                "request file must exist for format {fmt:?}"
1102            );
1103        }
1104    }
1105}