1use std::cell::RefCell;
11use std::collections::BTreeMap;
12use std::time::{Instant, SystemTime, UNIX_EPOCH};
13
14use crate::value::VmValue;
15
/// Category tag attached to every span recorded by the collector.
///
/// Each variant has a fixed snake_case label, available through
/// [`SpanKind::as_str`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SpanKind {
    Pipeline,
    FnCall,
    LlmCall,
    ToolCall,
    Import,
    Parallel,
    Spawn,
    Step,
    VmSetup,
    Suspension,
    Resume,
    Drain,
    DrainDecision,
    PoolSubmit,
    PoolDequeue,
    ChannelEmit,
    ChannelMatch,
    UserTiming,
}

impl SpanKind {
    /// Returns the snake_case label of this kind, e.g. `SpanKind::FnCall`
    /// yields `"fn_call"`.
    ///
    /// The match lists every variant explicitly so that adding a variant
    /// without a label fails to compile.
    pub fn as_str(self) -> &'static str {
        match self {
            SpanKind::Pipeline => "pipeline",
            SpanKind::FnCall => "fn_call",
            SpanKind::LlmCall => "llm_call",
            SpanKind::ToolCall => "tool_call",
            SpanKind::Import => "import",
            SpanKind::Parallel => "parallel",
            SpanKind::Spawn => "spawn",
            SpanKind::Step => "step",
            SpanKind::VmSetup => "vm_setup",
            SpanKind::Suspension => "suspension",
            SpanKind::Resume => "resume",
            SpanKind::Drain => "drain",
            SpanKind::DrainDecision => "drain_decision",
            SpanKind::PoolSubmit => "pool_submit",
            SpanKind::PoolDequeue => "pool_dequeue",
            SpanKind::ChannelEmit => "channel_emit",
            SpanKind::ChannelMatch => "channel_match",
            SpanKind::UserTiming => "user_timing",
        }
    }
}
82
83#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
85#[serde(default)]
86pub struct SpanLink {
87 pub trace_id: String,
88 pub span_id: String,
89 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
90 pub attributes: BTreeMap<String, String>,
91}
92
93impl SpanLink {
94 pub fn new(trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
95 Self {
96 trace_id: trace_id.into(),
97 span_id: span_id.into(),
98 attributes: BTreeMap::new(),
99 }
100 }
101
102 pub fn with_attributes(mut self, attributes: BTreeMap<String, String>) -> Self {
103 self.attributes = attributes;
104 self
105 }
106}
107
108#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
112#[serde(default)]
113pub struct SpanEvent {
114 pub name: String,
115 pub time_unix_ms: u64,
117 pub offset_ms: u64,
119 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
120 pub attributes: BTreeMap<String, serde_json::Value>,
121}
122
123#[derive(Debug, Clone)]
125pub struct Span {
126 pub trace_id: String,
127 pub span_id: u64,
128 pub parent_id: Option<u64>,
129 pub kind: SpanKind,
130 pub name: String,
131 pub start_ms: u64,
133 pub start_unix_ms: u64,
137 pub duration_ms: u64,
138 pub metadata: BTreeMap<String, serde_json::Value>,
139 pub links: Vec<SpanLink>,
140 pub events: Vec<SpanEvent>,
141}
142
143struct OpenSpan {
145 trace_id: String,
146 span_id: u64,
147 parent_id: Option<u64>,
148 kind: SpanKind,
149 name: String,
150 started_at: Instant,
151 started_at_mock_mono_ms: Option<u64>,
155 start_unix_ms: u64,
156 metadata: BTreeMap<String, serde_json::Value>,
157 links: Vec<SpanLink>,
158 events: Vec<SpanEvent>,
159}
160
161pub struct SpanCollector {
164 trace_id: String,
165 next_id: u64,
166 active_stack: Vec<u64>,
168 open: BTreeMap<u64, OpenSpan>,
170 completed: Vec<Span>,
172 epoch: Instant,
174}
175
176impl Default for SpanCollector {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182impl SpanCollector {
183 pub fn new() -> Self {
184 Self {
185 next_id: 1,
186 trace_id: format!("trace_{}", uuid::Uuid::now_v7()),
187 active_stack: Vec::new(),
188 open: BTreeMap::new(),
189 completed: Vec::new(),
190 epoch: Instant::now(),
191 }
192 }
193
194 pub fn start(&mut self, kind: SpanKind, name: String) -> u64 {
196 let parent_id = self.active_stack.last().copied();
197 self.start_with_parent(kind, name, Vec::new(), parent_id)
198 }
199
200 pub fn start_with_links(&mut self, kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
202 let parent_id = self.active_stack.last().copied();
203 self.start_with_parent(kind, name, links, parent_id)
204 }
205
206 pub fn start_detached_with_links(
208 &mut self,
209 kind: SpanKind,
210 name: String,
211 links: Vec<SpanLink>,
212 ) -> u64 {
213 self.start_with_parent(kind, name, links, None)
214 }
215
216 fn start_with_parent(
217 &mut self,
218 kind: SpanKind,
219 name: String,
220 links: Vec<SpanLink>,
221 parent_id: Option<u64>,
222 ) -> u64 {
223 let id = self.next_id;
224 self.next_id += 1;
225 let now = Instant::now();
226 let started_at_mock_mono_ms = mock_monotonic_ms();
227 let start_unix_ms = wall_clock_ms();
228
229 let mut event_metadata = BTreeMap::new();
230 if !links.is_empty() {
231 event_metadata.insert("links".to_string(), serde_json::json!(links));
232 }
233 crate::events::emit_span_start(id, parent_id, &name, kind.as_str(), event_metadata);
234
235 self.open.insert(
236 id,
237 OpenSpan {
238 trace_id: self.trace_id.clone(),
239 span_id: id,
240 parent_id,
241 kind,
242 name,
243 started_at: now,
244 started_at_mock_mono_ms,
245 start_unix_ms,
246 metadata: BTreeMap::new(),
247 links,
248 events: Vec::new(),
249 },
250 );
251 self.active_stack.push(id);
252 id
253 }
254
255 pub fn set_metadata(&mut self, span_id: u64, key: &str, value: serde_json::Value) {
257 if let Some(span) = self.open.get_mut(&span_id) {
258 span.metadata.insert(key.to_string(), value);
259 }
260 }
261
262 pub fn attach_metadata_if_absent(&mut self, span_id: u64, key: &str, value: serde_json::Value) {
264 if let Some(span) = self.open.get_mut(&span_id) {
265 span.metadata.entry(key.to_string()).or_insert(value);
266 return;
267 }
268 if let Some(span) = self
269 .completed
270 .iter_mut()
271 .rev()
272 .find(|span| span.span_id == span_id)
273 {
274 span.metadata.entry(key.to_string()).or_insert(value);
275 }
276 }
277
278 pub fn record_event(
282 &mut self,
283 span_id: u64,
284 name: String,
285 attributes: BTreeMap<String, serde_json::Value>,
286 ) -> bool {
287 let Some(span) = self.open.get_mut(&span_id) else {
288 return false;
289 };
290 let offset_ms = match (span.started_at_mock_mono_ms, mock_monotonic_ms()) {
291 (Some(start), Some(now)) => now.saturating_sub(start),
292 _ => span.started_at.elapsed().as_millis() as u64,
293 };
294 span.events.push(SpanEvent {
295 name,
296 time_unix_ms: wall_clock_ms(),
297 offset_ms,
298 attributes,
299 });
300 true
301 }
302
303 pub fn open_start_unix_ms(&self, span_id: u64) -> Option<u64> {
305 self.open.get(&span_id).map(|span| span.start_unix_ms)
306 }
307
308 pub fn end(&mut self, span_id: u64) -> Option<Span> {
312 let span = self.open.remove(&span_id)?;
313 let start_ms = span.started_at.duration_since(self.epoch).as_millis() as u64;
314 let duration_ms = match (span.started_at_mock_mono_ms, mock_monotonic_ms()) {
315 (Some(start), Some(end)) => end.saturating_sub(start),
316 _ => span.started_at.elapsed().as_millis() as u64,
317 };
318
319 let mut end_meta = span.metadata.clone();
320 end_meta.insert(
321 "duration_ms".to_string(),
322 serde_json::Value::Number(serde_json::Number::from(duration_ms)),
323 );
324 crate::events::emit_span_end(span_id, end_meta);
325
326 let completed = Span {
327 trace_id: span.trace_id,
328 span_id: span.span_id,
329 parent_id: span.parent_id,
330 kind: span.kind,
331 name: span.name,
332 start_ms,
333 start_unix_ms: span.start_unix_ms,
334 duration_ms,
335 metadata: span.metadata,
336 links: span.links,
337 events: span.events,
338 };
339 self.completed.push(completed.clone());
340
341 if let Some(pos) = self.active_stack.iter().rposition(|&id| id == span_id) {
342 self.active_stack.remove(pos);
343 }
344 Some(completed)
345 }
346
347 pub fn current_span_id(&self) -> Option<u64> {
349 self.active_stack.last().copied()
350 }
351
352 pub fn span_link(&self, span_id: u64) -> Option<SpanLink> {
354 self.open
355 .get(&span_id)
356 .map(|span| SpanLink::new(span.trace_id.clone(), span.span_id.to_string()))
357 }
358
359 pub fn current_span_link(&self) -> Option<SpanLink> {
361 self.current_span_id()
362 .and_then(|span_id| self.span_link(span_id))
363 }
364
365 pub fn take_spans(&mut self) -> Vec<Span> {
367 std::mem::take(&mut self.completed)
368 }
369
370 pub fn spans(&self) -> &[Span] {
372 &self.completed
373 }
374
375 pub fn reset(&mut self) {
377 self.active_stack.clear();
378 self.open.clear();
379 self.completed.clear();
380 self.next_id = 1;
381 self.trace_id = format!("trace_{}", uuid::Uuid::now_v7());
382 self.epoch = Instant::now();
383 }
384}
385
386thread_local! {
387 static COLLECTOR: RefCell<SpanCollector> = RefCell::new(SpanCollector::new());
388 static TRACING_ENABLED: RefCell<bool> = const { RefCell::new(false) };
389}
390
391fn wall_clock_ms() -> u64 {
396 if let Some(mock) = crate::clock_mock::active_mock_clock() {
397 return mock.now_wall_ms() as u64;
398 }
399 SystemTime::now()
400 .duration_since(UNIX_EPOCH)
401 .map(|d| d.as_millis() as u64)
402 .unwrap_or(0)
403}
404
405fn mock_monotonic_ms() -> Option<u64> {
412 crate::clock_mock::active_mock_clock().map(|mock| mock.now_monotonic_ms() as u64)
413}
414
415pub fn set_tracing_enabled(enabled: bool) {
417 TRACING_ENABLED.with(|e| *e.borrow_mut() = enabled);
418 if enabled {
419 COLLECTOR.with(|c| c.borrow_mut().reset());
420 }
421}
422
423pub fn is_tracing_enabled() -> bool {
425 TRACING_ENABLED.with(|e| *e.borrow())
426}
427
428pub fn span_start(kind: SpanKind, name: String) -> u64 {
430 if !is_tracing_enabled() {
431 return 0;
432 }
433 COLLECTOR.with(|c| c.borrow_mut().start(kind, name))
434}
435
436pub fn span_start_with_links(kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
438 if !is_tracing_enabled() {
439 return 0;
440 }
441 COLLECTOR.with(|c| c.borrow_mut().start_with_links(kind, name, links))
442}
443
444pub fn span_start_detached_with_links(kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
446 if !is_tracing_enabled() {
447 return 0;
448 }
449 COLLECTOR.with(|c| c.borrow_mut().start_detached_with_links(kind, name, links))
450}
451
452pub fn span_set_metadata(span_id: u64, key: &str, value: serde_json::Value) {
454 if span_id == 0 {
455 return;
456 }
457 COLLECTOR.with(|c| c.borrow_mut().set_metadata(span_id, key, value));
458}
459
460pub fn span_end(span_id: u64) -> Option<Span> {
463 if span_id == 0 {
464 return None;
465 }
466 COLLECTOR.with(|c| c.borrow_mut().end(span_id))
467}
468
469pub fn span_start_user_timing(
474 name: String,
475 attrs: BTreeMap<String, serde_json::Value>,
476) -> (u64, String, Option<u64>, u64) {
477 COLLECTOR.with(|c| {
478 let mut c = c.borrow_mut();
479 let id = c.start(SpanKind::UserTiming, name);
480 for (key, value) in attrs {
481 c.set_metadata(id, &key, value);
482 }
483 let parent = c.open.get(&id).and_then(|span| span.parent_id);
484 let trace_id = c
485 .open
486 .get(&id)
487 .map(|span| span.trace_id.clone())
488 .unwrap_or_default();
489 let start_unix_ms = c.open_start_unix_ms(id).unwrap_or(0);
490 (id, trace_id, parent, start_unix_ms)
491 })
492}
493
494pub fn span_record_event(
498 span_id: u64,
499 name: String,
500 attributes: BTreeMap<String, serde_json::Value>,
501) -> bool {
502 if span_id == 0 {
503 return false;
504 }
505 COLLECTOR.with(|c| c.borrow_mut().record_event(span_id, name, attributes))
506}
507
508pub fn span_attach_metadata(span_id: u64, key: &str, value: serde_json::Value) {
511 if span_id == 0 {
512 return;
513 }
514 COLLECTOR.with(|c| c.borrow_mut().set_metadata(span_id, key, value));
515}
516
517pub fn span_attach_metadata_if_absent(span_id: u64, key: &str, value: serde_json::Value) {
519 if span_id == 0 {
520 return;
521 }
522 COLLECTOR.with(|c| {
523 c.borrow_mut()
524 .attach_metadata_if_absent(span_id, key, value);
525 });
526}
527
528pub fn current_span_id() -> Option<u64> {
530 if !is_tracing_enabled() {
531 return None;
532 }
533 COLLECTOR.with(|c| c.borrow().current_span_id())
534}
535
536pub fn span_link(span_id: u64) -> Option<SpanLink> {
538 if span_id == 0 || !is_tracing_enabled() {
539 return None;
540 }
541 COLLECTOR.with(|c| c.borrow().span_link(span_id))
542}
543
544pub fn current_span_link() -> Option<SpanLink> {
546 if !is_tracing_enabled() {
547 return None;
548 }
549 COLLECTOR.with(|c| c.borrow().current_span_link())
550}
551
552pub fn take_spans() -> Vec<Span> {
554 COLLECTOR.with(|c| c.borrow_mut().take_spans())
555}
556
557pub fn peek_spans() -> Vec<Span> {
559 COLLECTOR.with(|c| c.borrow().spans().to_vec())
560}
561
562pub fn reset_tracing() {
564 COLLECTOR.with(|c| c.borrow_mut().reset());
565}
566
567pub fn span_to_vm_value(span: &Span) -> VmValue {
569 let mut d = BTreeMap::new();
570 d.insert(
571 "trace_id".into(),
572 VmValue::String(std::sync::Arc::from(span.trace_id.as_str())),
573 );
574 d.insert("span_id".into(), VmValue::Int(span.span_id as i64));
575 d.insert(
576 "parent_id".into(),
577 span.parent_id
578 .map(|id| VmValue::Int(id as i64))
579 .unwrap_or(VmValue::Nil),
580 );
581 d.insert(
582 "kind".into(),
583 VmValue::String(std::sync::Arc::from(span.kind.as_str())),
584 );
585 d.insert(
586 "name".into(),
587 VmValue::String(std::sync::Arc::from(span.name.as_str())),
588 );
589 d.insert("start_ms".into(), VmValue::Int(span.start_ms as i64));
590 d.insert(
591 "start_unix_ms".into(),
592 VmValue::Int(span.start_unix_ms as i64),
593 );
594 d.insert("duration_ms".into(), VmValue::Int(span.duration_ms as i64));
595
596 if !span.metadata.is_empty() {
597 let meta: BTreeMap<String, VmValue> = span
598 .metadata
599 .iter()
600 .map(|(k, v)| (k.clone(), crate::stdlib::json_to_vm_value(v)))
601 .collect();
602 d.insert("metadata".into(), VmValue::Dict(std::sync::Arc::new(meta)));
603 }
604 if !span.links.is_empty() {
605 d.insert(
606 "links".into(),
607 crate::stdlib::json_to_vm_value(&serde_json::json!(span.links)),
608 );
609 }
610 if !span.events.is_empty() {
611 d.insert(
612 "events".into(),
613 crate::stdlib::json_to_vm_value(&serde_json::json!(span.events)),
614 );
615 }
616
617 VmValue::Dict(std::sync::Arc::new(d))
618}
619
620pub fn format_summary() -> String {
622 let spans = peek_spans();
623 if spans.is_empty() {
624 return "No spans recorded.".into();
625 }
626
627 let mut lines = Vec::new();
628 let total_ms: u64 = spans
629 .iter()
630 .filter(|s| s.parent_id.is_none())
631 .map(|s| s.duration_ms)
632 .sum();
633
634 lines.push(format!("Trace: {} spans, {total_ms}ms total", spans.len()));
635 lines.push(String::new());
636
637 fn print_tree(spans: &[Span], parent_id: Option<u64>, depth: usize, lines: &mut Vec<String>) {
638 let children: Vec<&Span> = spans.iter().filter(|s| s.parent_id == parent_id).collect();
639 for span in children {
640 let indent = " ".repeat(depth);
641 let meta_str = if span.metadata.is_empty() {
642 String::new()
643 } else {
644 let parts: Vec<String> = span
645 .metadata
646 .iter()
647 .map(|(k, v)| format!("{k}={v}"))
648 .collect();
649 format!(" ({})", parts.join(", "))
650 };
651 lines.push(format!(
652 "{indent}{} {} {}ms{meta_str}",
653 span.kind.as_str(),
654 span.name,
655 span.duration_ms,
656 ));
657 print_tree(spans, Some(span.span_id), depth + 1, lines);
658 }
659 }
660
661 print_tree(&spans, None, 0, &mut lines);
662 lines.join("\n")
663}
664
#[cfg(test)]
mod tests {
    use super::*;

    /// A single span gets id 1, is current while open, and is listed once ended.
    #[test]
    fn test_span_collector_basic() {
        let mut collector = SpanCollector::new();
        let span_id = collector.start(SpanKind::Pipeline, String::from("main"));
        assert_eq!(span_id, 1);
        assert_eq!(collector.current_span_id(), Some(1));
        assert!(collector.span_link(span_id).is_some());

        collector.end(span_id);
        assert_eq!(collector.current_span_id(), None);

        let recorded = collector.spans();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].name, "main");
        assert_eq!(recorded[0].parent_id, None);
    }

    /// A span started while another is active records that span as its parent.
    #[test]
    fn test_span_parent_child() {
        let mut collector = SpanCollector::new();
        let outer = collector.start(SpanKind::Pipeline, String::from("main"));
        let inner = collector.start(SpanKind::FnCall, String::from("helper"));
        collector.end(inner);
        collector.end(outer);

        let recorded = collector.spans();
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[0].parent_id, Some(outer));
        assert_eq!(recorded[1].parent_id, None);
    }

    /// Metadata set on an open span is kept on the completed span.
    #[test]
    fn test_span_metadata() {
        let mut collector = SpanCollector::new();
        let span_id = collector.start(SpanKind::LlmCall, String::from("gpt-4"));
        collector.set_metadata(span_id, "tokens", serde_json::json!(100));
        collector.end(span_id);

        let recorded = collector.spans();
        assert_eq!(recorded[0].metadata["tokens"], serde_json::json!(100));
    }

    /// Late metadata reaches a completed span, and the first value wins.
    #[test]
    fn test_completed_span_metadata_can_be_attached_late() {
        let mut collector = SpanCollector::new();
        let span_id = collector.start(SpanKind::LlmCall, String::from("gpt-4"));
        collector.end(span_id);

        collector.attach_metadata_if_absent(span_id, "first_token_ms", serde_json::json!(125));
        collector.attach_metadata_if_absent(span_id, "first_token_ms", serde_json::json!(250));

        let recorded = collector.spans();
        assert_eq!(
            recorded[0].metadata["first_token_ms"],
            serde_json::json!(125)
        );
    }

    /// Links passed at start are preserved on the completed span.
    #[test]
    fn test_span_links_are_preserved() {
        let mut collector = SpanCollector::new();
        let suspended = collector.start(SpanKind::Suspension, String::from("suspend worker"));
        let link = collector.span_link(suspended).expect("link for open span");
        collector.end(suspended);

        let resumed =
            collector.start_with_links(SpanKind::Resume, String::from("resume worker"), vec![link]);
        collector.end(resumed);

        let recorded = collector.spans();
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[1].parent_id, None);
        assert_eq!(recorded[1].links.len(), 1);
        assert_eq!(recorded[1].links[0].span_id, suspended.to_string());
    }

    /// A detached span has no parent even when another span is active.
    #[test]
    fn test_detached_span_links_do_not_inherit_active_parent() {
        let mut collector = SpanCollector::new();
        let pipeline = collector.start(SpanKind::Pipeline, String::from("pipeline"));
        let link = collector.span_link(pipeline).expect("pipeline link");
        let drain_id =
            collector.start_detached_with_links(SpanKind::Drain, String::from("drain"), vec![link]);
        collector.end(drain_id);
        collector.end(pipeline);

        let drain = collector
            .spans()
            .iter()
            .find(|span| span.kind == SpanKind::Drain)
            .expect("drain span");
        assert_eq!(drain.parent_id, None);
        assert_eq!(drain.links.len(), 1);
        assert_eq!(drain.links[0].span_id, pipeline.to_string());
    }

    /// With tracing disabled, `span_start` returns 0 and `span_end(0)` is a no-op.
    #[test]
    fn test_noop_when_disabled() {
        set_tracing_enabled(false);
        let span_id = span_start(SpanKind::Pipeline, String::from("test"));
        assert_eq!(span_id, 0);
        assert!(span_end(span_id).is_none());
    }

    /// User-timing spans are recorded even while tracing is disabled.
    #[test]
    fn test_user_timing_records_when_tracing_disabled() {
        set_tracing_enabled(false);
        reset_tracing();

        let mut attrs = BTreeMap::new();
        attrs.insert(String::from("phase"), serde_json::json!("warmup"));
        let (span_id, trace_id, parent, start_unix_ms) =
            span_start_user_timing(String::from("script.work"), attrs);
        assert_ne!(span_id, 0);
        assert!(!trace_id.is_empty());
        assert_eq!(parent, None);
        assert!(start_unix_ms > 0);

        assert!(span_record_event(
            span_id,
            String::from("checkpoint"),
            BTreeMap::new()
        ));

        let closed = span_end(span_id).expect("user timing always records");
        assert_eq!(closed.kind, SpanKind::UserTiming);
        assert_eq!(closed.events.len(), 1);
        assert_eq!(closed.events[0].name, "checkpoint");
        assert_eq!(closed.metadata["phase"], serde_json::json!("warmup"));

        let snapshot = peek_spans();
        let found = snapshot
            .iter()
            .any(|span| span.kind == SpanKind::UserTiming && span.name == "script.work");
        assert!(found);
    }

    /// Event offsets follow the mock clock: a later event has a larger offset.
    #[test]
    fn test_span_event_offset_is_monotonic() {
        let clock = crate::clock_mock::MockClock::at_wall_ms(1_000_000_000_000);
        let _guard = crate::clock_mock::install_override(clock.clone());

        let mut collector = SpanCollector::new();
        let span_id = collector.start(SpanKind::UserTiming, String::from("outer"));
        assert!(collector.record_event(span_id, String::from("before"), BTreeMap::new()));
        clock.advance_std_sync(std::time::Duration::from_millis(10));
        assert!(collector.record_event(span_id, String::from("after"), BTreeMap::new()));

        let closed = collector.end(span_id).expect("open span");
        assert_eq!(closed.events.len(), 2);
        let before = closed.events[0].offset_ms;
        let after = closed.events[1].offset_ms;
        assert!(
            after > before,
            "second event should have a strictly greater offset after a 10ms advance; \
             before={} after={}",
            before,
            after
        );
    }
}