1use std::collections::{HashMap, VecDeque};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use rand::RngExt as _;
24use serde::{Deserialize, Serialize};
25
26use crate::redact::scrub_content;
27
/// Process-wide counter, starting at zero, that `new_span_id` bumps once per generated ID;
/// its low byte is mixed into the first byte of every span ID.
static SPAN_COUNTER: AtomicU64 = AtomicU64::new(0);
31
32#[must_use]
33fn new_trace_id() -> [u8; 16] {
34 rand::rng().random()
35}
36
37#[must_use]
38fn new_span_id() -> [u8; 8] {
39 let mut id: [u8; 8] = rand::rng().random();
40 id[0] ^= (SPAN_COUNTER.fetch_add(1, Ordering::Relaxed) & 0xFF) as u8;
42 id
43}
44
/// Encodes a 16-byte ID as a 32-character lowercase hexadecimal string.
#[must_use]
fn hex16(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(b.len() * 2);
    for &byte in b {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
54
/// Encodes an 8-byte ID as a 16-character lowercase hexadecimal string.
#[must_use]
fn hex8(b: [u8; 8]) -> String {
    use std::fmt::Write as _;

    b.iter().fold(String::with_capacity(16), |mut acc, byte| {
        // Formatting into a `String` cannot fail, so the result is deliberately ignored.
        let _ = write!(acc, "{byte:02x}");
        acc
    })
}
64
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch and saturates at
/// `u64::MAX` when the nanosecond count does not fit in a `u64`.
#[must_use]
fn now_unix_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
71
/// Outcome recorded on a finished span.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SpanStatus {
    /// The operation completed successfully.
    Ok,
    /// The operation failed; `message` is written to the serialized span's status.
    Error { message: String },
    /// No outcome was recorded (used for iterations still open when the trace is flushed).
    Unset,
}
82
/// A completed span, ready to be serialized or exported.
#[derive(Debug, Clone)]
pub struct SpanData {
    /// ID of the trace this span belongs to.
    pub trace_id: [u8; 16],
    /// ID of this span.
    pub span_id: [u8; 8],
    /// ID of the parent span; `None` for the root `session` span.
    pub parent_span_id: Option<[u8; 8]>,
    /// Span name, e.g. `session`, `iteration.<index>`, `llm.request`, `tool.<name>`.
    pub name: String,
    /// Start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
    /// End time in nanoseconds since the Unix epoch.
    pub end_time_unix_nanos: u64,
    /// Key/value attributes; all values are carried as strings.
    pub attributes: Vec<(String, String)>,
    /// Outcome of the operation the span describes.
    pub status: SpanStatus,
}
95
/// Handle for an in-flight span.
///
/// Returned by the collector's `begin_*` methods and consumed by the matching `end_*`
/// method, which turns it into a completed span. It carries everything that is known
/// when the span starts.
#[derive(Debug)]
pub struct SpanGuard {
    /// ID assigned to the span when it was started.
    pub span_id: [u8; 8],
    /// ID of the span this one is nested under.
    pub parent_span_id: [u8; 8],
    /// Span name, fixed at start time.
    pub name: String,
    /// Start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
}
105
/// Measurements attached to an `llm.request` span when it ends.
///
/// Each field is recorded as a string attribute under the `zeph.llm.` prefix.
#[derive(Debug, Clone)]
pub struct LlmAttributes {
    /// Model identifier, recorded as `zeph.llm.model` (redacted when redaction is enabled).
    pub model: String,
    /// Recorded as `zeph.llm.prompt_tokens`.
    pub prompt_tokens: u64,
    /// Recorded as `zeph.llm.completion_tokens`.
    pub completion_tokens: u64,
    /// Request latency in milliseconds, recorded as `zeph.llm.latency_ms`.
    pub latency_ms: u64,
    /// Recorded as `zeph.llm.streaming`.
    pub streaming: bool,
    /// Recorded as `zeph.llm.cache_hit`.
    pub cache_hit: bool,
}
115
/// Measurements attached to a `tool.<name>` span when it ends.
#[derive(Debug, Clone)]
pub struct ToolAttributes {
    /// Tool-call latency in milliseconds, recorded as `zeph.tool.latency_ms`.
    pub latency_ms: u64,
    /// Recorded as `zeph.tool.is_error`; when `true` the span gets an error status.
    pub is_error: bool,
    /// Optional error classification, recorded as `zeph.tool.error_kind` when present.
    pub error_kind: Option<String>,
}
122
/// Measurements attached to a `memory.search` span when it ends.
#[derive(Debug, Clone)]
pub struct MemorySearchAttributes {
    /// Query text, recorded as `zeph.memory.query_preview` after optional redaction and
    /// truncation to 100 characters.
    pub query_preview: String,
    /// Recorded as `zeph.memory.result_count`.
    pub result_count: usize,
    /// Search latency in milliseconds, recorded as `zeph.memory.latency_ms`.
    pub latency_ms: u64,
}
129
/// A finished trace, sent over the collector's optional channel when it is flushed.
#[derive(Debug)]
pub struct TraceEvent {
    /// ID of the trace.
    pub trace_id: [u8; 16],
    /// All spans of the trace, with the root `session` span first.
    pub spans: Vec<SpanData>,
}
141
/// Bookkeeping for an iteration that has been started but not yet ended.
struct IterationEntry {
    /// Span handle created when the iteration began.
    guard: SpanGuard,
    /// Already redacted and truncated preview of the user message for this iteration.
    user_msg_preview: String,
}
149
/// Default cap on the number of completed spans a collector buffers; once the cap is
/// reached the oldest buffered span is dropped for each new one.
const DEFAULT_MAX_SPANS: usize = 10_000;
158
/// Collects the spans of one session and flushes them as a single trace.
///
/// Spans are buffered in memory and, on `finish` (or drop), written to `trace.json` in the
/// output directory and optionally sent over a channel.
pub struct TracingCollector {
    /// Trace ID shared by every span of this session.
    trace_id: [u8; 16],
    /// Span ID of the root `session` span.
    session_span_id: [u8; 8],
    /// Construction time in nanoseconds since the Unix epoch; start of the `session` span.
    session_start: u64,
    /// Value of the `service.name` attribute.
    service_name: String,
    /// Extra resource attributes added to the serialized trace.
    trace_metadata: HashMap<String, String>,
    /// Directory that receives `trace.json`.
    output_dir: PathBuf,
    /// Iterations that have begun but not ended, keyed by iteration index.
    active_iterations: HashMap<usize, IterationEntry>,
    /// Finished spans awaiting flush, oldest first.
    completed_spans: VecDeque<SpanData>,
    /// Maximum number of entries kept in `completed_spans`.
    max_spans: usize,
    /// Whether free-text attribute values are passed through `scrub_content`.
    redact: bool,
    /// Set once the trace has been flushed, so later flushes are no-ops.
    flushed: bool,
    /// Optional channel that receives the finished trace as a [`TraceEvent`].
    trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
}
181
182impl TracingCollector {
183 pub fn new(
192 output_dir: &Path,
193 service_name: impl Into<String>,
194 trace_metadata: HashMap<String, String>,
195 redact: bool,
196 trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
197 ) -> std::io::Result<Self> {
198 std::fs::create_dir_all(output_dir)?;
199 Ok(Self {
200 trace_id: new_trace_id(),
201 session_span_id: new_span_id(),
202 session_start: now_unix_nanos(),
203 service_name: service_name.into(),
204 trace_metadata,
205 output_dir: output_dir.to_owned(),
206 active_iterations: HashMap::new(),
207 completed_spans: VecDeque::new(),
208 max_spans: DEFAULT_MAX_SPANS,
209 redact,
210 flushed: false,
211 trace_tx,
212 })
213 }
214
215 fn maybe_redact<'a>(&self, text: &'a str) -> std::borrow::Cow<'a, str> {
216 if self.redact {
217 scrub_content(text)
218 } else {
219 std::borrow::Cow::Borrowed(text)
220 }
221 }
222
223 fn push_span(&mut self, span: SpanData) {
225 if self.completed_spans.len() >= self.max_spans {
226 tracing::warn!(
227 max_spans = self.max_spans,
228 "trace span cap reached, dropping oldest span"
229 );
230 self.completed_spans.pop_front();
231 }
232 self.completed_spans.push_back(span);
233 }
234
235 pub fn begin_iteration(&mut self, index: usize, user_msg_preview: &str) {
239 let preview = self
240 .maybe_redact(user_msg_preview)
241 .chars()
242 .take(100)
243 .collect::<String>();
244 let entry = IterationEntry {
245 guard: SpanGuard {
246 span_id: new_span_id(),
247 parent_span_id: self.session_span_id,
248 name: format!("iteration.{index}"),
249 start_time_unix_nanos: now_unix_nanos(),
250 },
251 user_msg_preview: preview,
252 };
253 self.active_iterations.insert(index, entry);
254 }
255
256 pub fn end_iteration(&mut self, index: usize, status: SpanStatus) {
258 let end_time = now_unix_nanos();
259 if let Some(entry) = self.active_iterations.remove(&index) {
260 let span = SpanData {
261 trace_id: self.trace_id,
262 span_id: entry.guard.span_id,
263 parent_span_id: Some(entry.guard.parent_span_id),
264 name: entry.guard.name,
265 start_time_unix_nanos: entry.guard.start_time_unix_nanos,
266 end_time_unix_nanos: end_time,
267 attributes: vec![(
268 "zeph.iteration.user_message_preview".to_owned(),
269 entry.user_msg_preview,
270 )],
271 status,
272 };
273 self.push_span(span);
274 } else {
275 tracing::warn!(index, "end_iteration without matching begin_iteration");
276 }
277 }
278
279 #[must_use]
283 pub fn begin_llm_request(&self, iteration_span_id: [u8; 8]) -> SpanGuard {
284 SpanGuard {
285 span_id: new_span_id(),
286 parent_span_id: iteration_span_id,
287 name: "llm.request".to_owned(),
288 start_time_unix_nanos: now_unix_nanos(),
289 }
290 }
291
292 pub fn end_llm_request(&mut self, guard: SpanGuard, attrs: &LlmAttributes) {
294 let end_time = now_unix_nanos();
295 let model_clean = self.maybe_redact(&attrs.model).into_owned();
296 self.push_span(SpanData {
297 trace_id: self.trace_id,
298 span_id: guard.span_id,
299 parent_span_id: Some(guard.parent_span_id),
300 name: guard.name,
301 start_time_unix_nanos: guard.start_time_unix_nanos,
302 end_time_unix_nanos: end_time,
303 attributes: vec![
304 ("zeph.llm.model".to_owned(), model_clean),
305 (
306 "zeph.llm.prompt_tokens".to_owned(),
307 attrs.prompt_tokens.to_string(),
308 ),
309 (
310 "zeph.llm.completion_tokens".to_owned(),
311 attrs.completion_tokens.to_string(),
312 ),
313 (
314 "zeph.llm.latency_ms".to_owned(),
315 attrs.latency_ms.to_string(),
316 ),
317 ("zeph.llm.streaming".to_owned(), attrs.streaming.to_string()),
318 ("zeph.llm.cache_hit".to_owned(), attrs.cache_hit.to_string()),
319 ],
320 status: SpanStatus::Ok,
321 });
322 }
323
324 #[must_use]
328 pub fn begin_tool_call(&self, tool_name: &str, iteration_span_id: [u8; 8]) -> SpanGuard {
329 self.begin_tool_call_at(tool_name, iteration_span_id, &std::time::Instant::now())
330 }
331
332 #[must_use]
338 pub fn begin_tool_call_at(
339 &self,
340 tool_name: &str,
341 iteration_span_id: [u8; 8],
342 started_at: &std::time::Instant,
343 ) -> SpanGuard {
344 let elapsed_nanos = u64::try_from(started_at.elapsed().as_nanos()).unwrap_or(u64::MAX);
345 let start_time_unix_nanos = now_unix_nanos().saturating_sub(elapsed_nanos);
346 SpanGuard {
347 span_id: new_span_id(),
348 parent_span_id: iteration_span_id,
349 name: format!("tool.{}", sanitize_name(tool_name)),
350 start_time_unix_nanos,
351 }
352 }
353
354 pub fn end_tool_call(&mut self, guard: SpanGuard, tool_name: &str, attrs: ToolAttributes) {
356 let end_time = now_unix_nanos();
357 let tool_clean = sanitize_name(tool_name);
358 let mut attributes = vec![
359 ("zeph.tool.name".to_owned(), tool_clean),
360 (
361 "zeph.tool.latency_ms".to_owned(),
362 attrs.latency_ms.to_string(),
363 ),
364 ("zeph.tool.is_error".to_owned(), attrs.is_error.to_string()),
365 ];
366 if let Some(kind) = attrs.error_kind {
367 let kind_clean = self.maybe_redact(&kind).into_owned();
369 attributes.push(("zeph.tool.error_kind".to_owned(), kind_clean));
370 }
371 let status = if attrs.is_error {
372 SpanStatus::Error {
373 message: "tool call failed".to_owned(),
374 }
375 } else {
376 SpanStatus::Ok
377 };
378 self.push_span(SpanData {
379 trace_id: self.trace_id,
380 span_id: guard.span_id,
381 parent_span_id: Some(guard.parent_span_id),
382 name: guard.name,
383 start_time_unix_nanos: guard.start_time_unix_nanos,
384 end_time_unix_nanos: end_time,
385 attributes,
386 status,
387 });
388 }
389
390 #[must_use]
394 pub fn begin_memory_search(&self, parent_span_id: [u8; 8]) -> SpanGuard {
395 SpanGuard {
396 span_id: new_span_id(),
397 parent_span_id,
398 name: "memory.search".to_owned(),
399 start_time_unix_nanos: now_unix_nanos(),
400 }
401 }
402
403 pub fn end_memory_search(&mut self, guard: SpanGuard, attrs: &MemorySearchAttributes) {
405 let end_time = now_unix_nanos();
406 let query_clean = self
407 .maybe_redact(&attrs.query_preview)
408 .chars()
409 .take(100)
410 .collect::<String>();
411 self.push_span(SpanData {
412 trace_id: self.trace_id,
413 span_id: guard.span_id,
414 parent_span_id: Some(guard.parent_span_id),
415 name: guard.name,
416 start_time_unix_nanos: guard.start_time_unix_nanos,
417 end_time_unix_nanos: end_time,
418 attributes: vec![
419 ("zeph.memory.query_preview".to_owned(), query_clean),
420 (
421 "zeph.memory.result_count".to_owned(),
422 attrs.result_count.to_string(),
423 ),
424 (
425 "zeph.memory.latency_ms".to_owned(),
426 attrs.latency_ms.to_string(),
427 ),
428 ],
429 status: SpanStatus::Ok,
430 });
431 }
432
433 #[must_use]
437 pub fn trace_json_path(&self) -> PathBuf {
438 self.output_dir.join("trace.json")
439 }
440
441 #[must_use]
443 pub fn current_iteration_span_id(&self, index: usize) -> Option<[u8; 8]> {
444 self.active_iterations.get(&index).map(|e| e.guard.span_id)
445 }
446
 /// Returns the span ID of the root `session` span.
 #[must_use]
 pub fn session_span_id(&self) -> [u8; 8] {
     self.session_span_id
 }
