1use std::collections::{HashMap, VecDeque};
19use std::path::{Path, PathBuf};
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::{SystemTime, UNIX_EPOCH};
22
23use rand::RngExt as _;
24use serde::{Deserialize, Serialize};
25
26use crate::redact::scrub_content;
27
28static SPAN_COUNTER: AtomicU64 = AtomicU64::new(0);
31
32#[must_use]
33fn new_trace_id() -> [u8; 16] {
34 rand::rng().random()
35}
36
37#[must_use]
38fn new_span_id() -> [u8; 8] {
39 let mut id: [u8; 8] = rand::rng().random();
40 id[0] ^= (SPAN_COUNTER.fetch_add(1, Ordering::Relaxed) & 0xFF) as u8;
42 id
43}
44
/// Lowercase hex encoding shared by the fixed-width ID formatters below.
#[must_use]
fn encode_hex_lower(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut out, byte| {
            // Writing into a `String` cannot fail, so the `fmt::Result` is discarded.
            let _ = write!(out, "{byte:02x}");
            out
        })
}

/// Formats a 16-byte trace ID as 32 lowercase hex characters.
#[must_use]
fn hex16(b: &[u8; 16]) -> String {
    encode_hex_lower(b)
}

/// Formats an 8-byte span ID as 16 lowercase hex characters.
#[must_use]
fn hex8(b: [u8; 8]) -> String {
    encode_hex_lower(&b)
}
64
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and saturates at
/// `u64::MAX` if the nanosecond count does not fit in 64 bits.
#[must_use]
fn now_unix_nanos() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
76pub enum SpanStatus {
77 Ok,
78 Error { message: String },
79 Unset,
80}
81
/// A finished span, ready for serialization or export.
#[derive(Debug, Clone)]
pub struct SpanData {
    /// 16-byte trace ID shared by every span of the session.
    pub trace_id: [u8; 16],
    /// 8-byte ID of this span.
    pub span_id: [u8; 8],
    /// Parent span ID; `None` marks a root span (the collector uses it only for `session`).
    pub parent_span_id: Option<[u8; 8]>,
    /// Span name, e.g. `session`, `iteration.N`, `llm.request`, `tool.<name>`.
    pub name: String,
    /// Start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
    /// End time in nanoseconds since the Unix epoch.
    pub end_time_unix_nanos: u64,
    /// Key/value attributes; every value is exported as an OTLP `stringValue`.
    pub attributes: Vec<(String, String)>,
    /// Final status of the span.
    pub status: SpanStatus,
}
94
/// An in-flight span returned by the `begin_*` methods of `TracingCollector`.
///
/// The guard only records identity and start time. Nothing is recorded until it is passed to
/// the matching `end_*` method. There is no `Drop` impl, so discarding a guard silently loses
/// the span; hence `#[must_use]`.
#[derive(Debug)]
#[must_use = "pass the guard to the matching end_* method or the span is lost"]
pub struct SpanGuard {
    /// ID assigned to the span when it was begun.
    pub span_id: [u8; 8],
    /// ID of the span this one is nested under.
    pub parent_span_id: [u8; 8],
    /// Span name, e.g. `llm.request` or `tool.<name>`.
    pub name: String,
    /// Start time in nanoseconds since the Unix epoch.
    pub start_time_unix_nanos: u64,
}
104
/// Measurements attached to an `llm.request` span by `TracingCollector::end_llm_request`.
///
/// `model` passes through the collector's redaction when it is enabled. The numeric and
/// boolean fields are exported verbatim as string attributes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LlmAttributes {
    /// Exported as `zeph.llm.model`.
    pub model: String,
    /// Exported as `zeph.llm.prompt_tokens`.
    pub prompt_tokens: u64,
    /// Exported as `zeph.llm.completion_tokens`.
    pub completion_tokens: u64,
    /// Exported as `zeph.llm.latency_ms`.
    pub latency_ms: u64,
    /// Exported as `zeph.llm.streaming`.
    pub streaming: bool,
    /// Exported as `zeph.llm.cache_hit`.
    pub cache_hit: bool,
}
114
/// Measurements attached to a `tool.<name>` span by `TracingCollector::end_tool_call`.
///
/// When `is_error` is true the span status becomes an error. `error_kind`, if present,
/// passes through the collector's redaction.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ToolAttributes {
    /// Exported as `zeph.tool.latency_ms`.
    pub latency_ms: u64,
    /// Exported as `zeph.tool.is_error`; also selects the span status.
    pub is_error: bool,
    /// Exported as `zeph.tool.error_kind` when present.
    pub error_kind: Option<String>,
}
121
/// Measurements attached to a `memory.search` span by `TracingCollector::end_memory_search`.
///
/// `query_preview` is redacted (when enabled) and truncated to 100 characters before export.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MemorySearchAttributes {
    /// Exported as `zeph.memory.query_preview`.
    pub query_preview: String,
    /// Exported as `zeph.memory.result_count`.
    pub result_count: usize,
    /// Exported as `zeph.memory.latency_ms`.
    pub latency_ms: u64,
}
128
/// All spans of one finished session, sent on the collector's trace channel by `finish`.
#[derive(Debug)]
pub struct TraceEvent {
    /// Trace ID shared by every span in `spans`.
    pub trace_id: [u8; 16],
    /// The exported spans; the `session` root span is first.
    pub spans: Vec<SpanData>,
}
140
/// Bookkeeping for an iteration span that has begun but not yet ended.
struct IterationEntry {
    /// Identity and start time of the open iteration span.
    guard: SpanGuard,
    /// Redacted (if enabled), truncated user-message preview exported on the span.
    user_msg_preview: String,
}
148
/// Default cap on buffered finished spans; once reached, the oldest span is evicted per push.
const DEFAULT_MAX_SPANS: usize = 10_000;
157
158pub struct TracingCollector {
159 trace_id: [u8; 16],
160 session_span_id: [u8; 8],
161 session_start: u64,
162 service_name: String,
163 trace_metadata: HashMap<String, String>,
166 output_dir: PathBuf,
167 active_iterations: HashMap<usize, IterationEntry>,
169 completed_spans: VecDeque<SpanData>,
170 max_spans: usize,
172 redact: bool,
174 flushed: bool,
176 trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
179}
180
181impl TracingCollector {
182 pub fn new(
191 output_dir: &Path,
192 service_name: impl Into<String>,
193 trace_metadata: HashMap<String, String>,
194 redact: bool,
195 trace_tx: Option<tokio::sync::mpsc::UnboundedSender<TraceEvent>>,
196 ) -> std::io::Result<Self> {
197 std::fs::create_dir_all(output_dir)?;
198 Ok(Self {
199 trace_id: new_trace_id(),
200 session_span_id: new_span_id(),
201 session_start: now_unix_nanos(),
202 service_name: service_name.into(),
203 trace_metadata,
204 output_dir: output_dir.to_owned(),
205 active_iterations: HashMap::new(),
206 completed_spans: VecDeque::new(),
207 max_spans: DEFAULT_MAX_SPANS,
208 redact,
209 flushed: false,
210 trace_tx,
211 })
212 }
213
214 fn maybe_redact<'a>(&self, text: &'a str) -> std::borrow::Cow<'a, str> {
215 if self.redact {
216 scrub_content(text)
217 } else {
218 std::borrow::Cow::Borrowed(text)
219 }
220 }
221
222 fn push_span(&mut self, span: SpanData) {
224 if self.completed_spans.len() >= self.max_spans {
225 tracing::warn!(
226 max_spans = self.max_spans,
227 "trace span cap reached, dropping oldest span"
228 );
229 self.completed_spans.pop_front();
230 }
231 self.completed_spans.push_back(span);
232 }
233
234 pub fn begin_iteration(&mut self, index: usize, user_msg_preview: &str) {
238 let preview = self
239 .maybe_redact(user_msg_preview)
240 .chars()
241 .take(100)
242 .collect::<String>();
243 let entry = IterationEntry {
244 guard: SpanGuard {
245 span_id: new_span_id(),
246 parent_span_id: self.session_span_id,
247 name: format!("iteration.{index}"),
248 start_time_unix_nanos: now_unix_nanos(),
249 },
250 user_msg_preview: preview,
251 };
252 self.active_iterations.insert(index, entry);
253 }
254
255 pub fn end_iteration(&mut self, index: usize, status: SpanStatus) {
257 let end_time = now_unix_nanos();
258 if let Some(entry) = self.active_iterations.remove(&index) {
259 let span = SpanData {
260 trace_id: self.trace_id,
261 span_id: entry.guard.span_id,
262 parent_span_id: Some(entry.guard.parent_span_id),
263 name: entry.guard.name,
264 start_time_unix_nanos: entry.guard.start_time_unix_nanos,
265 end_time_unix_nanos: end_time,
266 attributes: vec![(
267 "zeph.iteration.user_message_preview".to_owned(),
268 entry.user_msg_preview,
269 )],
270 status,
271 };
272 self.push_span(span);
273 } else {
274 tracing::warn!(index, "end_iteration without matching begin_iteration");
275 }
276 }
277
278 #[must_use]
282 pub fn begin_llm_request(&self, iteration_span_id: [u8; 8]) -> SpanGuard {
283 SpanGuard {
284 span_id: new_span_id(),
285 parent_span_id: iteration_span_id,
286 name: "llm.request".to_owned(),
287 start_time_unix_nanos: now_unix_nanos(),
288 }
289 }
290
291 pub fn end_llm_request(&mut self, guard: SpanGuard, attrs: &LlmAttributes) {
293 let end_time = now_unix_nanos();
294 let model_clean = self.maybe_redact(&attrs.model).into_owned();
295 self.push_span(SpanData {
296 trace_id: self.trace_id,
297 span_id: guard.span_id,
298 parent_span_id: Some(guard.parent_span_id),
299 name: guard.name,
300 start_time_unix_nanos: guard.start_time_unix_nanos,
301 end_time_unix_nanos: end_time,
302 attributes: vec![
303 ("zeph.llm.model".to_owned(), model_clean),
304 (
305 "zeph.llm.prompt_tokens".to_owned(),
306 attrs.prompt_tokens.to_string(),
307 ),
308 (
309 "zeph.llm.completion_tokens".to_owned(),
310 attrs.completion_tokens.to_string(),
311 ),
312 (
313 "zeph.llm.latency_ms".to_owned(),
314 attrs.latency_ms.to_string(),
315 ),
316 ("zeph.llm.streaming".to_owned(), attrs.streaming.to_string()),
317 ("zeph.llm.cache_hit".to_owned(), attrs.cache_hit.to_string()),
318 ],
319 status: SpanStatus::Ok,
320 });
321 }
322
323 #[must_use]
327 pub fn begin_tool_call(&self, tool_name: &str, iteration_span_id: [u8; 8]) -> SpanGuard {
328 self.begin_tool_call_at(tool_name, iteration_span_id, &std::time::Instant::now())
329 }
330
331 #[must_use]
337 pub fn begin_tool_call_at(
338 &self,
339 tool_name: &str,
340 iteration_span_id: [u8; 8],
341 started_at: &std::time::Instant,
342 ) -> SpanGuard {
343 let elapsed_nanos = u64::try_from(started_at.elapsed().as_nanos()).unwrap_or(u64::MAX);
344 let start_time_unix_nanos = now_unix_nanos().saturating_sub(elapsed_nanos);
345 SpanGuard {
346 span_id: new_span_id(),
347 parent_span_id: iteration_span_id,
348 name: format!("tool.{}", sanitize_name(tool_name)),
349 start_time_unix_nanos,
350 }
351 }
352
353 pub fn end_tool_call(&mut self, guard: SpanGuard, tool_name: &str, attrs: ToolAttributes) {
355 let end_time = now_unix_nanos();
356 let tool_clean = sanitize_name(tool_name);
357 let mut attributes = vec![
358 ("zeph.tool.name".to_owned(), tool_clean),
359 (
360 "zeph.tool.latency_ms".to_owned(),
361 attrs.latency_ms.to_string(),
362 ),
363 ("zeph.tool.is_error".to_owned(), attrs.is_error.to_string()),
364 ];
365 if let Some(kind) = attrs.error_kind {
366 let kind_clean = self.maybe_redact(&kind).into_owned();
368 attributes.push(("zeph.tool.error_kind".to_owned(), kind_clean));
369 }
370 let status = if attrs.is_error {
371 SpanStatus::Error {
372 message: "tool call failed".to_owned(),
373 }
374 } else {
375 SpanStatus::Ok
376 };
377 self.push_span(SpanData {
378 trace_id: self.trace_id,
379 span_id: guard.span_id,
380 parent_span_id: Some(guard.parent_span_id),
381 name: guard.name,
382 start_time_unix_nanos: guard.start_time_unix_nanos,
383 end_time_unix_nanos: end_time,
384 attributes,
385 status,
386 });
387 }
388
389 #[must_use]
393 pub fn begin_memory_search(&self, parent_span_id: [u8; 8]) -> SpanGuard {
394 SpanGuard {
395 span_id: new_span_id(),
396 parent_span_id,
397 name: "memory.search".to_owned(),
398 start_time_unix_nanos: now_unix_nanos(),
399 }
400 }
401
402 pub fn end_memory_search(&mut self, guard: SpanGuard, attrs: &MemorySearchAttributes) {
404 let end_time = now_unix_nanos();
405 let query_clean = self
406 .maybe_redact(&attrs.query_preview)
407 .chars()
408 .take(100)
409 .collect::<String>();
410 self.push_span(SpanData {
411 trace_id: self.trace_id,
412 span_id: guard.span_id,
413 parent_span_id: Some(guard.parent_span_id),
414 name: guard.name,
415 start_time_unix_nanos: guard.start_time_unix_nanos,
416 end_time_unix_nanos: end_time,
417 attributes: vec![
418 ("zeph.memory.query_preview".to_owned(), query_clean),
419 (
420 "zeph.memory.result_count".to_owned(),
421 attrs.result_count.to_string(),
422 ),
423 (
424 "zeph.memory.latency_ms".to_owned(),
425 attrs.latency_ms.to_string(),
426 ),
427 ],
428 status: SpanStatus::Ok,
429 });
430 }
431
432 #[must_use]
436 pub fn trace_json_path(&self) -> PathBuf {
437 self.output_dir.join("trace.json")
438 }
439
440 #[must_use]
442 pub fn current_iteration_span_id(&self, index: usize) -> Option<[u8; 8]> {
443 self.active_iterations.get(&index).map(|e| e.guard.span_id)
444 }
445
446 #[must_use]
448 pub fn session_span_id(&self) -> [u8; 8] {
449 self.session_span_id
450 }
451
452 #[must_use]
454 pub fn trace_id(&self) -> [u8; 16] {
455 self.trace_id
456 }
457
458 pub fn finish(&mut self) {
465 if self.flushed {
466 return;
467 }
468 self.flushed = true;
469
470 let open_keys: Vec<usize> = self.active_iterations.keys().copied().collect();
472 let end_time = now_unix_nanos();
473 for index in open_keys {
474 if let Some(entry) = self.active_iterations.remove(&index) {
475 self.push_span(SpanData {
476 trace_id: self.trace_id,
477 span_id: entry.guard.span_id,
478 parent_span_id: Some(entry.guard.parent_span_id),
479 name: entry.guard.name,
480 start_time_unix_nanos: entry.guard.start_time_unix_nanos,
481 end_time_unix_nanos: end_time,
482 attributes: vec![(
483 "zeph.iteration.user_message_preview".to_owned(),
484 entry.user_msg_preview,
485 )],
486 status: SpanStatus::Unset,
487 });
488 }
489 }
490
491 let session_span = SpanData {
492 trace_id: self.trace_id,
493 span_id: self.session_span_id,
494 parent_span_id: None,
495 name: "session".to_owned(),
496 start_time_unix_nanos: self.session_start,
497 end_time_unix_nanos: end_time,
498 attributes: vec![
499 ("service.name".to_owned(), self.service_name.clone()),
500 ("zeph.session.trace_id".to_owned(), hex16(&self.trace_id)),
501 ],
502 status: SpanStatus::Ok,
503 };
504
505 let mut all_spans = vec![session_span];
506 all_spans.extend(self.completed_spans.drain(..));
507
508 let json = serialize_otlp_json(&all_spans, &self.service_name, &self.trace_metadata);
509 let path = self.output_dir.join("trace.json");
510 if let Err(e) = write_trace_file(&path, json.as_bytes()) {
511 tracing::warn!(path = %path.display(), error = %e, "trace.json write failed");
512 } else {
513 tracing::info!(path = %path.display(), "OTel trace written");
514 }
515
516 if let Some(ref tx) = self.trace_tx {
518 let event = TraceEvent {
519 trace_id: self.trace_id,
520 spans: all_spans,
521 };
522 if tx.send(event).is_err() {
523 tracing::debug!("OTLP trace channel closed, skipping export");
524 }
525 }
526 }
527}
528
529impl Drop for TracingCollector {
531 fn drop(&mut self) {
532 self.finish();
533 }
534}
535
536fn span_status_code(status: &SpanStatus) -> u8 {
539 match status {
540 SpanStatus::Unset => 0,
541 SpanStatus::Ok => 1,
542 SpanStatus::Error { .. } => 2,
543 }
544}
545
546#[must_use]
553pub fn serialize_otlp_json<S: std::hash::BuildHasher>(
554 spans: &[SpanData],
555 service_name: &str,
556 trace_metadata: &HashMap<String, String, S>,
557) -> String {
558 let otlp_spans: Vec<serde_json::Value> = spans
559 .iter()
560 .map(|s| {
561 let attrs: Vec<serde_json::Value> = s
562 .attributes
563 .iter()
564 .map(|(k, v)| {
565 serde_json::json!({
566 "key": k,
567 "value": { "stringValue": v }
568 })
569 })
570 .collect();
571
572 let mut obj = serde_json::json!({
573 "traceId": hex16(&s.trace_id),
574 "spanId": hex8(s.span_id),
575 "name": s.name,
576 "startTimeUnixNano": s.start_time_unix_nanos.to_string(),
578 "endTimeUnixNano": s.end_time_unix_nanos.to_string(),
579 "attributes": attrs,
580 "status": {
581 "code": span_status_code(&s.status)
582 }
583 });
584
585 if let Some(parent) = s.parent_span_id {
586 obj["parentSpanId"] = serde_json::json!(hex8(parent));
587 }
588
589 if let SpanStatus::Error { message } = &s.status {
590 obj["status"]["message"] = serde_json::json!(message);
591 }
592
593 obj
594 })
595 .collect();
596
597 let mut resource_attrs = vec![serde_json::json!({
598 "key": "service.name",
599 "value": { "stringValue": service_name }
600 })];
601 for (k, v) in trace_metadata {
602 if k == "service.name" {
603 continue;
604 }
605 resource_attrs.push(serde_json::json!({
606 "key": k,
607 "value": { "stringValue": v }
608 }));
609 }
610
611 let payload = serde_json::json!({
612 "resourceSpans": [{
613 "resource": {
614 "attributes": resource_attrs
615 },
616 "scopeSpans": [{
617 "scope": {
618 "name": "zeph",
619 "version": env!("CARGO_PKG_VERSION")
620 },
621 "spans": otlp_spans
622 }]
623 }]
624 });
625
626 serde_json::to_string_pretty(&payload)
627 .unwrap_or_else(|e| format!("{{\"error\": \"serialization failed: {e}\"}}"))
628}
629
/// Writes `data` to `path`, replacing any existing content.
///
/// On Unix the file is restricted to owner read/write (`0o600`), since traces can carry
/// session content such as user-message previews. `OpenOptions::mode` only takes effect when
/// the file is created, so the permissions are also set explicitly on the open handle. This
/// tightens a pre-existing file before the new content is written into it.
///
/// # Errors
///
/// Propagates any I/O error from opening, changing permissions on, or writing the file.
fn write_trace_file(path: &Path, data: &[u8]) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        use std::io::Write as _;
        use std::os::unix::fs::{OpenOptionsExt as _, PermissionsExt as _};
        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o600)
            .open(path)?;
        // `mode` is ignored for an existing file; enforce owner-only access explicitly.
        f.set_permissions(std::fs::Permissions::from_mode(0o600))?;
        f.write_all(data)
    }
    #[cfg(not(unix))]
    {
        std::fs::write(path, data)
    }
}
652
/// Replaces every character other than a Unicode alphanumeric, `-`, `_` or `.` with `_`.
///
/// Used for tool span names (`tool.<name>`) and the `zeph.tool.name` attribute.
fn sanitize_name(name: &str) -> String {
    let mut cleaned = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, '-' | '_' | '.') || ch.is_alphanumeric();
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
664
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Collector writing into `dir` with service name "zeph-test", no metadata, no redaction
    /// and no export channel.
    fn make_collector(dir: &Path) -> TracingCollector {
        TracingCollector::new(dir, "zeph-test", HashMap::new(), false, None).unwrap()
    }

    #[test]
    fn span_id_generation_is_unique() {
        let ids: Vec<[u8; 8]> = (0..100).map(|_| new_span_id()).collect();
        // Collapsing into a set detects any duplicate among the 100 IDs.
        let unique: std::collections::HashSet<[u8; 8]> = ids.into_iter().collect();
        assert_eq!(unique.len(), 100);
    }

    #[test]
    fn hex_lengths_correct() {
        // Two hex characters per byte: 16-byte trace ID -> 32, 8-byte span ID -> 16.
        assert_eq!(hex16(&new_trace_id()).len(), 32);
        assert_eq!(hex8(new_span_id()).len(), 16);
    }

    #[test]
    fn collector_creates_output_dir() {
        let tmp = tempdir().unwrap();
        let sub = tmp.path().join("traces");
        // The collector is dropped immediately; `new` alone must have created the directory.
        make_collector(&sub);
        assert!(sub.exists());
    }

    #[test]
    fn finish_writes_trace_json() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        c.begin_iteration(0, "hello world");
        c.end_iteration(0, SpanStatus::Ok);
        c.finish();

        let path = tmp.path().join("trace.json");
        assert!(path.exists(), "trace.json must be written");

        let v: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
        assert!(v["resourceSpans"].is_array());
        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        // session root + one iteration.
        assert_eq!(spans.len(), 2);
    }

    #[test]
    fn span_hierarchy_parent_child_correct() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        c.begin_iteration(0, "test");
        let iter_id = c.current_iteration_span_id(0).unwrap();
        let guard = c.begin_llm_request(iter_id);
        c.end_llm_request(
            guard,
            &LlmAttributes {
                model: "test-model".to_owned(),
                prompt_tokens: 100,
                completion_tokens: 50,
                latency_ms: 200,
                streaming: false,
                cache_hit: false,
            },
        );
        c.end_iteration(0, SpanStatus::Ok);
        c.finish();

        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        assert_eq!(spans.len(), 3, "session + iteration + llm = 3 spans");

        let llm_span = spans
            .iter()
            .find(|s| s["name"] == "llm.request")
            .expect("llm.request span missing");
        let iter_span = spans
            .iter()
            .find(|s| s["name"] == "iteration.0")
            .expect("iteration.0 span missing");

        // Compare the serialized hex IDs rather than raw bytes.
        assert_eq!(
            llm_span["parentSpanId"], iter_span["spanId"],
            "llm span parent must be iteration span"
        );
    }

    #[test]
    fn redaction_applied_to_text_attributes() {
        let tmp = tempdir().unwrap();
        // `true` enables redaction.
        let mut c = TracingCollector::new(tmp.path(), "test", HashMap::new(), true, None).unwrap();
        let iter_id = c.session_span_id();
        let guard = c.begin_memory_search(iter_id);
        c.end_memory_search(
            guard,
            &MemorySearchAttributes {
                // NOTE(review): relies on crate::redact::scrub_content recognising `sk-...`
                // as a secret; confirm against the redaction patterns if this test breaks.
                query_preview: "search sk-secretkey123 here".to_owned(),
                result_count: 3,
                latency_ms: 10,
            },
        );
        c.finish();

        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
        assert!(
            !content.contains("sk-secretkey123"),
            "raw secret must be redacted from trace"
        );
    }

    #[test]
    fn otlp_json_format_spec_compliant() {
        let trace_id = [0xAB_u8; 16];
        let span_id = [0xCD_u8; 8];
        let spans = vec![SpanData {
            trace_id,
            span_id,
            parent_span_id: None,
            name: "session".to_owned(),
            start_time_unix_nanos: 1_000_000,
            end_time_unix_nanos: 2_000_000,
            attributes: vec![("service.name".to_owned(), "zeph".to_owned())],
            status: SpanStatus::Ok,
        }];

        let json = serialize_otlp_json(&spans, "zeph", &HashMap::new());
        let v: serde_json::Value = serde_json::from_str(&json).unwrap();

        let span = &v["resourceSpans"][0]["scopeSpans"][0]["spans"][0];
        // First 32 characters: "ab" repeated 16 times.
        assert_eq!(
            span["traceId"],
            "abababababababababababababababababab"[..32]
        );
        assert_eq!(span["spanId"], "cdcdcdcdcdcdcdcd");
        assert_eq!(span["name"], "session");
        // Timestamps are emitted as strings, not JSON numbers.
        assert!(span["startTimeUnixNano"].is_string());
        // SpanStatus::Ok maps to status code 1.
        assert_eq!(span["status"]["code"], 1_u64);
    }

    #[test]
    fn drop_flushes_trace() {
        let tmp = tempdir().unwrap();
        {
            // No explicit `finish`: leaving this scope drops the collector with an open
            // iteration.
            let mut c = make_collector(tmp.path());
            c.begin_iteration(0, "hello");
        }
        assert!(
            tmp.path().join("trace.json").exists(),
            "Drop must flush trace.json"
        );
    }

    #[test]
    fn trace_metadata_included_in_resource_attributes() {
        let tmp = tempdir().unwrap();
        let mut meta = HashMap::new();
        meta.insert("deployment.environment".to_owned(), "staging".to_owned());
        // A metadata `service.name` must not override the collector's service name.
        meta.insert("service.name".to_owned(), "should-be-skipped".to_owned());
        let mut c = TracingCollector::new(tmp.path(), "zeph-test", meta, false, None).unwrap();
        c.finish();

        let content = std::fs::read_to_string(tmp.path().join("trace.json")).unwrap();
        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
        let attrs = v["resourceSpans"][0]["resource"]["attributes"]
            .as_array()
            .unwrap();

        let svc = attrs
            .iter()
            .find(|a| a["key"] == "service.name")
            .expect("service.name must be in resource attributes");
        assert_eq!(svc["value"]["stringValue"], "zeph-test");

        let env_attr = attrs.iter().find(|a| a["key"] == "deployment.environment");
        assert!(
            env_attr.is_some(),
            "deployment.environment must be in resource attributes"
        );
        assert_eq!(env_attr.unwrap()["value"]["stringValue"], "staging");
    }

    #[test]
    fn finish_is_idempotent() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        c.finish();
        // The second call must be a harmless no-op.
        c.finish();
        assert!(tmp.path().join("trace.json").exists());
    }

    #[test]
    fn concurrent_iterations_tracked_independently() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        c.begin_iteration(0, "first");
        c.begin_iteration(1, "second");
        let id0 = c.current_iteration_span_id(0).unwrap();
        let id1 = c.current_iteration_span_id(1).unwrap();
        assert_ne!(
            id0, id1,
            "concurrent iterations must have distinct span IDs"
        );
        c.end_iteration(0, SpanStatus::Ok);
        c.end_iteration(1, SpanStatus::Ok);
        c.finish();

        let v: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(tmp.path().join("trace.json")).unwrap())
                .unwrap();
        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        // session root + two iterations.
        assert_eq!(spans.len(), 3);
    }

    #[test]
    fn trace_format_skips_legacy_numbered_files() {
        // Exercises crate::debug_dump: in DumpFormat::Trace mode, no file whose name starts
        // with a digit may be written to the session directory.
        use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};

        let tmp = tempdir().unwrap();
        let d = DebugDumper::new(tmp.path(), DumpFormat::Trace).unwrap();
        let session_dir = d.dir().to_owned();
        let id = d.dump_request(&RequestDebugDump {
            model_name: "test",
            messages: &[],
            tools: &[],
            provider_request: serde_json::json!({}),
            memcot_state: None,
        });
        d.dump_response(id, "resp");
        d.dump_tool_output("shell", "output");

        let files: Vec<_> = std::fs::read_dir(&session_dir)
            .unwrap()
            .filter_map(std::result::Result::ok)
            .filter(|e| {
                e.file_name()
                    .to_string_lossy()
                    .chars()
                    .next()
                    .is_some_and(|c| c.is_ascii_digit())
            })
            .collect();
        assert!(
            files.is_empty(),
            "no legacy numbered files in Trace format session dir"
        );

        assert!(session_dir.is_dir(), "session subdir must exist");
    }

    #[test]
    fn tool_call_span_emitted() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        c.begin_iteration(0, "test");
        let iter_id = c.current_iteration_span_id(0).unwrap();
        let guard = c.begin_tool_call("shell", iter_id);
        c.end_tool_call(
            guard,
            "shell",
            ToolAttributes {
                latency_ms: 50,
                is_error: false,
                error_kind: None,
            },
        );
        c.end_iteration(0, SpanStatus::Ok);
        c.finish();

        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        assert!(
            spans.iter().any(|s| s["name"] == "tool.shell"),
            "tool.shell span must be emitted"
        );
    }

    #[test]
    fn tool_call_error_span_emitted() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        c.begin_iteration(0, "test");
        let iter_id = c.current_iteration_span_id(0).unwrap();
        let guard = c.begin_tool_call("shell", iter_id);
        c.end_tool_call(
            guard,
            "shell",
            ToolAttributes {
                latency_ms: 10,
                is_error: true,
                error_kind: Some("permission denied".to_owned()),
            },
        );
        c.end_iteration(0, SpanStatus::Ok);
        c.finish();

        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        let tool_span = spans
            .iter()
            .find(|s| s["name"] == "tool.shell")
            .expect("tool.shell span missing");
        assert_eq!(
            tool_span["status"]["code"], 2_u64,
            "error span must have status code 2"
        );
    }

    #[test]
    fn begin_tool_call_at_timestamps_precede_end_time() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        c.begin_iteration(0, "test");
        let iter_id = c.current_iteration_span_id(0).unwrap();

        // Capture the start before sleeping so the guard's start time must be backdated.
        let started_at = std::time::Instant::now();
        std::thread::sleep(std::time::Duration::from_millis(2));
        let guard = c.begin_tool_call_at("shell", iter_id, &started_at);
        let span_start = guard.start_time_unix_nanos;
        c.end_tool_call(
            guard,
            "shell",
            ToolAttributes {
                latency_ms: 2,
                is_error: false,
                error_kind: None,
            },
        );
        c.end_iteration(0, SpanStatus::Ok);
        c.finish();

        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        let tool_span = spans
            .iter()
            .find(|s| s["name"] == "tool.shell")
            .expect("tool.shell span missing");
        // Timestamps are serialized as decimal strings; parse them back to compare.
        let recorded_start: u64 = tool_span["startTimeUnixNano"]
            .as_str()
            .unwrap()
            .parse()
            .unwrap();
        let recorded_end: u64 = tool_span["endTimeUnixNano"]
            .as_str()
            .unwrap()
            .parse()
            .unwrap();
        assert!(
            recorded_start < recorded_end,
            "start ({recorded_start}) must precede end ({recorded_end})"
        );
        assert_eq!(
            span_start, recorded_start,
            "guard start must match serialized start"
        );
    }

    #[test]
    fn session_to_iteration_parent_span_id() {
        let tmp = tempdir().unwrap();
        let mut c = make_collector(tmp.path());
        let session_id = c.session_span_id();
        c.begin_iteration(0, "test");
        c.end_iteration(0, SpanStatus::Ok);
        c.finish();

        let content = std::fs::read_to_string(c.trace_json_path()).unwrap();
        let v: serde_json::Value = serde_json::from_str(&content).unwrap();
        let spans = v["resourceSpans"][0]["scopeSpans"][0]["spans"]
            .as_array()
            .unwrap();
        let iter_span = spans
            .iter()
            .find(|s| s["name"] == "iteration.0")
            .expect("iteration.0 span missing");
        assert_eq!(
            iter_span["parentSpanId"],
            serde_json::json!(hex8(session_id)),
            "iteration span parent must be session span"
        );
    }

    #[test]
    fn json_and_raw_formats_still_write_files() {
        // Exercises crate::debug_dump: the Json and Raw formats must keep writing the
        // numbered `0000-request.json` file into a session subdirectory.
        use crate::debug_dump::{DebugDumper, DumpFormat, RequestDebugDump};

        let tmp = tempdir().unwrap();
        for fmt in [DumpFormat::Json, DumpFormat::Raw] {
            let d = DebugDumper::new(tmp.path(), fmt).unwrap();
            let id = d.dump_request(&RequestDebugDump {
                model_name: "test-model",
                messages: &[],
                tools: &[],
                provider_request: serde_json::json!({"model": "test-model", "max_tokens": 100}),
                memcot_state: None,
            });
            d.dump_response(id, "hello");
            // NOTE(review): takes the first subdirectory found; with two formats sharing
            // `tmp`, this assumes each iteration's directory is found (or both contain the
            // file) — confirm if the test becomes flaky.
            let session_dir = std::fs::read_dir(tmp.path())
                .unwrap()
                .filter_map(std::result::Result::ok)
                .find(|e| e.file_type().is_ok_and(|t| t.is_dir()))
                .unwrap()
                .path();
            assert!(
                session_dir.join("0000-request.json").exists(),
                "request file must exist for format {fmt:?}"
            );
        }
    }
}