Skip to main content

zeph_core/debug_dump/
trace.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! OpenTelemetry-compatible trace collector for debug sessions.
5//!
6//! Collects span data during an agent session and serializes to OTLP JSON format
7//! at session end. All text-bearing attributes are redacted via `crate::redact::scrub_content`
8//! before storage (C-01).
9//!
10//! Design notes:
11//! - Uses explicit `begin_X` / `end_X` methods with owned `SpanGuard` — safe
12//!   across async `.await` boundaries because no borrow to `TracingCollector` is held (C-02).
13//! - A `HashMap<usize, IterationEntry>` tracks concurrent iterations (I-03).
14//! - `Drop` on `TracingCollector` flushes partial traces on error/panic paths (C-04).
15//! - When the `otel` feature is enabled an `mpsc` channel forwards completed spans to
16//!   the OTLP exporter in `tracing_init.rs` (C-05).
17
18use std::collections::{HashMap, VecDeque};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use rand::RngExt as _;
24use serde::{Deserialize, Serialize};
25
26use crate::redact::scrub_content;
27
28// ─── Span ID generation ───────────────────────────────────────────────────────
29
30static SPAN_COUNTER: AtomicU64 = AtomicU64::new(0);
31
32#[must_use]
33fn new_trace_id() -> [u8; 16] {
34    rand::rng().random()
35}
36
37#[must_use]
38fn new_span_id() -> [u8; 8] {
39    let mut id: [u8; 8] = rand::rng().random();
40    // XOR low byte with counter to guarantee distinct IDs even under high concurrency.
41    id[0] ^= (SPAN_COUNTER.fetch_add(1, Ordering::Relaxed) & 0xFF) as u8;
42    id
43}
44
/// Lowercase hex encoding of a 16-byte trace ID (32 characters, no separators).
#[must_use]
fn hex16(b: &[u8; 16]) -> String {
    encode_lower_hex(b)
}

/// Lowercase hex encoding of an 8-byte span ID (16 characters, no separators).
#[must_use]
fn hex8(b: [u8; 8]) -> String {
    encode_lower_hex(&b)
}