452
 /// Returns the trace ID shared by every span of this collector.
 #[must_use]
 pub fn trace_id(&self) -> [u8; 16] {
     self.trace_id
 }
458
459 pub fn finish(&mut self) {
466 if self.flushed {
467 return;
468 }
469 self.flushed = true;
470
471 let open_keys: Vec<usize> = self.active_iterations.keys().copied().collect();
473 let end_time = now_unix_nanos();
474 for index in open_keys {
475 if let Some(entry) = self.active_iterations.remove(&index) {
476 self.push_span(SpanData {
477 trace_id: self.trace_id,
478 span_id: entry.guard.span_id,
479 parent_span_id: Some(entry.guard.parent_span_id),
480 name: entry.guard.name,
481 start_time_unix_nanos: entry.guard.start_time_unix_nanos,
482 end_time_unix_nanos: end_time,
483 attributes: vec![(
484 "zeph.iteration.user_message_preview".to_owned(),
485 entry.user_msg_preview,
486 )],
487 status: SpanStatus::Unset,
488 });
489 }
490 }
491
492 let session_span = SpanData {
493 trace_id: self.trace_id,
494 span_id: self.session_span_id,
495 parent_span_id: None,
496 name: "session".to_owned(),
497 start_time_unix_nanos: self.session_start,
498 end_time_unix_nanos: end_time,
499 attributes: vec![
500 ("service.name".to_owned(), self.service_name.clone()),
501 ("zeph.session.trace_id".to_owned(), hex16(&self.trace_id)),
502 ],
503 status: SpanStatus::Ok,
504 };
505
506 let mut all_spans = vec![session_span];
507 all_spans.extend(self.completed_spans.drain(..));
508
509 let json = serialize_otlp_json(&all_spans, &self.service_name, &self.trace_metadata);
510 let path = self.output_dir.join("trace.json");
511 if let Err(e) = write_trace_file(&path, json.as_bytes()) {
512 tracing::warn!(path = %path.display(), error = %e, "trace.json write failed");
513 } else {
514 tracing::info!(path = %path.display(), "OTel trace written");
515 }
516
517 if let Some(ref tx) = self.trace_tx {
519 let event = TraceEvent {
520 trace_id: self.trace_id,
521 spans: all_spans,
522 };
523 if tx.send(event).is_err() {
524 tracing::debug!("OTLP trace channel closed, skipping export");
525 }
526 }
527 }
528}
529
/// Flushes the trace when the collector goes out of scope.
impl Drop for TracingCollector {
    /// Calls `finish`, which returns immediately if the trace was already flushed.
    fn drop(&mut self) {
        self.finish();
    }
}
536
/// Maps a [`SpanStatus`] to the numeric code written to `status.code` in the serialized
/// span: `0` for unset, `1` for ok, `2` for error.
fn span_status_code(status: &SpanStatus) -> u8 {
    match status {
        SpanStatus::Unset => 0,
        SpanStatus::Ok => 1,
        SpanStatus::Error { .. } => 2,
    }
}
546
547#[must_use]
554pub fn serialize_otlp_json<S: std::hash::BuildHasher>(
555 spans: &[SpanData],
556 service_name: &str,
557 trace_metadata: &HashMap<String, String, S>,
558) -> String {
559 let otlp_spans: Vec<serde_json::Value> = spans
560 .iter()
561 .map(|s| {
562 let attrs: Vec<serde_json::Value> = s
563 .attributes
564 .iter()
565 .map(|(k, v)| {
566 serde_json::json!({
567 "key": k,
568 "value": { "stringValue": v }
569 })
570 })
571 .collect();
572
573 let mut obj = serde_json::json!({
574 "traceId": hex16(&s.trace_id),
575 "spanId": hex8(s.span_id),
576 "name": s.name,
577 "startTimeUnixNano": s.start_time_unix_nanos.to_string(),
579 "endTimeUnixNano": s.end_time_unix_nanos.to_string(),
580 "attributes": attrs,
581 "status": {
582 "code": span_status_code(&s.status)
583 }
584 });
585
586 if let Some(parent) = s.parent_span_id {
587 obj["parentSpanId"] = serde_json::json!(hex8(parent));
588 }
589
590 if let SpanStatus::Error { message } = &s.status {
591 obj["status"]["message"] = serde_json::json!(message);
592 }
593
594 obj
595 })
596 .collect();
597
598 let mut resource_attrs = vec![serde_json::json!({
599 "key": "service.name",
600 "value": { "stringValue": service_name }
601 })];
602 for (k, v) in trace_metadata {
603 if k == "service.name" {
604 continue;
605 }
606 resource_attrs.push(serde_json::json!({
607 "key": k,
608 "value": { "stringValue": v }
609 }));
610 }
611
612 let payload = serde_json::json!({
613 "resourceSpans": [{
614 "resource": {
615 "attributes": resource_attrs
616 },
617 "scopeSpans": [{
618 "scope": {
619 "name": "zeph",
620 "version": env!("CARGO_PKG_VERSION")
621 },
622 "spans": otlp_spans
623 }]
624 }]
625 });
626
627 serde_json::to_string_pretty(&payload)
628 .unwrap_or_else(|e| format!("{{\"error\": \"serialization failed: {e}\"}}"))
629}
630
/// Writes `data` to `path`, replacing any existing content.
///
/// On Unix the file is restricted to owner read/write (`0o600`). The mode passed to
/// `OpenOptions` only applies when the file is newly created, so the permissions are also
/// set explicitly on the open handle before any data is written; this tightens a
/// pre-existing file that had wider permissions. On other platforms the data is written
/// with the platform's default permissions.
///
/// # Errors
///
/// Returns the I/O error if the file cannot be opened, its permissions cannot be changed,
/// or the write fails.
fn write_trace_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        use std::io::Write as _;
        use std::os::unix::fs::{OpenOptionsExt as _, PermissionsExt as _};

        const OWNER_READ_WRITE: u32 = 0o600;

        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(OWNER_READ_WRITE)
            .open(path)?;
        // The file is already truncated here, so nothing sensitive is exposed while the
        // permissions of a pre-existing file are being tightened.
        f.set_permissions(std::fs::Permissions::from_mode(OWNER_READ_WRITE))?;
        f.write_all(data)
    }
    #[cfg(not(unix))]
    {
        std::fs::write(path, data)
    }
}
653
/// Returns `name` with every character that is not alphanumeric, `-`, `_` or `.` replaced
/// by `_`.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let allowed = ch.is_alphanumeric() || matches!(ch, '-' | '_' | '.');
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
665
666#[cfg(test)]
669mod tests {
670 use super::*;
671 use tempfile::tempdir;
672
673 fn make_collector(dir: &Path) -> TracingCollector {
674 TracingCollector::new(dir, "zeph-test", HashMap::new(), false, None).unwrap()
675 }
676
677 #[test]
678 fn span_id_generation_is_unique() {
679 let ids: Vec<[u8; 8]> = (0..100).map(|_| new_span_id()).collect();
680 let unique: std::collections::HashSet<[u8; 8]> = ids.into_iter().collect();
681 assert_eq!(unique.len(), 100);
682 }
683
684 #[test]
685 fn hex_lengths_correct() {
686 assert_eq!(hex16(&new_trace_id()).len(), 32);
687 assert_eq!(hex8(new_span_id()).len(), 16);
688 }
689
690 #[test]
691 fn collector_creates_output_dir() {
692 let tmp = tempdir().unwrap();
693 let sub = tmp.path().join("traces");
694 make_collector(&sub);
695 assert!(sub.exists());
696 }
697
698 #[test]
699 fn finish_writes_trace_json() {
700 let tmp = tempdir().unwrap();
701 let mut c = make_collector(tmp.path());
702 c.begin_iteration(0, "hello world");
703 c.end_iteration(0, SpanStatus::Ok);
704 c.finish();
705
706 let path = tmp.path().join("trace.json");
707 assert!(path.exists(), "trace.json must be written");
708
709 let v: serde_json::Value =
710 serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
711 assert!(v["resourceSpans"].is_array());
712 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
714 .as_array()
715 .unwrap();
716 assert_eq!(spans.len(), 2);
717 }
718
719 #[test]
720 fn span_hierarchy_parent_child_correct() {
721 let tmp = tempdir().unwrap();
722 let mut c = make_collector(tmp.path());
723 c.begin_iteration(0, "test");
724 let iter_id = c.current_iteration_span_id(0).unwrap();
725 let guard = c.begin_llm_request(iter_id);
726 c.end_llm_request(
727 guard,
728 &LlmAttributes {
729 model: "test-model".to_owned(),
730 prompt_tokens: 100,
731 completion_tokens: 50,
732 latency_ms: 200,
733 streaming: false,
734 cache_hit: false,
735 },
736 );
737 c.end_iteration(0, SpanStatus::Ok);
738 c.finish();
739
740 let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
741 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
742 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
743 .as_array()
744 .unwrap();
745 assert_eq!(spans.len(), 3, "session + iteration + llm = 3 spans");
746
747 let llm_span = spans
748 .iter()
749 .find(|s| s["name"] == "llm.request")
750 .expect("llm.request span missing");
751 let iter_span = spans
752 .iter()
753 .find(|s| s["name"] == "iteration.0")
754 .expect("iteration.0 span missing");
755
756 assert_eq!(
757 llm_span["parentSpanId"], iter_span["spanId"],
758 "llm span parent must be iteration span"
759 );
760 }
761
762 #[test]
763 fn redaction_applied_to_text_attributes() {
764 let tmp = tempdir().unwrap();
765 let mut c = TracingCollector::new(tmp.path(), "test", HashMap::new(), true, None).unwrap();
766 let iter_id = c.session_span_id();
767 let guard = c.begin_memory_search(iter_id);
768 c.end_memory_search(
769 guard,
770 &MemorySearchAttributes {
771 query_preview: "search sk-secretkey123 here".to_owned(),
772 result_count: 3,
773 latency_ms: 10,
774 },
775 );
776 c.finish();
777
778 let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
779 assert!(
780 !content.contains("sk-secretkey123"),
781 "raw secret must be redacted from trace"
782 );
783 }
784
785 #[test]
786 fn otlp_json_format_spec_compliant() {
787 let trace_id = [0xAB_u8; 16];
788 let span_id = [0xCD_u8; 8];
789 let spans = vec![SpanData {
790 trace_id,
791 span_id,
792 parent_span_id: None,
793 name: "session".to_owned(),
794 start_time_unix_nanos: 1_000_000,
795 end_time_unix_nanos: 2_000_000,
796 attributes: vec![("service.name".to_owned(), "zeph".to_owned())],
797 status: SpanStatus::Ok,
798 }];
799
800 let json = serialize_otlp_json(&spans, "zeph", &HashMap::new());
801 let v: serde_json::Value = serde_json::from_str(&json).unwrap();
802
803 let span = &v["resourceSpans"][0]["scopeSpans"][0]["spans"][0];
804 assert_eq!(
805 span["traceId"],
806 "abababababababababababababababababab"[..32]
807 );
808 assert_eq!(span["spanId"], "cdcdcdcdcdcdcdcd");
809 assert_eq!(span["name"], "session");
810 assert!(span["startTimeUnixNano"].is_string());
812 assert_eq!(span["status"]["code"], 1_u64);
813 }
814
815 #[test]
816 fn drop_flushes_trace() {
817 let tmp = tempdir().unwrap();
818 {
819 let mut c = make_collector(tmp.path());
820 c.begin_iteration(0, "hello");
821 }
823 assert!(
824 tmp.path().join("trace.json").exists(),
825 "Drop must flush trace.json"
826 );
827 }
828
829 #[test]
830 fn trace_metadata_included_in_resource_attributes() {
831 let tmp = tempdir().unwrap();
832 let mut meta = HashMap::new();
833 meta.insert("deployment.environment".to_owned(), "staging".to_owned());
834 meta.insert("service.name".to_owned(), "should-be-skipped".to_owned());
835 let mut c = TracingCollector::new(tmp.path(), "zeph-test", meta, false, None).unwrap();
836 c.finish();
837
838 let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
839 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
840 let attrs = v["resourceSpans"][0]["resource"]["attributes"]
841 .as_array()
842 .unwrap();
843
844 let svc = attrs
846 .iter()
847 .find(|a| a["key"] == "service.name")
848 .expect("service.name must be in resource attributes");
849 assert_eq!(svc["value"]["stringValue"], "zeph-test");
850
851 let env_attr = attrs.iter().find(|a| a["key"] == "deployment.environment");
853 assert!(
854 env_attr.is_some(),
855 "deployment.environment must be in resource attributes"
856 );
857 assert_eq!(env_attr.unwrap()["value"]["stringValue"], "staging");
858 }
859
860 #[test]
861 fn finish_is_idempotent() {
862 let tmp = tempdir().unwrap();
863 let mut c = make_collector(tmp.path());
864 c.finish();
865 c.finish();
866 assert!(tmp.path().join("trace.json").exists());
867 }
868
869 #[test]
870 fn concurrent_iterations_tracked_independently() {
871 let tmp = tempdir().unwrap();
872 let mut c = make_collector(tmp.path());
873 c.begin_iteration(0, "first");
874 c.begin_iteration(1, "second");
875 let id0 = c.current_iteration_span_id(0).unwrap();
876 let id1 = c.current_iteration_span_id(1).unwrap();
877 assert_ne!(
878 id0, id1,
879 "concurrent iterations must have distinct span IDs"
880 );
881 c.end_iteration(0, SpanStatus::Ok);
882 c.end_iteration(1, SpanStatus::Ok);
883 c.finish();
884
885 let v: serde_json::Value =
886 serde_json::from_str(&std::fs::read_to_string(tmp.path().join("trace.json")).unwrap())
887 .unwrap();
888 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
889 .as_array()
890 .unwrap();
891 assert_eq!(spans.len(), 3);
893 }
894
895 #[test]
896 fn trace_format_skips_legacy_numbered_files() {
897 use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
898
899 let tmp = tempdir().unwrap();
900 let d = DebugDumper::new(tmp.path(), DumpFormat::Trace).unwrap();
901 let session_dir = d.dir().to_owned();
902 let id = d.dump_request(&RequestDebugDump {
903 model_name: "test",
904 messages: &[],
905 tools: &[],
906 provider_request: serde_json::json!({}),
907 memcot_state: None,
908 });
909 d.dump_response(id, "resp");
910 d.dump_tool_output("shell", "output");
911
912 let files: Vec<_> = std::fs::read_dir(&session_dir)
914 .unwrap()
915 .filter_map(std::result::Result::ok)
916 .filter(|e| {
917 e.file_name()
918 .to_string_lossy()
919 .chars()
920 .next()
921 .is_some_and(|c| c.is_ascii_digit())
922 })
923 .collect();
924 assert!(
925 files.is_empty(),
926 "no legacy numbered files in Trace format session dir"
927 );
928
929 assert!(session_dir.is_dir(), "session subdir must exist");
932 }
933
934 #[test]
935 fn tool_call_span_emitted() {
936 let tmp = tempdir().unwrap();
937 let mut c = make_collector(tmp.path());
938 c.begin_iteration(0, "test");
939 let iter_id = c.current_iteration_span_id(0).unwrap();
940 let guard = c.begin_tool_call("shell", iter_id);
941 c.end_tool_call(
942 guard,
943 "shell",
944 ToolAttributes {
945 latency_ms: 50,
946 is_error: false,
947 error_kind: None,
948 },
949 );
950 c.end_iteration(0, SpanStatus::Ok);
951 c.finish();
952
953 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
954 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
955 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
956 .as_array()
957 .unwrap();
958 assert!(
959 spans.iter().any(|s| s["name"] == "tool.shell"),
960 "tool.shell span must be emitted"
961 );
962 }
963
964 #[test]
965 fn tool_call_error_span_emitted() {
966 let tmp = tempdir().unwrap();
967 let mut c = make_collector(tmp.path());
968 c.begin_iteration(0, "test");
969 let iter_id = c.current_iteration_span_id(0).unwrap();
970 let guard = c.begin_tool_call("shell", iter_id);
971 c.end_tool_call(
972 guard,
973 "shell",
974 ToolAttributes {
975 latency_ms: 10,
976 is_error: true,
977 error_kind: Some("permission denied".to_owned()),
978 },
979 );
980 c.end_iteration(0, SpanStatus::Ok);
981 c.finish();
982
983 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
984 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
985 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
986 .as_array()
987 .unwrap();
988 let tool_span = spans
989 .iter()
990 .find(|s| s["name"] == "tool.shell")
991 .expect("tool.shell span missing");
992 assert_eq!(
993 tool_span["status"]["code"], 2_u64,
994 "error span must have status code 2"
995 );
996 }
997
998 #[test]
999 fn begin_tool_call_at_timestamps_precede_end_time() {
1000 let tmp = tempdir().unwrap();
1001 let mut c = make_collector(tmp.path());
1002 c.begin_iteration(0, "test");
1003 let iter_id = c.current_iteration_span_id(0).unwrap();
1004
1005 let started_at = std::time::Instant::now();
1007 std::thread::sleep(std::time::Duration::from_millis(2));
1008 let guard = c.begin_tool_call_at("shell", iter_id, &started_at);
1009 let span_start = guard.start_time_unix_nanos;
1010 c.end_tool_call(
1011 guard,
1012 "shell",
1013 ToolAttributes {
1014 latency_ms: 2,
1015 is_error: false,
1016 error_kind: None,
1017 },
1018 );
1019 c.end_iteration(0, SpanStatus::Ok);
1020 c.finish();
1021
1022 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
1023 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
1024 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
1025 .as_array()
1026 .unwrap();
1027 let tool_span = spans
1028 .iter()
1029 .find(|s| s["name"] == "tool.shell")
1030 .expect("tool.shell span missing");
1031 let recorded_start: u64 = tool_span["startTimeUnixNano"]
1032 .as_str()
1033 .unwrap()
1034 .parse()
1035 .unwrap();
1036 let recorded_end: u64 = tool_span["endTimeUnixNano"]
1037 .as_str()
1038 .unwrap()
1039 .parse()
1040 .unwrap();
1041 assert!(
1043 recorded_start < recorded_end,
1044 "start ({recorded_start}) must precede end ({recorded_end})"
1045 );
1046 assert_eq!(
1048 span_start, recorded_start,
1049 "guard start must match serialized start"
1050 );
1051 }
1052
1053 #[test]
1054 fn session_to_iteration_parent_span_id() {
1055 let tmp = tempdir().unwrap();
1056 let mut c = make_collector(tmp.path());
1057 let session_id = c.session_span_id();
1058 c.begin_iteration(0, "test");
1059 c.end_iteration(0, SpanStatus::Ok);
1060 c.finish();
1061
1062 let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
1063 let v: serde_json::Value = serde_json::from_str(&content).unwrap();
1064 let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
1065 .as_array()
1066 .unwrap();
1067 let iter_span = spans
1068 .iter()
1069 .find(|s| s["name"] == "iteration.0")
1070 .expect("iteration.0 span missing");
1071 assert_eq!(
1072 iter_span["parentSpanId"],
1073 serde_json::json!(hex8(session_id)),
1074 "iteration span parent must be session span"
1075 );
1076 }
1077
1078 #[test]
1079 fn json_and_raw_formats_still_write_files() {
1080 use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};
1081
1082 let tmp = tempdir().unwrap();
1083 for fmt in [DumpFormat::Json, DumpFormat::Raw] {
1084 let d = DebugDumper::new(tmp.path(), fmt).unwrap();
1085 let id = d.dump_request(&RequestDebugDump {
1086 model_name: "test-model",
1087 messages: &[],
1088 tools: &[],
1089 provider_request: serde_json::json!({"model": "test-model", "max_tokens": 100}),
1090 memcot_state: None,
1091 });
1092 d.dump_response(id, "hello");
1093 let session_dir = std::fs::read_dir(tmp.path())
1094 .unwrap()
1095 .filter_map(std::result::Result::ok)
1096 .find(|e| e.file_type().is_ok_and(|t| t.is_dir()))
1097 .unwrap()
1098 .path();
1099 assert!(
1100 session_dir.join("0000-request.json").exists(),
1101 "request file must exist for format {fmt:?}"
1102 );
1103 }
1104 }
1105}