1use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use rand::RngExt as _;
24use serde::{Deserialize, Serialize};
25
26use crate::redact::scrub_content;
27
/// Process-wide call counter for [`new_span_id`]; its low byte is mixed into each span ID.
static SPAN_COUNTER: AtomicU64 = AtomicU64::new(0);
31
32#[must_use]
33fn new_trace_id() -> [u8; 16] {
34 rand::rng().random()
35}
36
37#[must_use]
38fn new_span_id() -> [u8; 8] {
39 let mut id: [u8; 8] = rand::rng().random();
40 id[0] ^= (SPAN_COUNTER.fetch_add(1, Ordering::Relaxed) & 0xFF) as u8;
42 id
43}
44
/// Encodes a 16-byte identifier as 32 lowercase hexadecimal characters.
#[must_use]
fn hex16(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(32);
    for &byte in b {
        // High nibble first, then low nibble, matching `{:02x}` formatting.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
54
/// Encodes an 8-byte identifier as 16 lowercase hexadecimal characters.
#[must_use]
fn hex8(b: [u8; 8]) -> String {
    use std::fmt::Write as _;
    b.iter().fold(String::with_capacity(16), |mut acc, byte| {
        // Formatting into a `String` cannot fail, so the result is ignored.
        let _ = write!(acc, "{byte:02x}");
        acc
    })
}
64
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch and saturates at `u64::MAX`
/// when the nanosecond count does not fit in a `u64`.
#[must_use]
fn now_unix_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
76pub enum SpanStatus {
77 Ok,
78 Error { message: String },
79 Unset,
80}
81
82#[derive(Debug, Clone)]
84pub struct SpanData {
85 pub trace_id: [u8; 16],
86 pub span_id: [u8; 8],
87 pub parent_span_id: Option<[u8; 8]>,
88 pub name: String,
89 pub start_time_unix_nanos: u64,
90 pub end_time_unix_nanos: u64,
91 pub attributes: Vec<(String, String)>,
92 pub status: SpanStatus,
93}
94
/// Handle for a span that has been started but not yet finished.
///
/// Returned by the `begin_*` methods of [`TracingCollector`] and consumed by the matching
/// `end_*` method, which turns it into a [`SpanData`].
#[derive(Debug)]
pub struct SpanGuard {
    /// Identifier of the in-flight span.
    pub span_id: [u8; 8],
    /// Identifier of the span this one is nested under.
    pub parent_span_id: [u8; 8],
    /// Span name.
    pub name: String,
    /// Start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
}
104
/// Measurements attached to an `llm.request` span.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmAttributes {
    /// Model identifier (passed through redaction when the collector has it enabled).
    pub model: String,
    /// Number of prompt tokens.
    pub prompt_tokens: u64,
    /// Number of completion tokens.
    pub completion_tokens: u64,
    /// Request latency in milliseconds.
    pub latency_ms: u64,
    /// Whether the response was streamed.
    pub streaming: bool,
    /// Whether the response was served from a cache.
    pub cache_hit: bool,
}
114
/// Measurements attached to a `tool.*` span.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToolAttributes {
    /// Tool call latency in milliseconds.
    pub latency_ms: u64,
    /// Whether the tool call failed; a failed call yields an error span status.
    pub is_error: bool,
    /// Optional error classification, emitted as `zeph.tool.error_kind` when present.
    pub error_kind: Option<String>,
}
121
/// Measurements attached to a `memory.search` span.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemorySearchAttributes {
    /// Preview of the search query; the collector redacts (if enabled) and truncates it to
    /// 100 characters before recording.
    pub query_preview: String,
    /// Number of results returned by the search.
    pub result_count: usize,
    /// Search latency in milliseconds.
    pub latency_ms: u64,
}
128
129#[derive(Debug)]
136pub struct TraceEvent {
137 pub trace_id: [u8; 16],
138 pub spans: Vec<SpanData>,
139}
140
141struct IterationEntry {
145 guard: SpanGuard,
146 user_msg_preview: String,
147}
148
/// Default cap on the number of completed spans a collector retains before evicting the oldest.
const DEFAULT_MAX_SPANS: usize = 10_000;
157
158pub struct TracingCollector {
159 trace_id: [u8; 16],
160 session_span_id: [u8; 8],
161 session_start: u64,
162 service_name: String,
163 output_dir: PathBuf,
164 active_iterations: HashMap<usize, IterationEntry>,
166 completed_spans: Vec<SpanData>,
167 max_spans: usize,
169 redact: bool,
171 flushed: bool,
173 trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
176}
177
178impl TracingCollector {
179 pub fn new(
185 output_dir: &Path,
186 service_name: impl Into<String>,
187 redact: bool,
188 trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
189 ) -> std::io::Result<Self> {
190 std::fs::create_dir_all(output_dir)?;
191 Ok(Self {
192 trace_id: new_trace_id(),
193 session_span_id: new_span_id(),
194 session_start: now_unix_nanos(),
195 service_name: service_name.into(),
196 output_dir: output_dir.to_owned(),
197 active_iterations: HashMap::new(),
198 completed_spans: Vec::new(),
199 max_spans: DEFAULT_MAX_SPANS,
200 redact,
201 flushed: false,
202 trace_tx,
203 })
204 }
205
206 fn maybe_redact<'a>(&self, text: &'a str) -> std::borrow::Cow<'a, str> {
207 if self.redact {
208 scrub_content(text)
209 } else {
210 std::borrow::Cow::Borrowed(text)
211 }
212 }
213
214 fn push_span(&mut self, span: SpanData) {
216 if self.completed_spans.len() >= self.max_spans {
217 tracing::warn!(
218 max_spans = self.max_spans,
219 "trace span cap reached, dropping oldest span"
220 );
221 self.completed_spans.remove(0); }
223 self.completed_spans.push(span);
224 }
225
226 pub fn begin_iteration(&mut self, index: usize, user_msg_preview: &str) {
230 let preview = self
231 .maybe_redact(user_msg_preview)
232 .chars()
233 .take(100)
234 .collect::<String>();
235 let entry = IterationEntry {
236 guard: SpanGuard {
237 span_id: new_span_id(),
238 parent_span_id: self.session_span_id,
239 name: format!("iteration.{index}"),
240 start_time_unix_nanos: now_unix_nanos(),
241 },
242 user_msg_preview: preview,
243 };
244 self.active_iterations.insert(index, entry);
245 }
246
247 pub fn end_iteration(&mut self, index: usize, status: SpanStatus) {
249 let end_time = now_unix_nanos();
250 if let Some(entry) = self.active_iterations.remove(&index) {
251 let span = SpanData {
252 trace_id: self.trace_id,
253 span_id: entry.guard.span_id,
254 parent_span_id: Some(entry.guard.parent_span_id),
255 name: entry.guard.name,
256 start_time_unix_nanos: entry.guard.start_time_unix_nanos,
257 end_time_unix_nanos: end_time,
258 attributes: vec![(
259 "zeph.iteration.user_message_preview".to_owned(),
260 entry.user_msg_preview,
261 )],
262 status,
263 };
264 self.push_span(span);
265 } else {
266 tracing::warn!(index, "end_iteration without matching begin_iteration");
267 }
268 }
269
270 #[must_use]
274 pub fn begin_llm_request(&self, iteration_span_id: [u8; 8]) -> SpanGuard {
275 SpanGuard {
276 span_id: new_span_id(),
277 parent_span_id: iteration_span_id,
278 name: "llm.request".to_owned(),
279 start_time_unix_nanos: now_unix_nanos(),
280 }
281 }
282
283 pub fn end_llm_request(&mut self, guard: SpanGuard, attrs: &LlmAttributes) {
285 let end_time = now_unix_nanos();
286 let model_clean = self.maybe_redact(&attrs.model).into_owned();
287 self.push_span(SpanData {
288 trace_id: self.trace_id,
289 span_id: guard.span_id,
290 parent_span_id: Some(guard.parent_span_id),
291 name: guard.name,
292 start_time_unix_nanos: guard.start_time_unix_nanos,
293 end_time_unix_nanos: end_time,
294 attributes: vec![
295 ("zeph.llm.model".to_owned(), model_clean),
296 (
297 "zeph.llm.prompt_tokens".to_owned(),
298 attrs.prompt_tokens.to_string(),
299 ),
300 (
301 "zeph.llm.completion_tokens".to_owned(),
302 attrs.completion_tokens.to_string(),
303 ),
304 (
305 "zeph.llm.latency_ms".to_owned(),
306 attrs.latency_ms.to_string(),
307 ),
308 ("zeph.llm.streaming".to_owned(), attrs.streaming.to_string()),
309 ("zeph.llm.cache_hit".to_owned(), attrs.cache_hit.to_string()),
310 ],
311 status: SpanStatus::Ok,
312 });
313 }
314
315 #[must_use]
319 pub fn begin_tool_call(&self, tool_name: &str, iteration_span_id: [u8; 8]) -> SpanGuard {
320 self.begin_tool_call_at(tool_name, iteration_span_id, &std::time::Instant::now())
321 }
322
323 #[must_use]
329 pub fn begin_tool_call_at(
330 &self,
331 tool_name: &str,
332 iteration_span_id: [u8; 8],
333 started_at: &std::time::Instant,
334 ) -> SpanGuard {
335 let elapsed_nanos = u64::try_from(started_at.elapsed().as_nanos()).unwrap_or(u64::MAX);
336 let start_time_unix_nanos = now_unix_nanos().saturating_sub(elapsed_nanos);
337 SpanGuard {
338 span_id: new_span_id(),
339 parent_span_id: iteration_span_id,
340 name: format!("tool.{}", sanitize_name(tool_name)),
341 start_time_unix_nanos,
342 }
343 }
344
345 pub fn end_tool_call(&mut self, guard: SpanGuard, tool_name: &str, attrs: ToolAttributes) {
347 let end_time = now_unix_nanos();
348 let tool_clean = sanitize_name(tool_name);
349 let mut attributes = vec![
350 ("zeph.tool.name".to_owned(), tool_clean),
351 (
352 "zeph.tool.latency_ms".to_owned(),
353 attrs.latency_ms.to_string(),
354 ),
355 ("zeph.tool.is_error".to_owned(), attrs.is_error.to_string()),
356 ];
357 if let Some(kind) = attrs.error_kind {
358 let kind_clean = self.maybe_redact(&kind).into_owned();
360 attributes.push(("zeph.tool.error_kind".to_owned(), kind_clean));
361 }
362 let status = if attrs.is_error {
363 SpanStatus::Error {
364 message: "tool call failed".to_owned(),
365 }
366 } else {
367 SpanStatus::Ok
368 };
369 self.push_span(SpanData {
370 trace_id: self.trace_id,
371 span_id: guard.span_id,
372 parent_span_id: Some(guard.parent_span_id),
373 name: guard.name,
374 start_time_unix_nanos: guard.start_time_unix_nanos,
375 end_time_unix_nanos: end_time,
376 attributes,
377 status,
378 });
379 }
380
381 #[must_use]
385 pub fn begin_memory_search(&self, parent_span_id: [u8; 8]) -> SpanGuard {
386 SpanGuard {
387 span_id: new_span_id(),
388 parent_span_id,
389 name: "memory.search".to_owned(),
390 start_time_unix_nanos: now_unix_nanos(),
391 }
392 }
393
394 pub fn end_memory_search(&mut self, guard: SpanGuard, attrs: &MemorySearchAttributes) {
396 let end_time = now_unix_nanos();
397 let query_clean = self
398 .maybe_redact(&attrs.query_preview)
399 .chars()
400 .take(100)
401 .collect::<String>();
402 self.push_span(SpanData {
403 trace_id: self.trace_id,
404 span_id: guard.span_id,
405 parent_span_id: Some(guard.parent_span_id),
406 name: guard.name,
407 start_time_unix_nanos: guard.start_time_unix_nanos,
408 end_time_unix_nanos: end_time,
409 attributes: vec![
410 ("zeph.memory.query_preview".to_owned(), query_clean),
411 (
412 "zeph.memory.result_count".to_owned(),
413 attrs.result_count.to_string(),
414 ),
415 (
416 "zeph.memory.latency_ms".to_owned(),
417 attrs.latency_ms.to_string(),
418 ),
419 ],
420 status: SpanStatus::Ok,
421 });
422 }
423
424 #[must_use]
428 pub fn trace_json_path(&self) -> PathBuf {
429 self.output_dir.join("trace.json")
430 }
431
432 #[must_use]
434 pub fn current_iteration_span_id(&self, index: usize) -> Option<[u8; 8]> {
435 self.active_iterations.get(&index).map(|e| e.guard.span_id)
436 }
437
438 #[must_use]
440 pub fn session_span_id(&self) -> [u8; 8] {
441 self.session_span_id
442 }
443
444 #[must_use]
446 pub fn trace_id(&self) -> [u8; 16] {
447 self.trace_id
448 }
449
450 pub fn finish(&mut self) {
457 if self.flushed {
458 return;
459 }
460 self.flushed = true;
461
462 let open_keys: Vec<usize> = self.active_iterations.keys().copied().collect();
464 let end_time = now_unix_nanos();
465 for index in open_keys {
466 if let Some(entry) = self.active_iterations.remove(&index) {
467 self.push_span(SpanData {
468 trace_id: self.trace_id,
469 span_id: entry.guard.span_id,
470 parent_span_id: Some(entry.guard.parent_span_id),
471 name: entry.guard.name,
472 start_time_unix_nanos: entry.guard.start_time_unix_nanos,
473 end_time_unix_nanos: end_time,
474 attributes: vec![(
475 "zeph.iteration.user_message_preview".to_owned(),
476 entry.user_msg_preview,
477 )],
478 status: SpanStatus::Unset,
479 });
480 }
481 }
482
483 let session_span = SpanData {
484 trace_id: self.trace_id,
485 span_id: self.session_span_id,
486 parent_span_id: None,
487 name: "session".to_owned(),
488 start_time_unix_nanos: self.session_start,
489 end_time_unix_nanos: end_time,
490 attributes: vec![
491 ("service.name".to_owned(), self.service_name.clone()),
492 ("zeph.session.trace_id".to_owned(), hex16(&self.trace_id)),
493 ],
494 status: SpanStatus::Ok,
495 };
496
497 let mut all_spans = vec![session_span];
498 all_spans.append(&mut self.completed_spans);
499
500 let json = serialize_otlp_json(&all_spans, &self.service_name);
501 let path = self.output_dir.join("trace.json");
502 if let Err(e) = write_trace_file(&path, json.as_bytes()) {
503 tracing::warn!(path = %path.display(), error = %e, "trace.json write failed");
504 } else {
505 tracing::info!(path = %path.display(), "OTel trace written");
506 }
507
508 if let Some(ref tx) = self.trace_tx {
510 let event = TraceEvent {
511 trace_id: self.trace_id,
512 spans: all_spans,
513 };
514 if tx.send(event).is_err() {
515 tracing::debug!("OTLP trace channel closed, skipping export");
516 }
517 }
518 }
519}
520
521impl Drop for TracingCollector {
523 fn drop(&mut self) {
524 self.finish();
525 }
526}
527
528fn span_status_code(status: &SpanStatus) -> u8 {
531 match status {
532 SpanStatus::Unset => 0,
533 SpanStatus::Ok => 1,
534 SpanStatus::Error { .. } => 2,
535 }
536}
537
538#[must_use]
542pub fn serialize_otlp_json(spans: &[SpanData], service_name: &str) -> String {
543 let otlp_spans: Vec<serde_json::Value> = spans
544 .iter()
545 .map(|s| {
546 let attrs: Vec<serde_json::Value> = s
547 .attributes
548 .iter()
549 .map(|(k, v)| {
550 serde_json::json!({
551 "key": k,
552 "value": { "stringValue": v }
553 })
554 })
555 .collect();
556
557 let mut obj = serde_json::json!({
558 "traceId": hex16(&s.trace_id),
559 "spanId": hex8(s.span_id),
560 "name": s.name,
561 "startTimeUnixNano": s.start_time_unix_nanos.to_string(),
563 "endTimeUnixNano": s.end_time_unix_nanos.to_string(),
564 "attributes": attrs,
565 "status": {
566 "code": span_status_code(&s.status)
567 }
568 });
569
570 if let Some(parent) = s.parent_span_id {
571 obj["parentSpanId"] = serde_json::json!(hex8(parent));
572 }
573
574 if let SpanStatus::Error { message } = &s.status {
575 obj["status"]["message"] = serde_json::json!(message);
576 }
577
578 obj
579 })
580 .collect();
581
582 let payload = serde_json::json!({
583 "resourceSpans": [{
584 "resource": {
585 "attributes": [{
586 "key": "service.name",
587 "value": { "stringValue": service_name }
588 }]
589 },
590 "scopeSpans": [{
591 "scope": {
592 "name": "zeph",
593 "version": env!("CARGO_PKG_VERSION")
594 },
595 "spans": otlp_spans
596 }]
597 }]
598 });
599
600 serde_json::to_string_pretty(&payload)
601 .unwrap_or_else(|e| format!("{{\"error\": \"serialization failed: {e}\"}}"))
602}
603
/// Writes `data` to `path`, creating the file or truncating an existing one.
///
/// On Unix the file ends up with mode `0o600`. The mode passed at open time only takes effect
/// when the file is created, so the permissions are also set explicitly to tighten a
/// pre-existing file before any data is written. On other platforms this is a plain
/// `std::fs::write`.
///
/// # Errors
///
/// Returns the I/O error from opening the file, changing its permissions, or writing.
fn write_trace_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        use std::io::Write as _;
        use std::os::unix::fs::{OpenOptionsExt as _, PermissionsExt as _};

        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o600)
            .open(path)?;
        // `mode` is ignored for files that already exist; enforce owner-only access here.
        file.set_permissions(std::fs::Permissions::from_mode(0o600))?;
        file.write_all(data)
    }
    #[cfg(not(unix))]
    {
        std::fs::write(path, data)
    }
}
626
/// Replaces every character that is not alphanumeric, `-`, `_` or `.` with `_`.
///
/// Alphanumeric is judged by `char::is_alphanumeric`, so non-ASCII letters and digits are kept.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, '-' | '_' | '.') || ch.is_alphanumeric();
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
638
639#[cfg(test)]
642mod tests {
643 use super::*;
644 use tempfile::tempdir;
645
646 fn make_collector(dir: &Path) -> TracingCollector {
647 TracingCollector::new(dir, "zeph-test", false, None).unwrap()
648 }
649
650 #[test]
651 fn span_id_generation_is_unique() {
652 let ids: Vec<[u8; 8]> = (0..100).map(|_| new_span_id()).collect();
653 let unique: std::collections::HashSet<[u8; 8]> = ids.into_iter().collect();
654 assert_eq!(unique.len(), 100);
655 }
656
657 #[test]
658 fn hex_lengths_correct() {
659 assert_eq!(hex16(&new_trace_id()).len(), 32);
660 assert_eq!(hex8(new_span_id()).len(), 16);
661 }
662
663 #[test]
664 fn collector_creates_output_dir() {
665 let tmp = tempdir().unwrap();
666 let sub = tmp.path().join("traces");
667 make_collector(&sub);
668 assert!(sub.exists());
669 }
670
671 #[test]
672 fn finish_writes_trace_json() {
673 let tmp = tempdir().unwrap();
674 let mut c = make_collector(tmp.path());
675 c.begin_iteration(0, "hello world");
676 c.end_iteration(0, SpanStatus::Ok);
677 c.finish();
678
679 let path = tmp.path().join("trace.json");
680 assert!(path.exists(), "trace.json must be written");
681
682 let v: serde_json::Value =
683 serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
684 assert!(v["resourceSpans"].is_array());
685 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
687 .as_array()
688 .unwrap();
689 assert_eq!(spans.len(), 2);
690 }
691
692 #[test]
693 fn span_hierarchy_parent_child_correct() {
694 let tmp = tempdir().unwrap();
695 let mut c = make_collector(tmp.path());
696 c.begin_iteration(0, "test");
697 let iter_id = c.current_iteration_span_id(0).unwrap();
698 let guard = c.begin_llm_request(iter_id);
699 c.end_llm_request(
700 guard,
701 &LlmAttributes {
702 model: "test-model".to_owned(),
703 prompt_tokens: 100,
704 completion_tokens: 50,
705 latency_ms: 200,
706 streaming: false,
707 cache_hit: false,
708 },
709 );
710 c.end_iteration(0, SpanStatus::Ok);
711 c.finish();
712
713 let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
714 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
715 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
716 .as_array()
717 .unwrap();
718 assert_eq!(spans.len(), 3, "session + iteration + llm = 3 spans");
719
720 let llm_span = spans
721 .iter()
722 .find(|s| s["name"] == "llm.request")
723 .expect("llm.request span missing");
724 let iter_span = spans
725 .iter()
726 .find(|s| s["name"] == "iteration.0")
727 .expect("iteration.0 span missing");
728
729 assert_eq!(
730 llm_span["parentSpanId"], iter_span["spanId"],
731 "llm span parent must be iteration span"
732 );
733 }
734
735 #[test]
736 fn redaction_applied_to_text_attributes() {
737 let tmp = tempdir().unwrap();
738 let mut c = TracingCollector::new(tmp.path(), "test", true, None).unwrap();
739 let iter_id = c.session_span_id();
740 let guard = c.begin_memory_search(iter_id);
741 c.end_memory_search(
742 guard,
743 &MemorySearchAttributes {
744 query_preview: "search sk-secretkey123 here".to_owned(),
745 result_count: 3,
746 latency_ms: 10,
747 },
748 );
749 c.finish();
750
751 let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
752 assert!(
753 !content.contains("sk-secretkey123"),
754 "raw secret must be redacted from trace"
755 );
756 }
757
758 #[test]
759 fn otlp_json_format_spec_compliant() {
760 let trace_id = [0xAB_u8; 16];
761 let span_id = [0xCD_u8; 8];
762 let spans = vec![SpanData {
763 trace_id,
764 span_id,
765 parent_span_id: None,
766 name: "session".to_owned(),
767 start_time_unix_nanos: 1_000_000,
768 end_time_unix_nanos: 2_000_000,
769 attributes: vec![("service.name".to_owned(), "zeph".to_owned())],
770 status: SpanStatus::Ok,
771 }];
772
773 let json = serialize_otlp_json(&spans, "zeph");
774 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
775
776 let span = &v["resourceSpans"][0]["scopeSpans"][0]["spans"][0];
777 assert_eq!(
778 span["traceId"],
779 "abababababababababababababababababab"[..32]
780 );
781 assert_eq!(span["spanId"], "cdcdcdcdcdcdcdcd");
782 assert_eq!(span["name"], "session");
783 assert!(span["startTimeUnixNano"].is_string());
785 assert_eq!(span["status"]["code"], 1_u64);
786 }
787
788 #[test]
789 fn drop_flushes_trace() {
790 let tmp = tempdir().unwrap();
791 {
792 let mut c = make_collector(tmp.path());
793 c.begin_iteration(0, "hello");
794 }
796 assert!(
797 tmp.path().join("trace.json").exists(),
798 "Drop must flush trace.json"
799 );
800 }
801
802 #[test]
803 fn finish_is_idempotent() {
804 let tmp = tempdir().unwrap();
805 let mut c = make_collector(tmp.path());
806 c.finish();
807 c.finish();
808 assert!(tmp.path().join("trace.json").exists());
809 }
810
811 #[test]
812 fn concurrent_iterations_tracked_independently() {
813 let tmp = tempdir().unwrap();
814 let mut c = make_collector(tmp.path());
815 c.begin_iteration(0, "first");
816 c.begin_iteration(1, "second");
817 let id0 = c.current_iteration_span_id(0).unwrap();
818 let id1 = c.current_iteration_span_id(1).unwrap();
819 assert_ne!(
820 id0, id1,
821 "concurrent iterations must have distinct span IDs"
822 );
823 c.end_iteration(0, SpanStatus::Ok);
824 c.end_iteration(1, SpanStatus::Ok);
825 c.finish();
826
827 let v: serde_json::Value =
828 serde_json::from_str(&std::fs::read_to_string(tmp.path().join("trace.json")).unwrap())
829 .unwrap();
830 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
831 .as_array()
832 .unwrap();
833 assert_eq!(spans.len(), 3);
835 }
836
837 #[test]
838 fn trace_format_skips_legacy_numbered_files() {
839 use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
840
841 let tmp = tempdir().unwrap();
842 let d = DebugDumper::new(tmp.path(), DumpFormat::Trace).unwrap();
843 let session_dir = d.dir().to_owned();
844 let id = d.dump_request(&RequestDebugDump {
845 model_name: "test",
846 messages: &[],
847 tools: &[],
848 provider_request: serde_json::json!({}),
849 });
850 d.dump_response(id, "resp");
851 d.dump_tool_output("shell", "output");
852
853 let files: Vec<_> = std::fs::read_dir(&session_dir)
855 .unwrap()
856 .filter_map(|e| e.ok())
857 .filter(|e| {
858 e.file_name()
859 .to_string_lossy()
860 .chars()
861 .next()
862 .map_or(false, |c| c.is_ascii_digit())
863 })
864 .collect();
865 assert!(
866 files.is_empty(),
867 "no legacy numbered files in Trace format session dir"
868 );
869
870 assert!(session_dir.is_dir(), "session subdir must exist");
873 }
874
875 #[test]
876 fn tool_call_span_emitted() {
877 let tmp = tempdir().unwrap();
878 let mut c = make_collector(tmp.path());
879 c.begin_iteration(0, "test");
880 let iter_id = c.current_iteration_span_id(0).unwrap();
881 let guard = c.begin_tool_call("shell", iter_id);
882 c.end_tool_call(
883 guard,
884 "shell",
885 ToolAttributes {
886 latency_ms: 50,
887 is_error: false,
888 error_kind: None,
889 },
890 );
891 c.end_iteration(0, SpanStatus::Ok);
892 c.finish();
893
894 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
895 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
896 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
897 .as_array()
898 .unwrap();
899 assert!(
900 spans.iter().any(|s| s["name"] == "tool.shell"),
901 "tool.shell span must be emitted"
902 );
903 }
904
905 #[test]
906 fn tool_call_error_span_emitted() {
907 let tmp = tempdir().unwrap();
908 let mut c = make_collector(tmp.path());
909 c.begin_iteration(0, "test");
910 let iter_id = c.current_iteration_span_id(0).unwrap();
911 let guard = c.begin_tool_call("shell", iter_id);
912 c.end_tool_call(
913 guard,
914 "shell",
915 ToolAttributes {
916 latency_ms: 10,
917 is_error: true,
918 error_kind: Some("permission denied".to_owned()),
919 },
920 );
921 c.end_iteration(0, SpanStatus::Ok);
922 c.finish();
923
924 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
925 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
926 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
927 .as_array()
928 .unwrap();
929 let tool_span = spans
930 .iter()
931 .find(|s| s["name"] == "tool.shell")
932 .expect("tool.shell span missing");
933 assert_eq!(
934 tool_span["status"]["code"], 2_u64,
935 "error span must have status code 2"
936 );
937 }
938
939 #[test]
940 fn begin_tool_call_at_timestamps_precede_end_time() {
941 let tmp = tempdir().unwrap();
942 let mut c = make_collector(tmp.path());
943 c.begin_iteration(0, "test");
944 let iter_id = c.current_iteration_span_id(0).unwrap();
945
946 let started_at = std::time::Instant::now();
948 std::thread::sleep(std::time::Duration::from_millis(2));
949 let guard = c.begin_tool_call_at("shell", iter_id, &started_at);
950 let span_start = guard.start_time_unix_nanos;
951 c.end_tool_call(
952 guard,
953 "shell",
954 ToolAttributes {
955 latency_ms: 2,
956 is_error: false,
957 error_kind: None,
958 },
959 );
960 c.end_iteration(0, SpanStatus::Ok);
961 c.finish();
962
963 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
964 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
965 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
966 .as_array()
967 .unwrap();
968 let tool_span = spans
969 .iter()
970 .find(|s| s["name"] == "tool.shell")
971 .expect("tool.shell span missing");
972 let recorded_start: u64 = tool_span["startTimeUnixNano"]
973 .as_str()
974 .unwrap()
975 .parse()
976 .unwrap();
977 let recorded_end: u64 = tool_span["endTimeUnixNano"]
978 .as_str()
979 .unwrap()
980 .parse()
981 .unwrap();
982 assert!(
984 recorded_start < recorded_end,
985 "start ({recorded_start}) must precede end ({recorded_end})"
986 );
987 assert_eq!(
989 span_start, recorded_start,
990 "guard start must match serialized start"
991 );
992 }
993
994 #[test]
995 fn session_to_iteration_parent_span_id() {
996 let tmp = tempdir().unwrap();
997 let mut c = make_collector(tmp.path());
998 let session_id = c.session_span_id();
999 c.begin_iteration(0, "test");
1000 c.end_iteration(0, SpanStatus::Ok);
1001 c.finish();
1002
1003 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
1004 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
1005 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
1006 .as_array()
1007 .unwrap();
1008 let iter_span = spans
1009 .iter()
1010 .find(|s| s["name"] == "iteration.0")
1011 .expect("iteration.0 span missing");
1012 assert_eq!(
1013 iter_span["parentSpanId"],
1014 serde_json::json!(hex8(session_id)),
1015 "iteration span parent must be session span"
1016 );
1017 }
1018
1019 #[test]
1020 fn json_and_raw_formats_still_write_files() {
1021 use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
1022
1023 let tmp = tempdir().unwrap();
1024 for fmt in [DumpFormat::Json, DumpFormat::Raw] {
1025 let d = DebugDumper::new(tmp.path(), fmt).unwrap();
1026 let id = d.dump_request(&RequestDebugDump {
1027 model_name: "test-model",
1028 messages: &[],
1029 tools: &[],
1030 provider_request: serde_json::json!({"model": "test-model", "max_tokens": 100}),
1031 });
1032 d.dump_response(id, "hello");
1033 let session_dir = std::fs::read_dir(tmp.path())
1034 .unwrap()
1035 .filter_map(|e| e.ok())
1036 .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
1037 .next()
1038 .unwrap()
1039 .path();
1040 assert!(
1041 session_dir.join("0000-request.json").exists(),
1042 "request file must exist for format {fmt:?}"
1043 );
1044 }
1045 }
1046}