/// Encode `bytes` as lowercase hex, two digits per byte, most-significant nibble first.
#[must_use]
fn encode_lower_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut out, &byte| {
            out.push(char::from(DIGITS[usize::from(byte >> 4)]));
            out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
            out
        })
}
64
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. A value too large for `u64` saturates to
/// `u64::MAX`.
#[must_use]
fn now_unix_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
71
72// ─── Public types ─────────────────────────────────────────────────────────────
73
/// Span status code (matches OTLP `StatusCode` values).
///
/// Converted to the numeric OTLP code by `span_status_code` during serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SpanStatus {
    /// Successful completion; serialized as OTLP status code `1`.
    Ok,
    /// Failure; serialized as OTLP status code `2`, with `message` emitted as the status message.
    Error { message: String },
    /// No explicit status, e.g. a span closed during `finish()`; serialized as code `0`.
    Unset,
}
81
/// A completed span ready for OTLP serialization.
#[derive(Debug, Clone)]
pub struct SpanData {
    /// Trace this span belongs to (hex-encoded as `traceId` in OTLP JSON).
    pub trace_id: [u8; 16],
    /// This span's ID (hex-encoded as `spanId`).
    pub span_id: [u8; 8],
    /// Parent span ID; `None` only for the root `session` span, whose `parentSpanId` is omitted.
    pub parent_span_id: Option<[u8; 8]>,
    /// Span name, e.g. `session`, `iteration.N`, `llm.request`, `tool.<name>`.
    pub name: String,
    /// Start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
    /// End time in nanoseconds since the Unix epoch.
    pub end_time_unix_nanos: u64,
    /// Key/value attributes; every value is emitted as an OTLP `stringValue`.
    pub attributes: Vec<(String, String)>,
    /// Final status of the span.
    pub status: SpanStatus,
}
94
/// Owned guard returned by `begin_*` methods. Pass back to `end_*` to close the span.
///
/// Does NOT hold a reference to `TracingCollector` — safe across async `.await` boundaries (C-02).
/// A guard that is dropped without being passed to the matching `end_*` method is simply
/// discarded: its span is never recorded.
#[derive(Debug)]
pub struct SpanGuard {
    /// ID assigned to this span when it was opened.
    pub span_id: [u8; 8],
    /// ID of the parent span (an iteration or the session span, as chosen by the caller).
    pub parent_span_id: [u8; 8],
    /// Span name, e.g. `llm.request`, `memory.search` or `tool.<name>`.
    pub name: String,
    /// Span start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
}
104
/// Attributes for a completed LLM request span.
///
/// Each field becomes a `zeph.llm.*` string attribute on the span.
#[derive(Debug)]
pub struct LlmAttributes {
    /// Model identifier; passed through redaction when the collector has it enabled.
    pub model: String,
    /// Number of prompt (input) tokens reported for the request.
    pub prompt_tokens: u64,
    /// Number of completion (output) tokens reported for the request.
    pub completion_tokens: u64,
    /// Request latency in milliseconds, as measured by the caller.
    pub latency_ms: u64,
    /// Whether the response was streamed.
    pub streaming: bool,
    /// Whether the response was served from a cache.
    pub cache_hit: bool,
}
114
/// Attributes for a completed tool call span.
#[derive(Debug)]
pub struct ToolAttributes {
    /// Tool execution latency in milliseconds, as measured by the caller.
    pub latency_ms: u64,
    /// When `true`, the span is recorded with an `Error` status.
    pub is_error: bool,
    /// Optional error description; redacted (when enabled) before being stored as
    /// `zeph.tool.error_kind`.
    pub error_kind: Option<String>,
}
121
/// Attributes for a completed memory search span.
#[derive(Debug)]
pub struct MemorySearchAttributes {
    /// Search query text; redacted (when enabled) and truncated to 100 characters before storage.
    pub query_preview: String,
    /// Number of results returned by the search.
    pub result_count: usize,
    /// Search latency in milliseconds, as measured by the caller.
    pub latency_ms: u64,
}
128
129// ─── Bridge event for OTLP export (C-05) ─────────────────────────────────────
130
/// Event sent over the mpsc channel to the OTLP exporter.
///
/// The root crate wires the sender when the `otel` feature is enabled.
/// `zeph-core` compiles this unconditionally so the struct is always available.
/// `TracingCollector::finish` sends at most one such event per collector.
#[derive(Debug)]
pub struct TraceEvent {
    /// Trace ID shared by the spans in `spans`.
    pub trace_id: [u8; 16],
    /// Spans of the session; as produced by `finish`, the root `session` span comes first.
    pub spans: Vec<SpanData>,
}
140
141// ─── Internal ─────────────────────────────────────────────────────────────────
142
/// Internal entry for a still-open iteration.
struct IterationEntry {
    /// Span ID, parent (session) ID, name and start time captured in `begin_iteration`.
    guard: SpanGuard,
    /// User message preview, already redacted (when enabled) and truncated to 100 characters.
    user_msg_preview: String,
}
148
149// ─── TracingCollector ─────────────────────────────────────────────────────────
150
151/// Collects OTel-compatible spans for a single agent session.
152///
153/// All methods take `&mut self`. The agent loop is single-threaded within a session.
154/// For concurrent iteration support (I-03), a `HashMap<usize, IterationEntry>` is used.
155/// Default cap on collected spans per session (SEC-02).
156const DEFAULT_MAX_SPANS: usize = 10_000;
157
158pub struct TracingCollector {
159    trace_id: [u8; 16],
160    session_span_id: [u8; 8],
161    session_start: u64,
162    service_name: String,
163    /// User-defined resource attributes from `telemetry.trace_metadata`, included in
164    /// `resourceSpans[].resource.attributes` when serializing to OTLP JSON.
165    trace_metadata: HashMap<String, String>,
166    output_dir: PathBuf,
167    /// Active (open) iterations keyed by iteration index (I-03).
168    active_iterations: HashMap<usize, IterationEntry>,
169    completed_spans: VecDeque<SpanData>,
170    /// Hard cap on `completed_spans` length. Oldest span dropped when exceeded (SEC-02).
171    max_spans: usize,
172    /// Whether to redact text attributes. Defaults to `true` (C-01).
173    redact: bool,
174    /// Guards against double-write on explicit `finish()` followed by `Drop`.
175    flushed: bool,
176    /// Optional channel to forward completed spans to the OTLP exporter.
177    /// Wired by the root crate when the `otel` feature is enabled (C-05).
178    trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
179}
180
181impl TracingCollector {
182    /// Create a new collector.
183    ///
184    /// `trace_metadata` is included as additional resource attributes in the OTLP JSON output.
185    /// The reserved key `service.name` in `trace_metadata` is silently skipped.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if `output_dir` cannot be created.
190    pub fn new(
191        output_dir: &Path,
192        service_name: impl Into<String>,
193        trace_metadata: HashMap<String, String>,
194        redact: bool,
195        trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
196    ) -> std::io::Result<Self> {
197        std::fs::create_dir_all(output_dir)?;
198        Ok(Self {
199            trace_id: new_trace_id(),
200            session_span_id: new_span_id(),
201            session_start: now_unix_nanos(),
202            service_name: service_name.into(),
203            trace_metadata,
204            output_dir: output_dir.to_owned(),
205            active_iterations: HashMap::new(),
206            completed_spans: VecDeque::new(),
207            max_spans: DEFAULT_MAX_SPANS,
208            redact,
209            flushed: false,
210            trace_tx,
211        })
212    }
213
214    fn maybe_redact<'a>(&self, text: &'a str) -> std::borrow::Cow<'a, str> {
215        if self.redact {
216            scrub_content(text)
217        } else {
218            std::borrow::Cow::Borrowed(text)
219        }
220    }
221
222    /// Append a span, dropping the oldest when `max_spans` is exceeded (SEC-02).
223    fn push_span(&mut self, span: SpanData) {
224        if self.completed_spans.len() >= self.max_spans {
225            tracing::warn!(
226                max_spans = self.max_spans,
227                "trace span cap reached, dropping oldest span"
228            );
229            self.completed_spans.pop_front();
230        }
231        self.completed_spans.push_back(span);
232    }
233
234    // ── Iteration spans ───────────────────────────────────────────────────────
235
236    /// Open an iteration span. Call at the start of `process_user_message`.
237    pub fn begin_iteration(&mut self, index: usize, user_msg_preview: &str) {
238        let preview = self
239            .maybe_redact(user_msg_preview)
240            .chars()
241            .take(100)
242            .collect::<String>();
243        let entry = IterationEntry {
244            guard: SpanGuard {
245                span_id: new_span_id(),
246                parent_span_id: self.session_span_id,
247                name: format!("iteration.{index}"),
248                start_time_unix_nanos: now_unix_nanos(),
249            },
250            user_msg_preview: preview,
251        };
252        self.active_iterations.insert(index, entry);
253    }
254
255    /// Close an iteration span.
256    pub fn end_iteration(&mut self, index: usize, status: SpanStatus) {
257        let end_time = now_unix_nanos();
258        if let Some(entry) = self.active_iterations.remove(&index) {
259            let span = SpanData {
260                trace_id: self.trace_id,
261                span_id: entry.guard.span_id,
262                parent_span_id: Some(entry.guard.parent_span_id),
263                name: entry.guard.name,
264                start_time_unix_nanos: entry.guard.start_time_unix_nanos,
265                end_time_unix_nanos: end_time,
266                attributes: vec![(
267                    "zeph.iteration.user_message_preview".to_owned(),
268                    entry.user_msg_preview,
269                )],
270                status,
271            };
272            self.push_span(span);
273        } else {
274            tracing::warn!(index, "end_iteration without matching begin_iteration");
275        }
276    }
277
278    // ── LLM request spans ─────────────────────────────────────────────────────
279
280    /// Open an LLM request span. Returns an owned `SpanGuard` safe to hold across `.await`.
281    #[must_use]
282    pub fn begin_llm_request(&self, iteration_span_id: [u8; 8]) -> SpanGuard {
283        SpanGuard {
284            span_id: new_span_id(),
285            parent_span_id: iteration_span_id,
286            name: "llm.request".to_owned(),
287            start_time_unix_nanos: now_unix_nanos(),
288        }
289    }
290
291    /// Close an LLM request span.
292    pub fn end_llm_request(&mut self, guard: SpanGuard, attrs: &LlmAttributes) {
293        let end_time = now_unix_nanos();
294        let model_clean = self.maybe_redact(&attrs.model).into_owned();
295        self.push_span(SpanData {
296            trace_id: self.trace_id,
297            span_id: guard.span_id,
298            parent_span_id: Some(guard.parent_span_id),
299            name: guard.name,
300            start_time_unix_nanos: guard.start_time_unix_nanos,
301            end_time_unix_nanos: end_time,
302            attributes: vec![
303                ("zeph.llm.model".to_owned(), model_clean),
304                (
305                    "zeph.llm.prompt_tokens".to_owned(),
306                    attrs.prompt_tokens.to_string(),
307                ),
308                (
309                    "zeph.llm.completion_tokens".to_owned(),
310                    attrs.completion_tokens.to_string(),
311                ),
312                (
313                    "zeph.llm.latency_ms".to_owned(),
314                    attrs.latency_ms.to_string(),
315                ),
316                ("zeph.llm.streaming".to_owned(), attrs.streaming.to_string()),
317                ("zeph.llm.cache_hit".to_owned(), attrs.cache_hit.to_string()),
318            ],
319            status: SpanStatus::Ok,
320        });
321    }
322
323    // ── Tool call spans ───────────────────────────────────────────────────────
324
325    /// Open a tool call span, recording the start time as now.
326    #[must_use]
327    pub fn begin_tool_call(&self, tool_name: &str, iteration_span_id: [u8; 8]) -> SpanGuard {
328        self.begin_tool_call_at(tool_name, iteration_span_id, &std::time::Instant::now())
329    }
330
331    /// Open a tool call span with a pre-recorded start time.
332    ///
333    /// Use this variant when the tool has already executed (post-hoc assembly pattern) and
334    /// `started_at` was captured *before* the call. The Unix start timestamp is back-computed
335    /// from `started_at.elapsed()` so the span is correctly positioned on the timeline.
336    #[must_use]
337    pub fn begin_tool_call_at(
338        &self,
339        tool_name: &str,
340        iteration_span_id: [u8; 8],
341        started_at: &std::time::Instant,
342    ) -> SpanGuard {
343        let elapsed_nanos = u64::try_from(started_at.elapsed().as_nanos()).unwrap_or(u64::MAX);
344        let start_time_unix_nanos = now_unix_nanos().saturating_sub(elapsed_nanos);
345        SpanGuard {
346            span_id: new_span_id(),
347            parent_span_id: iteration_span_id,
348            name: format!("tool.{}", sanitize_name(tool_name)),
349            start_time_unix_nanos,
350        }
351    }
352
353    /// Close a tool call span.
354    pub fn end_tool_call(&mut self, guard: SpanGuard, tool_name: &str, attrs: ToolAttributes) {
355        let end_time = now_unix_nanos();
356        let tool_clean = sanitize_name(tool_name);
357        let mut attributes = vec![
358            ("zeph.tool.name".to_owned(), tool_clean),
359            (
360                "zeph.tool.latency_ms".to_owned(),
361                attrs.latency_ms.to_string(),
362            ),
363            ("zeph.tool.is_error".to_owned(), attrs.is_error.to_string()),
364        ];
365        if let Some(kind) = attrs.error_kind {
366            // IMP-04: apply redaction to error messages (may contain secret data).
367            let kind_clean = self.maybe_redact(&kind).into_owned();
368            attributes.push(("zeph.tool.error_kind".to_owned(), kind_clean));
369        }
370        let status = if attrs.is_error {
371            SpanStatus::Error {
372                message: "tool call failed".to_owned(),
373            }
374        } else {
375            SpanStatus::Ok
376        };
377        self.push_span(SpanData {
378            trace_id: self.trace_id,
379            span_id: guard.span_id,
380            parent_span_id: Some(guard.parent_span_id),
381            name: guard.name,
382            start_time_unix_nanos: guard.start_time_unix_nanos,
383            end_time_unix_nanos: end_time,
384            attributes,
385            status,
386        });
387    }
388
389    // ── Memory search spans ───────────────────────────────────────────────────
390
391    /// Open a memory search span.
392    #[must_use]
393    pub fn begin_memory_search(&self, parent_span_id: [u8; 8]) -> SpanGuard {
394        SpanGuard {
395            span_id: new_span_id(),
396            parent_span_id,
397            name: "memory.search".to_owned(),
398            start_time_unix_nanos: now_unix_nanos(),
399        }
400    }
401
402    /// Close a memory search span.
403    pub fn end_memory_search(&mut self, guard: SpanGuard, attrs: &MemorySearchAttributes) {
404        let end_time = now_unix_nanos();
405        let query_clean = self
406            .maybe_redact(&attrs.query_preview)
407            .chars()
408            .take(100)
409            .collect::<String>();
410        self.push_span(SpanData {
411            trace_id: self.trace_id,
412            span_id: guard.span_id,
413            parent_span_id: Some(guard.parent_span_id),
414            name: guard.name,
415            start_time_unix_nanos: guard.start_time_unix_nanos,
416            end_time_unix_nanos: end_time,
417            attributes: vec![
418                ("zeph.memory.query_preview".to_owned(), query_clean),
419                (
420                    "zeph.memory.result_count".to_owned(),
421                    attrs.result_count.to_string(),
422                ),
423                (
424                    "zeph.memory.latency_ms".to_owned(),
425                    attrs.latency_ms.to_string(),
426                ),
427            ],
428            status: SpanStatus::Ok,
429        });
430    }
431
432    // ── Accessors ─────────────────────────────────────────────────────────────
433
434    /// Return the path to the `trace.json` file that will be written on `finish()`.
435    #[must_use]
436    pub fn trace_json_path(&self) -> PathBuf {
437        self.output_dir.join("trace.json")
438    }
439
440    /// Return the span ID of the currently active iteration, if any.
441    #[must_use]
442    pub fn current_iteration_span_id(&self, index: usize) -> Option<[u8; 8]> {
443        self.active_iterations.get(&index).map(|e| e.guard.span_id)
444    }
445
446    /// Return the session root span ID (fallback parent when no iteration is active).
447    #[must_use]
448    pub fn session_span_id(&self) -> [u8; 8] {
449        self.session_span_id
450    }
451
452    /// Return the trace ID for this session.
453    #[must_use]
454    pub fn trace_id(&self) -> [u8; 16] {
455        self.trace_id
456    }
457
458    // ── Flush ─────────────────────────────────────────────────────────────────
459
460    /// Finalize the session span and write `trace.json`.
461    ///
462    /// Safe to call multiple times — subsequent calls after the first are no-ops.
463    /// Also sends spans over the `OTel` channel when the `otel` feature is enabled (C-05).
464    pub fn finish(&mut self) {
465        if self.flushed {
466            return;
467        }
468        self.flushed = true;
469
470        // Close any still-open iteration spans with `Unset` status (partial trace on error/cancel).
471        let open_keys: Vec<usize> = self.active_iterations.keys().copied().collect();
472        let end_time = now_unix_nanos();
473        for index in open_keys {
474            if let Some(entry) = self.active_iterations.remove(&index) {
475                self.push_span(SpanData {
476                    trace_id: self.trace_id,
477                    span_id: entry.guard.span_id,
478                    parent_span_id: Some(entry.guard.parent_span_id),
479                    name: entry.guard.name,
480                    start_time_unix_nanos: entry.guard.start_time_unix_nanos,
481                    end_time_unix_nanos: end_time,
482                    attributes: vec![(
483                        "zeph.iteration.user_message_preview".to_owned(),
484                        entry.user_msg_preview,
485                    )],
486                    status: SpanStatus::Unset,
487                });
488            }
489        }
490
491        let session_span = SpanData {
492            trace_id: self.trace_id,
493            span_id: self.session_span_id,
494            parent_span_id: None,
495            name: "session".to_owned(),
496            start_time_unix_nanos: self.session_start,
497            end_time_unix_nanos: end_time,
498            attributes: vec![
499                ("service.name".to_owned(), self.service_name.clone()),
500                ("zeph.session.trace_id".to_owned(), hex16(&self.trace_id)),
501            ],
502            status: SpanStatus::Ok,
503        };
504
505        let mut all_spans = vec![session_span];
506        all_spans.extend(self.completed_spans.drain(..));
507
508        let json = serialize_otlp_json(&all_spans, &self.service_name, &self.trace_metadata);
509        let path = self.output_dir.join("trace.json");
510        if let Err(e) = write_trace_file(&path, json.as_bytes()) {
511            tracing::warn!(path = %path.display(), error = %e, "trace.json write failed");
512        } else {
513            tracing::info!(path = %path.display(), "OTel trace written");
514        }
515
516        // C-05: forward spans to OTLP exporter when the root crate has wired the channel.
517        if let Some(ref tx) = self.trace_tx {
518            let event = TraceEvent {
519                trace_id: self.trace_id,
520                spans: all_spans,
521            };
522            if tx.send(event).is_err() {
523                tracing::debug!("OTLP trace channel closed, skipping export");
524            }
525        }
526    }
527}
528
529// C-04: Drop flushes partial traces on error/panic/cancellation.
530impl Drop for TracingCollector {
531    fn drop(&mut self) {
532        self.finish();
533    }
534}
535
536// ─── OTLP JSON serialization (I-04) ───────────────────────────────────────────
537
538fn span_status_code(status: &SpanStatus) -> u8 {
539    match status {
540        SpanStatus::Unset => 0,
541        SpanStatus::Ok => 1,
542        SpanStatus::Error { .. } => 2,
543    }
544}
545
546/// Serialize spans to OTLP JSON Protobuf encoding.
547///
548/// `trace_metadata` entries are appended as additional resource attributes after `service.name`.
549/// The reserved key `service.name` in `trace_metadata` is skipped to avoid duplication.
550///
551/// Format: <https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding>
552#[must_use]
553pub fn serialize_otlp_json<S: std::hash::BuildHasher>(
554    spans: &[SpanData],
555    service_name: &str,
556    trace_metadata: &HashMap<String, String, S>,
557) -> String {
558    let otlp_spans: Vec<serde_json::Value> = spans
559        .iter()
560        .map(|s| {
561            let attrs: Vec<serde_json::Value> = s
562                .attributes
563                .iter()
564                .map(|(k, v)| {
565                    serde_json::json!({
566                        "key": k,
567                        "value": { "stringValue": v }
568                    })
569                })
570                .collect();
571
572            let mut obj = serde_json::json!({
573                "traceId": hex16(&s.trace_id),
574                "spanId": hex8(s.span_id),
575                "name": s.name,
576                // OTLP JSON spec requires int64 fields as strings.
577                "startTimeUnixNano": s.start_time_unix_nanos.to_string(),
578                "endTimeUnixNano": s.end_time_unix_nanos.to_string(),
579                "attributes": attrs,
580                "status": {
581                    "code": span_status_code(&s.status)
582                }
583            });
584
585            if let Some(parent) = s.parent_span_id {
586                obj["parentSpanId"] = serde_json::json!(hex8(parent));
587            }
588
589            if let SpanStatus::Error { message } = &s.status {
590                obj["status"]["message"] = serde_json::json!(message);
591            }
592
593            obj
594        })
595        .collect();
596
597    let mut resource_attrs = vec![serde_json::json!({
598        "key": "service.name",
599        "value": { "stringValue": service_name }
600    })];
601    for (k, v) in trace_metadata {
602        if k == "service.name" {
603            continue;
604        }
605        resource_attrs.push(serde_json::json!({
606            "key": k,
607            "value": { "stringValue": v }
608        }));
609    }
610
611    let payload = serde_json::json!({
612        "resourceSpans": [{
613            "resource": {
614                "attributes": resource_attrs
615            },
616            "scopeSpans": [{
617                "scope": {
618                    "name": "zeph",
619                    "version": env!("CARGO_PKG_VERSION")
620                },
621                "spans": otlp_spans
622            }]
623        }]
624    });
625
626    serde_json::to_string_pretty(&payload)
627        .unwrap_or_else(|e| format!("{{\"error\": \"serialization failed: {e}\"}}"))
628}
629
630// ─── Helpers ──────────────────────────────────────────────────────────────────
631
/// Write `data` to `path` with mode 0o600 on Unix (SEC-01).
///
/// The file is created or truncated. On Unix, permissions are reset to 0o600 even when the
/// file already existed: `OpenOptionsExt::mode` only applies at creation time, so an older
/// `trace.json` with broader permissions would otherwise keep them. Permissions are tightened
/// before any data is written.
/// Falls back to `std::fs::write` on non-Unix platforms.
///
/// # Errors
///
/// Returns any I/O error from opening the file, changing its permissions, or writing to it.
fn write_trace_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        use std::io::Write as _;
        use std::os::unix::fs::{OpenOptionsExt as _, PermissionsExt as _};
        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o600)
            .open(path)?;
        f.set_permissions(std::fs::Permissions::from_mode(0o600))?;
        f.write_all(data)
    }
    #[cfg(not(unix))]
    {
        std::fs::write(path, data)
    }
}
652
/// Make `name` safe for use inside a span name.
///
/// Alphanumeric characters (Unicode-aware), `-`, `_` and `.` are kept. Every other
/// character is replaced by `_`.
fn sanitize_name(name: &str) -> String {
    let keep = |ch: char| ch.is_alphanumeric() || matches!(ch, '-' | '_' | '.');
    name.chars()
        .map(|ch| if keep(ch) { ch } else { '_' })
        .collect()
}
664
665// ─── Tests ────────────────────────────────────────────────────────────────────
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670    use tempfile::tempdir;
671
672    fn make_collector(dir: &Path) -> TracingCollector {
673        TracingCollector::new(dir, "zeph-test", HashMap::new(), false, None).unwrap()
674    }
675
676    #[test]
677    fn span_id_generation_is_unique() {
678        let ids: Vec<[u8; 8]> = (0..100).map(|_| new_span_id()).collect();
679        let unique: std::collections::HashSet<[u8; 8]> = ids.into_iter().collect();
680        assert_eq!(unique.len(), 100);
681    }
682
683    #[test]
684    fn hex_lengths_correct() {
685        assert_eq!(hex16(&new_trace_id()).len(), 32);
686        assert_eq!(hex8(new_span_id()).len(), 16);
687    }
688
689    #[test]
690    fn collector_creates_output_dir() {
691        let tmp = tempdir().unwrap();
692        let sub = tmp.path().join("traces");
693        make_collector(&sub);
694        assert!(sub.exists());
695    }
696
697    #[test]
698    fn finish_writes_trace_json() {
699        let tmp = tempdir().unwrap();
700        let mut c = make_collector(tmp.path());
701        c.begin_iteration(0, "hello world");
702        c.end_iteration(0, SpanStatus::Ok);
703        c.finish();
704
705        let path = tmp.path().join("trace.json");
706        assert!(path.exists(), "trace.json must be written");
707
708        let v: serde_json::Value =
709            serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
710        assert!(v["resourceSpans"].is_array());
711        // session + iteration = 2 spans.
712        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
713            .as_array()
714            .unwrap();
715        assert_eq!(spans.len(), 2);
716    }
717
718    #[test]
719    fn span_hierarchy_parent_child_correct() {
720        let tmp = tempdir().unwrap();
721        let mut c = make_collector(tmp.path());
722        c.begin_iteration(0, "test");
723        let iter_id = c.current_iteration_span_id(0).unwrap();
724        let guard = c.begin_llm_request(iter_id);
725        c.end_llm_request(
726            guard,
727            &LlmAttributes {
728                model: "test-model".to_owned(),
729                prompt_tokens: 100,
730                completion_tokens: 50,
731                latency_ms: 200,
732                streaming: false,
733                cache_hit: false,
734            },
735        );
736        c.end_iteration(0, SpanStatus::Ok);
737        c.finish();
738
739        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
740        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
741        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
742            .as_array()
743            .unwrap();
744        assert_eq!(spans.len(), 3, "session + iteration + llm = 3 spans");
745
746        let llm_span = spans
747            .iter()
748            .find(|s| s["name"] == "llm.request")
749            .expect("llm.request span missing");
750        let iter_span = spans
751            .iter()
752            .find(|s| s["name"] == "iteration.0")
753            .expect("iteration.0 span missing");
754
755        assert_eq!(
756            llm_span["parentSpanId"], iter_span["spanId"],
757            "llm span parent must be iteration span"
758        );
759    }
760
761    #[test]
762    fn redaction_applied_to_text_attributes() {
763        let tmp = tempdir().unwrap();
764        let mut c = TracingCollector::new(tmp.path(), "test", HashMap::new(), true, None).unwrap();
765        let iter_id = c.session_span_id();
766        let guard = c.begin_memory_search(iter_id);
767        c.end_memory_search(
768            guard,
769            &MemorySearchAttributes {
770                query_preview: "search sk-secretkey123 here".to_owned(),
771                result_count: 3,
772                latency_ms: 10,
773            },
774        );
775        c.finish();
776
777        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
778        assert!(
779            !content.contains("sk-secretkey123"),
780            "raw secret must be redacted from trace"
781        );
782    }
783
784    #[test]
785    fn otlp_json_format_spec_compliant() {
786        let trace_id = [0xAB_u8; 16];
787        let span_id = [0xCD_u8; 8];
788        let spans = vec![SpanData {
789            trace_id,
790            span_id,
791            parent_span_id: None,
792            name: "session".to_owned(),
793            start_time_unix_nanos: 1_000_000,
794            end_time_unix_nanos: 2_000_000,
795            attributes: vec![("service.name".to_owned(), "zeph".to_owned())],
796            status: SpanStatus::Ok,
797        }];
798
799        let json = serialize_otlp_json(&spans, "zeph", &HashMap::new());
800        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
801
802        let span = &v["resourceSpans"][0]["scopeSpans"][0]["spans"][0];
803        assert_eq!(
804            span["traceId"],
805            "abababababababababababababababababab"[..32]
806        );
807        assert_eq!(span["spanId"], "cdcdcdcdcdcdcdcd");
808        assert_eq!(span["name"], "session");
809        // int64 must be serialized as string per OTLP JSON spec.
810        assert!(span["startTimeUnixNano"].is_string());
811        assert_eq!(span["status"]["code"], 1_u64);
812    }
813
814    #[test]
815    fn drop_flushes_trace() {
816        let tmp = tempdir().unwrap();
817        {
818            let mut c = make_collector(tmp.path());
819            c.begin_iteration(0, "hello");
820            // Drop without explicit finish.
821        }
822        assert!(
823            tmp.path().join("trace.json").exists(),
824            "Drop must flush trace.json"
825        );
826    }
827
828    #[test]
829    fn trace_metadata_included_in_resource_attributes() {
830        let tmp = tempdir().unwrap();
831        let mut meta = HashMap::new();
832        meta.insert("deployment.environment".to_owned(), "staging".to_owned());
833        meta.insert("service.name".to_owned(), "should-be-skipped".to_owned());
834        let mut c = TracingCollector::new(tmp.path(), "zeph-test", meta, false, None).unwrap();
835        c.finish();
836
837        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
838        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
839        let attrs = v["resourceSpans"][0]["resource"]["attributes"]
840            .as_array()
841            .unwrap();
842
843        // service.name must appear with the collector's service_name, not the skipped value.
844        let svc = attrs
845            .iter()
846            .find(|a| a["key"] == "service.name")
847            .expect("service.name must be in resource attributes");
848        assert_eq!(svc["value"]["stringValue"], "zeph-test");
849
850        // Custom metadata attribute must be present.
851        let env_attr = attrs.iter().find(|a| a["key"] == "deployment.environment");
852        assert!(
853            env_attr.is_some(),
854            "deployment.environment must be in resource attributes"
855        );
856        assert_eq!(env_attr.unwrap()["value"]["stringValue"], "staging");
857    }
858
859    #[test]
860    fn finish_is_idempotent() {
861        let tmp = tempdir().unwrap();
862        let mut c = make_collector(tmp.path());
863        c.finish();
864        c.finish();
865        assert!(tmp.path().join("trace.json").exists());
866    }
867
868    #[test]
869    fn concurrent_iterations_tracked_independently() {
870        let tmp = tempdir().unwrap();
871        let mut c = make_collector(tmp.path());
872        c.begin_iteration(0, "first");
873        c.begin_iteration(1, "second");
874        let id0 = c.current_iteration_span_id(0).unwrap();
875        let id1 = c.current_iteration_span_id(1).unwrap();
876        assert_ne!(
877            id0, id1,
878            "concurrent iterations must have distinct span IDs"
879        );
880        c.end_iteration(0, SpanStatus::Ok);
881        c.end_iteration(1, SpanStatus::Ok);
882        c.finish();
883
884        let v: serde_json::Value =
885            serde_json::from_str(&std::fs::read_to_string(tmp.path().join("trace.json")).unwrap())
886                .unwrap();
887        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
888            .as_array()
889            .unwrap();
890        // session + 2 iterations = 3.
891        assert_eq!(spans.len(), 3);
892    }
893
894    #[test]
895    fn trace_format_skips_legacy_numbered_files() {
896        use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
897
898        let tmp = tempdir().unwrap();
899        let d = DebugDumper::new(tmp.path(), DumpFormat::Trace).unwrap();
900        let session_dir = d.dir().to_owned();
901        let id = d.dump_request(&RequestDebugDump {
902            model_name: "test",
903            messages: &[],
904            tools: &[],
905            provider_request: serde_json::json!({}),
906            memcot_state: None,
907        });
908        d.dump_response(id, "resp");
909        d.dump_tool_output("shell", "output");
910
911        // No legacy numbered files should be written in Trace format.
912        let files: Vec<_> = std::fs::read_dir(&session_dir)
913            .unwrap()
914            .filter_map(std::result::Result::ok)
915            .filter(|e| {
916                e.file_name()
917                    .to_string_lossy()
918                    .chars()
919                    .next()
920                    .is_some_and(|c| c.is_ascii_digit())
921            })
922            .collect();
923        assert!(
924            files.is_empty(),
925            "no legacy numbered files in Trace format session dir"
926        );
927
928        // trace.json is written into the session subdir by TracingCollector when wired.
929        // Here we only verify the session dir itself exists (TracingCollector is not wired in this test).
930        assert!(session_dir.is_dir(), "session subdir must exist");
931    }
932
933    #[test]
934    fn tool_call_span_emitted() {
935        let tmp = tempdir().unwrap();
936        let mut c = make_collector(tmp.path());
937        c.begin_iteration(0, "test");
938        let iter_id = c.current_iteration_span_id(0).unwrap();
939        let guard = c.begin_tool_call("shell", iter_id);
940        c.end_tool_call(
941            guard,
942            "shell",
943            ToolAttributes {
944                latency_ms: 50,
945                is_error: false,
946                error_kind: None,
947            },
948        );
949        c.end_iteration(0, SpanStatus::Ok);
950        c.finish();
951
952        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
953        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
954        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
955            .as_array()
956            .unwrap();
957        assert!(
958            spans.iter().any(|s| s["name"] == "tool.shell"),
959            "tool.shell span must be emitted"
960        );
961    }
962
963    #[test]
964    fn tool_call_error_span_emitted() {
965        let tmp = tempdir().unwrap();
966        let mut c = make_collector(tmp.path());
967        c.begin_iteration(0, "test");
968        let iter_id = c.current_iteration_span_id(0).unwrap();
969        let guard = c.begin_tool_call("shell", iter_id);
970        c.end_tool_call(
971            guard,
972            "shell",
973            ToolAttributes {
974                latency_ms: 10,
975                is_error: true,
976                error_kind: Some("permission denied".to_owned()),
977            },
978        );
979        c.end_iteration(0, SpanStatus::Ok);
980        c.finish();
981
982        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
983        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
984        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
985            .as_array()
986            .unwrap();
987        let tool_span = spans
988            .iter()
989            .find(|s| s["name"] == "tool.shell")
990            .expect("tool.shell span missing");
991        assert_eq!(
992            tool_span["status"]["code"], 2_u64,
993            "error span must have status code 2"
994        );
995    }
996
997    #[test]
998    fn begin_tool_call_at_timestamps_precede_end_time() {
999        let tmp = tempdir().unwrap();
1000        let mut c = make_collector(tmp.path());
1001        c.begin_iteration(0, "test");
1002        let iter_id = c.current_iteration_span_id(0).unwrap();
1003
1004        // Simulate post-hoc assembly: capture start before "execution", then call begin_tool_call_at.
1005        let started_at = std::time::Instant::now();
1006        std::thread::sleep(std::time::Duration::from_millis(2));
1007        let guard = c.begin_tool_call_at("shell", iter_id, &started_at);
1008        let span_start = guard.start_time_unix_nanos;
1009        c.end_tool_call(
1010            guard,
1011            "shell",
1012            ToolAttributes {
1013                latency_ms: 2,
1014                is_error: false,
1015                error_kind: None,
1016            },
1017        );
1018        c.end_iteration(0, SpanStatus::Ok);
1019        c.finish();
1020
1021        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
1022        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
1023        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
1024            .as_array()
1025            .unwrap();
1026        let tool_span = spans
1027            .iter()
1028            .find(|s| s["name"] == "tool.shell")
1029            .expect("tool.shell span missing");
1030        let recorded_start: u64 = tool_span["startTimeUnixNano"]
1031            .as_str()
1032            .unwrap()
1033            .parse()
1034            .unwrap();
1035        let recorded_end: u64 = tool_span["endTimeUnixNano"]
1036            .as_str()
1037            .unwrap()
1038            .parse()
1039            .unwrap();
1040        // The span start must be earlier than the end.
1041        assert!(
1042            recorded_start < recorded_end,
1043            "start ({recorded_start}) must precede end ({recorded_end})"
1044        );
1045        // The guard's start_time matches what was serialized.
1046        assert_eq!(
1047            span_start, recorded_start,
1048            "guard start must match serialized start"
1049        );
1050    }
1051
1052    #[test]
1053    fn session_to_iteration_parent_span_id() {
1054        let tmp = tempdir().unwrap();
1055        let mut c = make_collector(tmp.path());
1056        let session_id = c.session_span_id();
1057        c.begin_iteration(0, "test");
1058        c.end_iteration(0, SpanStatus::Ok);
1059        c.finish();
1060
1061        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
1062        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
1063        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
1064            .as_array()
1065            .unwrap();
1066        let iter_span = spans
1067            .iter()
1068            .find(|s| s["name"] == "iteration.0")
1069            .expect("iteration.0 span missing");
1070        assert_eq!(
1071            iter_span["parentSpanId"],
1072            serde_json::json!(hex8(session_id)),
1073            "iteration span parent must be session span"
1074        );
1075    }
1076
1077    #[test]
1078    fn json_and_raw_formats_still_write_files() {
1079        use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
1080
1081        let tmp = tempdir().unwrap();
1082        for fmt in [DumpFormat::Json, DumpFormat::Raw] {
1083            let d = DebugDumper::new(tmp.path(), fmt).unwrap();
1084            let id = d.dump_request(&RequestDebugDump {
1085                model_name: "test-model",
1086                messages: &[],
1087                tools: &[],
1088                provider_request: serde_json::json!({"model": "test-model", "max_tokens": 100}),
1089                memcot_state: None,
1090            });
1091            d.dump_response(id, "hello");
1092            let session_dir = std::fs::read_dir(tmp.path())
1093                .unwrap()
1094                .filter_map(std::result::Result::ok)
1095                .find(|e| e.file_type().is_ok_and(|t| t.is_dir()))
1096                .unwrap()
1097                .path();
1098            assert!(
1099                session_dir.join("0000-request.json").exists(),
1100                "request file must exist for format {fmt:?}"
1101            );
1102        }
1103    }
1104}