1use std::cell::RefCell;
11use std::collections::BTreeMap;
12use std::time::{Instant, SystemTime, UNIX_EPOCH};
13
14use crate::value::VmValue;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum SpanKind {
19 Pipeline,
20 FnCall,
21 LlmCall,
22 ToolCall,
23 Import,
24 Parallel,
25 Spawn,
26 Step,
28 VmSetup,
30 Suspension,
32 Resume,
34 Drain,
36 DrainDecision,
38 PoolSubmit,
40 PoolDequeue,
44 ChannelEmit,
47 ChannelMatch,
52 UserTiming,
56}
57
58impl SpanKind {
59 pub fn as_str(self) -> &'static str {
60 match self {
61 Self::Pipeline => "pipeline",
62 Self::FnCall => "fn_call",
63 Self::LlmCall => "llm_call",
64 Self::ToolCall => "tool_call",
65 Self::Import => "import",
66 Self::Parallel => "parallel",
67 Self::Spawn => "spawn",
68 Self::Step => "step",
69 Self::VmSetup => "vm_setup",
70 Self::Suspension => "suspension",
71 Self::Resume => "resume",
72 Self::Drain => "drain",
73 Self::DrainDecision => "drain_decision",
74 Self::PoolSubmit => "pool_submit",
75 Self::PoolDequeue => "pool_dequeue",
76 Self::ChannelEmit => "channel_emit",
77 Self::ChannelMatch => "channel_match",
78 Self::UserTiming => "user_timing",
79 }
80 }
81}
82
83#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
85#[serde(default)]
86pub struct SpanLink {
87 pub trace_id: String,
88 pub span_id: String,
89 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
90 pub attributes: BTreeMap<String, String>,
91}
92
93impl SpanLink {
94 pub fn new(trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
95 Self {
96 trace_id: trace_id.into(),
97 span_id: span_id.into(),
98 attributes: BTreeMap::new(),
99 }
100 }
101
102 pub fn with_attributes(mut self, attributes: BTreeMap<String, String>) -> Self {
103 self.attributes = attributes;
104 self
105 }
106}
107
108#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
112#[serde(default)]
113pub struct SpanEvent {
114 pub name: String,
115 pub time_unix_ms: u64,
117 pub offset_ms: u64,
119 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
120 pub attributes: BTreeMap<String, serde_json::Value>,
121}
122
123#[derive(Debug, Clone)]
125pub struct Span {
126 pub trace_id: String,
127 pub span_id: u64,
128 pub parent_id: Option<u64>,
129 pub kind: SpanKind,
130 pub name: String,
131 pub start_ms: u64,
133 pub start_unix_ms: u64,
137 pub duration_ms: u64,
138 pub metadata: BTreeMap<String, serde_json::Value>,
139 pub links: Vec<SpanLink>,
140 pub events: Vec<SpanEvent>,
141}
142
143struct OpenSpan {
145 trace_id: String,
146 span_id: u64,
147 parent_id: Option<u64>,
148 kind: SpanKind,
149 name: String,
150 started_at: Instant,
151 started_at_mock_mono_ms: Option<u64>,
155 start_unix_ms: u64,
156 metadata: BTreeMap<String, serde_json::Value>,
157 links: Vec<SpanLink>,
158 events: Vec<SpanEvent>,
159}
160
161pub struct SpanCollector {
164 trace_id: String,
165 next_id: u64,
166 active_stack: Vec<u64>,
168 open: BTreeMap<u64, OpenSpan>,
170 completed: Vec<Span>,
172 epoch: Instant,
174}
175
176impl Default for SpanCollector {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182impl SpanCollector {
183 pub fn new() -> Self {
184 Self {
185 next_id: 1,
186 trace_id: format!("trace_{}", uuid::Uuid::now_v7()),
187 active_stack: Vec::new(),
188 open: BTreeMap::new(),
189 completed: Vec::new(),
190 epoch: Instant::now(),
191 }
192 }
193
194 pub fn start(&mut self, kind: SpanKind, name: String) -> u64 {
196 let parent_id = self.active_stack.last().copied();
197 self.start_with_parent(kind, name, Vec::new(), parent_id)
198 }
199
200 pub fn start_with_links(&mut self, kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
202 let parent_id = self.active_stack.last().copied();
203 self.start_with_parent(kind, name, links, parent_id)
204 }
205
206 pub fn start_detached_with_links(
208 &mut self,
209 kind: SpanKind,
210 name: String,
211 links: Vec<SpanLink>,
212 ) -> u64 {
213 self.start_with_parent(kind, name, links, None)
214 }
215
216 fn start_with_parent(
217 &mut self,
218 kind: SpanKind,
219 name: String,
220 links: Vec<SpanLink>,
221 parent_id: Option<u64>,
222 ) -> u64 {
223 let id = self.next_id;
224 self.next_id += 1;
225 let now = Instant::now();
226 let started_at_mock_mono_ms = mock_monotonic_ms();
227 let start_unix_ms = wall_clock_ms();
228
229 let mut event_metadata = BTreeMap::new();
230 if !links.is_empty() {
231 event_metadata.insert("links".to_string(), serde_json::json!(links));
232 }
233 crate::events::emit_span_start(id, parent_id, &name, kind.as_str(), event_metadata);
234
235 self.open.insert(
236 id,
237 OpenSpan {
238 trace_id: self.trace_id.clone(),
239 span_id: id,
240 parent_id,
241 kind,
242 name,
243 started_at: now,
244 started_at_mock_mono_ms,
245 start_unix_ms,
246 metadata: BTreeMap::new(),
247 links,
248 events: Vec::new(),
249 },
250 );
251 self.active_stack.push(id);
252 id
253 }
254
255 pub fn set_metadata(&mut self, span_id: u64, key: &str, value: serde_json::Value) {
257 if let Some(span) = self.open.get_mut(&span_id) {
258 span.metadata.insert(key.to_string(), value);
259 }
260 }
261
262 pub fn attach_metadata_if_absent(&mut self, span_id: u64, key: &str, value: serde_json::Value) {
264 if let Some(span) = self.open.get_mut(&span_id) {
265 span.metadata.entry(key.to_string()).or_insert(value);
266 return;
267 }
268 if let Some(span) = self
269 .completed
270 .iter_mut()
271 .rev()
272 .find(|span| span.span_id == span_id)
273 {
274 span.metadata.entry(key.to_string()).or_insert(value);
275 }
276 }
277
278 pub fn record_event(
282 &mut self,
283 span_id: u64,
284 name: String,
285 attributes: BTreeMap<String, serde_json::Value>,
286 ) -> bool {
287 let Some(span) = self.open.get_mut(&span_id) else {
288 return false;
289 };
290 let offset_ms = match (span.started_at_mock_mono_ms, mock_monotonic_ms()) {
291 (Some(start), Some(now)) => now.saturating_sub(start),
292 _ => span.started_at.elapsed().as_millis() as u64,
293 };
294 span.events.push(SpanEvent {
295 name,
296 time_unix_ms: wall_clock_ms(),
297 offset_ms,
298 attributes,
299 });
300 true
301 }
302
303 pub fn open_start_unix_ms(&self, span_id: u64) -> Option<u64> {
305 self.open.get(&span_id).map(|span| span.start_unix_ms)
306 }
307
308 pub fn end(&mut self, span_id: u64) -> Option<Span> {
312 let span = self.open.remove(&span_id)?;
313 let start_ms = span.started_at.duration_since(self.epoch).as_millis() as u64;
314 let duration_ms = match (span.started_at_mock_mono_ms, mock_monotonic_ms()) {
315 (Some(start), Some(end)) => end.saturating_sub(start),
316 _ => span.started_at.elapsed().as_millis() as u64,
317 };
318
319 let mut end_meta = span.metadata.clone();
320 end_meta.insert(
321 "duration_ms".to_string(),
322 serde_json::Value::Number(serde_json::Number::from(duration_ms)),
323 );
324 crate::events::emit_span_end(span_id, end_meta);
325
326 let completed = Span {
327 trace_id: span.trace_id,
328 span_id: span.span_id,
329 parent_id: span.parent_id,
330 kind: span.kind,
331 name: span.name,
332 start_ms,
333 start_unix_ms: span.start_unix_ms,
334 duration_ms,
335 metadata: span.metadata,
336 links: span.links,
337 events: span.events,
338 };
339 self.completed.push(completed.clone());
340
341 if let Some(pos) = self.active_stack.iter().rposition(|&id| id == span_id) {
342 self.active_stack.remove(pos);
343 }
344 Some(completed)
345 }
346
347 pub fn current_span_id(&self) -> Option<u64> {
349 self.active_stack.last().copied()
350 }
351
352 pub fn span_link(&self, span_id: u64) -> Option<SpanLink> {
354 self.open
355 .get(&span_id)
356 .map(|span| SpanLink::new(span.trace_id.clone(), span.span_id.to_string()))
357 }
358
359 pub fn current_span_link(&self) -> Option<SpanLink> {
361 self.current_span_id()
362 .and_then(|span_id| self.span_link(span_id))
363 }
364
365 pub fn take_spans(&mut self) -> Vec<Span> {
367 std::mem::take(&mut self.completed)
368 }
369
370 pub fn spans(&self) -> &[Span] {
372 &self.completed
373 }
374
375 pub fn reset(&mut self) {
377 self.active_stack.clear();
378 self.open.clear();
379 self.completed.clear();
380 self.next_id = 1;
381 self.trace_id = format!("trace_{}", uuid::Uuid::now_v7());
382 self.epoch = Instant::now();
383 }
384}
385
386thread_local! {
387 static COLLECTOR: RefCell<SpanCollector> = RefCell::new(SpanCollector::new());
388 static TRACING_ENABLED: RefCell<bool> = const { RefCell::new(false) };
389}
390
391fn wall_clock_ms() -> u64 {
396 if let Some(mock) = crate::clock_mock::active_mock_clock() {
397 return mock.now_wall_ms() as u64;
398 }
399 SystemTime::now()
400 .duration_since(UNIX_EPOCH)
401 .map(|d| d.as_millis() as u64)
402 .unwrap_or(0)
403}
404
405fn mock_monotonic_ms() -> Option<u64> {
412 crate::clock_mock::active_mock_clock().map(|mock| mock.now_monotonic_ms() as u64)
413}
414
415pub fn set_tracing_enabled(enabled: bool) {
417 TRACING_ENABLED.with(|e| *e.borrow_mut() = enabled);
418 if enabled {
419 COLLECTOR.with(|c| c.borrow_mut().reset());
420 }
421}
422
423pub fn is_tracing_enabled() -> bool {
425 TRACING_ENABLED.with(|e| *e.borrow())
426}
427
428pub fn span_start(kind: SpanKind, name: String) -> u64 {
430 if !is_tracing_enabled() {
431 return 0;
432 }
433 COLLECTOR.with(|c| c.borrow_mut().start(kind, name))
434}
435
436pub fn span_start_with_links(kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
438 if !is_tracing_enabled() {
439 return 0;
440 }
441 COLLECTOR.with(|c| c.borrow_mut().start_with_links(kind, name, links))
442}
443
444pub fn span_start_detached_with_links(kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
446 if !is_tracing_enabled() {
447 return 0;
448 }
449 COLLECTOR.with(|c| c.borrow_mut().start_detached_with_links(kind, name, links))
450}
451
452pub fn span_set_metadata(span_id: u64, key: &str, value: serde_json::Value) {
454 if span_id == 0 {
455 return;
456 }
457 COLLECTOR.with(|c| c.borrow_mut().set_metadata(span_id, key, value));
458}
459
460pub fn span_end(span_id: u64) -> Option<Span> {
463 if span_id == 0 {
464 return None;
465 }
466 COLLECTOR.with(|c| c.borrow_mut().end(span_id))
467}
468
469pub fn span_start_user_timing(
474 name: String,
475 attrs: BTreeMap<String, serde_json::Value>,
476) -> (u64, String, Option<u64>, u64) {
477 COLLECTOR.with(|c| {
478 let mut c = c.borrow_mut();
479 let id = c.start(SpanKind::UserTiming, name);
480 for (key, value) in attrs {
481 c.set_metadata(id, &key, value);
482 }
483 let parent = c.open.get(&id).and_then(|span| span.parent_id);
484 let trace_id = c
485 .open
486 .get(&id)
487 .map(|span| span.trace_id.clone())
488 .unwrap_or_default();
489 let start_unix_ms = c.open_start_unix_ms(id).unwrap_or(0);
490 (id, trace_id, parent, start_unix_ms)
491 })
492}
493
494pub fn span_record_event(
498 span_id: u64,
499 name: String,
500 attributes: BTreeMap<String, serde_json::Value>,
501) -> bool {
502 if span_id == 0 {
503 return false;
504 }
505 COLLECTOR.with(|c| c.borrow_mut().record_event(span_id, name, attributes))
506}
507
508pub fn span_attach_metadata(span_id: u64, key: &str, value: serde_json::Value) {
511 if span_id == 0 {
512 return;
513 }
514 COLLECTOR.with(|c| c.borrow_mut().set_metadata(span_id, key, value));
515}
516
517pub fn span_attach_metadata_if_absent(span_id: u64, key: &str, value: serde_json::Value) {
519 if span_id == 0 {
520 return;
521 }
522 COLLECTOR.with(|c| {
523 c.borrow_mut()
524 .attach_metadata_if_absent(span_id, key, value);
525 });
526}
527
528pub fn current_span_id() -> Option<u64> {
530 if !is_tracing_enabled() {
531 return None;
532 }
533 COLLECTOR.with(|c| c.borrow().current_span_id())
534}
535
536pub fn span_link(span_id: u64) -> Option<SpanLink> {
538 if span_id == 0 || !is_tracing_enabled() {
539 return None;
540 }
541 COLLECTOR.with(|c| c.borrow().span_link(span_id))
542}
543
544pub fn current_span_link() -> Option<SpanLink> {
546 if !is_tracing_enabled() {
547 return None;
548 }
549 COLLECTOR.with(|c| c.borrow().current_span_link())
550}
551
552pub fn take_spans() -> Vec<Span> {
554 COLLECTOR.with(|c| c.borrow_mut().take_spans())
555}
556
557pub fn peek_spans() -> Vec<Span> {
559 COLLECTOR.with(|c| c.borrow().spans().to_vec())
560}
561
562pub fn reset_tracing() {
564 COLLECTOR.with(|c| c.borrow_mut().reset());
565}
566
567pub fn span_to_vm_value(span: &Span) -> VmValue {
569 let mut d: BTreeMap<String, VmValue> = BTreeMap::new();
570 d.insert(
571 "trace_id".into(),
572 VmValue::String(arcstr::ArcStr::from(span.trace_id.as_str())),
573 );
574 d.insert("span_id".into(), VmValue::Int(span.span_id as i64));
575 d.insert(
576 "parent_id".into(),
577 span.parent_id
578 .map(|id| VmValue::Int(id as i64))
579 .unwrap_or(VmValue::Nil),
580 );
581 d.insert(
582 "kind".into(),
583 VmValue::String(arcstr::ArcStr::from(span.kind.as_str())),
584 );
585 d.insert(
586 "name".into(),
587 VmValue::String(arcstr::ArcStr::from(span.name.as_str())),
588 );
589 d.insert("start_ms".into(), VmValue::Int(span.start_ms as i64));
590 d.insert(
591 "start_unix_ms".into(),
592 VmValue::Int(span.start_unix_ms as i64),
593 );
594 d.insert("duration_ms".into(), VmValue::Int(span.duration_ms as i64));
595
596 if !span.metadata.is_empty() {
597 let meta: crate::value::DictMap = span
598 .metadata
599 .iter()
600 .map(|(k, v)| {
601 (
602 crate::value::intern_key(k),
603 crate::stdlib::json_to_vm_value(v),
604 )
605 })
606 .collect();
607 d.insert("metadata".into(), VmValue::dict(meta));
608 }
609 if !span.links.is_empty() {
610 d.insert(
611 "links".into(),
612 crate::stdlib::json_to_vm_value(&serde_json::json!(span.links)),
613 );
614 }
615 if !span.events.is_empty() {
616 d.insert(
617 "events".into(),
618 crate::stdlib::json_to_vm_value(&serde_json::json!(span.events)),
619 );
620 }
621
622 VmValue::dict(d)
623}
624
625pub fn format_summary() -> String {
627 let spans = peek_spans();
628 if spans.is_empty() {
629 return "No spans recorded.".into();
630 }
631
632 let mut lines = Vec::new();
633 let total_ms: u64 = spans
634 .iter()
635 .filter(|s| s.parent_id.is_none())
636 .map(|s| s.duration_ms)
637 .sum();
638
639 lines.push(format!("Trace: {} spans, {total_ms}ms total", spans.len()));
640 lines.push(String::new());
641
642 fn print_tree(spans: &[Span], parent_id: Option<u64>, depth: usize, lines: &mut Vec<String>) {
643 let children: Vec<&Span> = spans.iter().filter(|s| s.parent_id == parent_id).collect();
644 for span in children {
645 let indent = " ".repeat(depth);
646 let meta_str = if span.metadata.is_empty() {
647 String::new()
648 } else {
649 let parts: Vec<String> = span
650 .metadata
651 .iter()
652 .map(|(k, v)| format!("{k}={v}"))
653 .collect();
654 format!(" ({})", parts.join(", "))
655 };
656 lines.push(format!(
657 "{indent}{} {} {}ms{meta_str}",
658 span.kind.as_str(),
659 span.name,
660 span.duration_ms,
661 ));
662 print_tree(spans, Some(span.span_id), depth + 1, lines);
663 }
664 }
665
666 print_tree(&spans, None, 0, &mut lines);
667 lines.join("\n")
668}
669
670#[cfg(test)]
671mod tests {
672 use super::*;
673
674 #[test]
675 fn test_span_collector_basic() {
676 let mut c = SpanCollector::new();
677 let id = c.start(SpanKind::Pipeline, "main".into());
678 assert_eq!(id, 1);
679 assert_eq!(c.current_span_id(), Some(1));
680 assert!(c.span_link(id).is_some());
681 c.end(id);
682 assert_eq!(c.current_span_id(), None);
683 assert_eq!(c.spans().len(), 1);
684 assert_eq!(c.spans()[0].name, "main");
685 assert_eq!(c.spans()[0].parent_id, None);
686 }
687
688 #[test]
689 fn test_span_parent_child() {
690 let mut c = SpanCollector::new();
691 let parent = c.start(SpanKind::Pipeline, "main".into());
692 let child = c.start(SpanKind::FnCall, "helper".into());
693 c.end(child);
694 c.end(parent);
695 assert_eq!(c.spans().len(), 2);
696 assert_eq!(c.spans()[0].parent_id, Some(parent));
697 assert_eq!(c.spans()[1].parent_id, None);
698 }
699
700 #[test]
701 fn test_span_metadata() {
702 let mut c = SpanCollector::new();
703 let id = c.start(SpanKind::LlmCall, "gpt-4".into());
704 c.set_metadata(id, "tokens", serde_json::json!(100));
705 c.end(id);
706 assert_eq!(c.spans()[0].metadata["tokens"], serde_json::json!(100));
707 }
708
709 #[test]
710 fn test_completed_span_metadata_can_be_attached_late() {
711 let mut c = SpanCollector::new();
712 let id = c.start(SpanKind::LlmCall, "gpt-4".into());
713 c.end(id);
714
715 c.attach_metadata_if_absent(id, "first_token_ms", serde_json::json!(125));
716 c.attach_metadata_if_absent(id, "first_token_ms", serde_json::json!(250));
717
718 assert_eq!(
719 c.spans()[0].metadata["first_token_ms"],
720 serde_json::json!(125)
721 );
722 }
723
724 #[test]
725 fn test_span_links_are_preserved() {
726 let mut c = SpanCollector::new();
727 let parent = c.start(SpanKind::Suspension, "suspend worker".into());
728 let link = c.span_link(parent).expect("link for open span");
729 c.end(parent);
730
731 let child = c.start_with_links(SpanKind::Resume, "resume worker".into(), vec![link]);
732 c.end(child);
733
734 assert_eq!(c.spans().len(), 2);
735 assert_eq!(c.spans()[1].parent_id, None);
736 assert_eq!(c.spans()[1].links.len(), 1);
737 assert_eq!(c.spans()[1].links[0].span_id, parent.to_string());
738 }
739
740 #[test]
741 fn test_detached_span_links_do_not_inherit_active_parent() {
742 let mut c = SpanCollector::new();
743 let pipeline = c.start(SpanKind::Pipeline, "pipeline".into());
744 let link = c.span_link(pipeline).expect("pipeline link");
745 let drain = c.start_detached_with_links(SpanKind::Drain, "drain".into(), vec![link]);
746 c.end(drain);
747 c.end(pipeline);
748
749 let drain = c
750 .spans()
751 .iter()
752 .find(|span| span.kind == SpanKind::Drain)
753 .expect("drain span");
754 assert_eq!(drain.parent_id, None);
755 assert_eq!(drain.links.len(), 1);
756 assert_eq!(drain.links[0].span_id, pipeline.to_string());
757 }
758
759 #[test]
760 fn test_noop_when_disabled() {
761 set_tracing_enabled(false);
762 let id = span_start(SpanKind::Pipeline, "test".into());
763 assert_eq!(id, 0);
764 assert!(span_end(id).is_none());
765 }
766
767 #[test]
768 fn test_user_timing_records_when_tracing_disabled() {
769 set_tracing_enabled(false);
774 reset_tracing();
775 let mut attrs = BTreeMap::new();
776 attrs.insert("phase".into(), serde_json::json!("warmup"));
777 let (id, trace_id, parent, start_unix_ms) =
778 span_start_user_timing("script.work".into(), attrs);
779 assert!(id != 0);
780 assert!(!trace_id.is_empty());
781 assert_eq!(parent, None);
782 assert!(start_unix_ms > 0);
783
784 assert!(span_record_event(id, "checkpoint".into(), BTreeMap::new()));
785
786 let closed = span_end(id).expect("user timing always records");
787 assert_eq!(closed.kind, SpanKind::UserTiming);
788 assert_eq!(closed.events.len(), 1);
789 assert_eq!(closed.events[0].name, "checkpoint");
790 assert_eq!(closed.metadata["phase"], serde_json::json!("warmup"));
791
792 let snapshot = peek_spans();
796 assert!(snapshot
797 .iter()
798 .any(|span| span.kind == SpanKind::UserTiming && span.name == "script.work"));
799 }
800
801 #[test]
802 fn test_span_event_offset_is_monotonic() {
803 let clock = crate::clock_mock::MockClock::at_wall_ms(1_000_000_000_000);
809 let _guard = crate::clock_mock::install_override(clock.clone());
810 let mut c = SpanCollector::new();
811 let id = c.start(SpanKind::UserTiming, "outer".into());
812 assert!(c.record_event(id, "before".into(), BTreeMap::new()));
813 clock.advance_std_sync(std::time::Duration::from_millis(10));
814 assert!(c.record_event(id, "after".into(), BTreeMap::new()));
815 let closed = c.end(id).expect("open span");
816 assert_eq!(closed.events.len(), 2);
817 assert!(
818 closed.events[1].offset_ms > closed.events[0].offset_ms,
819 "second event should have a strictly greater offset after a 10ms advance; \
820 before={} after={}",
821 closed.events[0].offset_ms,
822 closed.events[1].offset_ms
823 );
824 }
825}