1use std::cell::RefCell;
11use std::collections::BTreeMap;
12use std::rc::Rc;
13use std::time::{Instant, SystemTime, UNIX_EPOCH};
14
15use crate::value::VmValue;
16
/// Category tag carried by every span.
///
/// The textual form of each variant is produced by [`SpanKind::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    Pipeline,
    FnCall,
    LlmCall,
    ToolCall,
    Import,
    Parallel,
    Spawn,
    Step,
    VmSetup,
    Suspension,
    Resume,
    Drain,
    DrainDecision,
    PoolSubmit,
    PoolDequeue,
    ChannelEmit,
    ChannelMatch,
    UserTiming,
}

impl SpanKind {
    /// Returns the snake_case name of this kind as a static string.
    ///
    /// The match is exhaustive on purpose: adding a variant without giving it
    /// a name is a compile error.
    pub fn as_str(self) -> &'static str {
        match self {
            SpanKind::Pipeline => "pipeline",
            SpanKind::FnCall => "fn_call",
            SpanKind::LlmCall => "llm_call",
            SpanKind::ToolCall => "tool_call",
            SpanKind::Import => "import",
            SpanKind::Parallel => "parallel",
            SpanKind::Spawn => "spawn",
            SpanKind::Step => "step",
            SpanKind::VmSetup => "vm_setup",
            SpanKind::Suspension => "suspension",
            SpanKind::Resume => "resume",
            SpanKind::Drain => "drain",
            SpanKind::DrainDecision => "drain_decision",
            SpanKind::PoolSubmit => "pool_submit",
            SpanKind::PoolDequeue => "pool_dequeue",
            SpanKind::ChannelEmit => "channel_emit",
            SpanKind::ChannelMatch => "channel_match",
            SpanKind::UserTiming => "user_timing",
        }
    }
}
83
84#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
86#[serde(default)]
87pub struct SpanLink {
88 pub trace_id: String,
89 pub span_id: String,
90 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
91 pub attributes: BTreeMap<String, String>,
92}
93
94impl SpanLink {
95 pub fn new(trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
96 Self {
97 trace_id: trace_id.into(),
98 span_id: span_id.into(),
99 attributes: BTreeMap::new(),
100 }
101 }
102
103 pub fn with_attributes(mut self, attributes: BTreeMap<String, String>) -> Self {
104 self.attributes = attributes;
105 self
106 }
107}
108
/// A named, timestamped point recorded inside an open span.
///
/// Instances are created by `SpanCollector::record_event`. `#[serde(default)]`
/// lets missing fields fall back to their `Default` value on deserialization.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SpanEvent {
    /// Caller-supplied event name.
    pub name: String,
    /// Wall-clock time of the event in milliseconds, as returned by `wall_clock_ms`.
    pub time_unix_ms: u64,
    /// Milliseconds between the start of the owning span and this event.
    pub offset_ms: u64,
    /// Caller-supplied attributes; omitted from the serialized form when empty.
    #[serde(skip_serializing_if = "BTreeMap::is_empty")]
    pub attributes: BTreeMap<String, serde_json::Value>,
}
123
/// A finished span, as produced by `SpanCollector::end`.
#[derive(Debug, Clone)]
pub struct Span {
    /// Trace id of the collector that recorded the span.
    pub trace_id: String,
    /// Id assigned by the collector when the span was started.
    pub span_id: u64,
    /// Id of the parent span, or `None` for a root/detached span.
    pub parent_id: Option<u64>,
    pub kind: SpanKind,
    pub name: String,
    /// Start time in milliseconds relative to the collector's epoch.
    pub start_ms: u64,
    /// Wall-clock start time in milliseconds, captured via `wall_clock_ms`.
    pub start_unix_ms: u64,
    /// Elapsed milliseconds between start and end of the span.
    pub duration_ms: u64,
    /// Key/value pairs attached with `set_metadata` while the span was open.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Links supplied when the span was started.
    pub links: Vec<SpanLink>,
    /// Events recorded while the span was open, in recording order.
    pub events: Vec<SpanEvent>,
}
143
/// Bookkeeping for a span that has been started but not yet ended.
struct OpenSpan {
    trace_id: String,
    span_id: u64,
    parent_id: Option<u64>,
    kind: SpanKind,
    name: String,
    /// Real monotonic instant captured at start.
    started_at: Instant,
    /// Mock monotonic reading at start; `None` when no mock clock was active.
    started_at_mock_mono_ms: Option<u64>,
    /// Wall-clock start time in milliseconds, captured via `wall_clock_ms`.
    start_unix_ms: u64,
    metadata: BTreeMap<String, serde_json::Value>,
    links: Vec<SpanLink>,
    events: Vec<SpanEvent>,
}
161
/// Records spans for a single trace: hands out ids, tracks open spans and
/// keeps the finished ones until they are taken or the collector is reset.
pub struct SpanCollector {
    /// Trace id copied onto every span started by this collector.
    trace_id: String,
    /// Id the next started span will receive; starts at 1.
    next_id: u64,
    /// Ids of started-but-not-ended spans in start order; the last entry is
    /// the default parent for the next span.
    active_stack: Vec<u64>,
    /// Open spans keyed by span id.
    open: BTreeMap<u64, OpenSpan>,
    /// Finished spans in the order they were ended.
    completed: Vec<Span>,
    /// Instant of construction or last reset; zero point for `Span::start_ms`.
    epoch: Instant,
}
176
177impl Default for SpanCollector {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183impl SpanCollector {
184 pub fn new() -> Self {
185 Self {
186 next_id: 1,
187 trace_id: format!("trace_{}", uuid::Uuid::now_v7()),
188 active_stack: Vec::new(),
189 open: BTreeMap::new(),
190 completed: Vec::new(),
191 epoch: Instant::now(),
192 }
193 }
194
195 pub fn start(&mut self, kind: SpanKind, name: String) -> u64 {
197 let parent_id = self.active_stack.last().copied();
198 self.start_with_parent(kind, name, Vec::new(), parent_id)
199 }
200
201 pub fn start_with_links(&mut self, kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
203 let parent_id = self.active_stack.last().copied();
204 self.start_with_parent(kind, name, links, parent_id)
205 }
206
207 pub fn start_detached_with_links(
209 &mut self,
210 kind: SpanKind,
211 name: String,
212 links: Vec<SpanLink>,
213 ) -> u64 {
214 self.start_with_parent(kind, name, links, None)
215 }
216
217 fn start_with_parent(
218 &mut self,
219 kind: SpanKind,
220 name: String,
221 links: Vec<SpanLink>,
222 parent_id: Option<u64>,
223 ) -> u64 {
224 let id = self.next_id;
225 self.next_id += 1;
226 let now = Instant::now();
227 let started_at_mock_mono_ms = mock_monotonic_ms();
228 let start_unix_ms = wall_clock_ms();
229
230 let mut event_metadata = BTreeMap::new();
231 if !links.is_empty() {
232 event_metadata.insert("links".to_string(), serde_json::json!(links));
233 }
234 crate::events::emit_span_start(id, parent_id, &name, kind.as_str(), event_metadata);
235
236 self.open.insert(
237 id,
238 OpenSpan {
239 trace_id: self.trace_id.clone(),
240 span_id: id,
241 parent_id,
242 kind,
243 name,
244 started_at: now,
245 started_at_mock_mono_ms,
246 start_unix_ms,
247 metadata: BTreeMap::new(),
248 links,
249 events: Vec::new(),
250 },
251 );
252 self.active_stack.push(id);
253 id
254 }
255
256 pub fn set_metadata(&mut self, span_id: u64, key: &str, value: serde_json::Value) {
258 if let Some(span) = self.open.get_mut(&span_id) {
259 span.metadata.insert(key.to_string(), value);
260 }
261 }
262
263 pub fn record_event(
267 &mut self,
268 span_id: u64,
269 name: String,
270 attributes: BTreeMap<String, serde_json::Value>,
271 ) -> bool {
272 let Some(span) = self.open.get_mut(&span_id) else {
273 return false;
274 };
275 let offset_ms = match (span.started_at_mock_mono_ms, mock_monotonic_ms()) {
276 (Some(start), Some(now)) => now.saturating_sub(start),
277 _ => span.started_at.elapsed().as_millis() as u64,
278 };
279 span.events.push(SpanEvent {
280 name,
281 time_unix_ms: wall_clock_ms(),
282 offset_ms,
283 attributes,
284 });
285 true
286 }
287
288 pub fn open_start_unix_ms(&self, span_id: u64) -> Option<u64> {
290 self.open.get(&span_id).map(|span| span.start_unix_ms)
291 }
292
293 pub fn end(&mut self, span_id: u64) -> Option<Span> {
297 let span = self.open.remove(&span_id)?;
298 let start_ms = span.started_at.duration_since(self.epoch).as_millis() as u64;
299 let duration_ms = match (span.started_at_mock_mono_ms, mock_monotonic_ms()) {
300 (Some(start), Some(end)) => end.saturating_sub(start),
301 _ => span.started_at.elapsed().as_millis() as u64,
302 };
303
304 let mut end_meta = span.metadata.clone();
305 end_meta.insert(
306 "duration_ms".to_string(),
307 serde_json::Value::Number(serde_json::Number::from(duration_ms)),
308 );
309 crate::events::emit_span_end(span_id, end_meta);
310
311 let completed = Span {
312 trace_id: span.trace_id,
313 span_id: span.span_id,
314 parent_id: span.parent_id,
315 kind: span.kind,
316 name: span.name,
317 start_ms,
318 start_unix_ms: span.start_unix_ms,
319 duration_ms,
320 metadata: span.metadata,
321 links: span.links,
322 events: span.events,
323 };
324 self.completed.push(completed.clone());
325
326 if let Some(pos) = self.active_stack.iter().rposition(|&id| id == span_id) {
327 self.active_stack.remove(pos);
328 }
329 Some(completed)
330 }
331
332 pub fn current_span_id(&self) -> Option<u64> {
334 self.active_stack.last().copied()
335 }
336
337 pub fn span_link(&self, span_id: u64) -> Option<SpanLink> {
339 self.open
340 .get(&span_id)
341 .map(|span| SpanLink::new(span.trace_id.clone(), span.span_id.to_string()))
342 }
343
344 pub fn current_span_link(&self) -> Option<SpanLink> {
346 self.current_span_id()
347 .and_then(|span_id| self.span_link(span_id))
348 }
349
350 pub fn take_spans(&mut self) -> Vec<Span> {
352 std::mem::take(&mut self.completed)
353 }
354
355 pub fn spans(&self) -> &[Span] {
357 &self.completed
358 }
359
360 pub fn reset(&mut self) {
362 self.active_stack.clear();
363 self.open.clear();
364 self.completed.clear();
365 self.next_id = 1;
366 self.trace_id = format!("trace_{}", uuid::Uuid::now_v7());
367 self.epoch = Instant::now();
368 }
369}
370
thread_local! {
    // Per-thread collector backing the free functions in this module.
    static COLLECTOR: RefCell<SpanCollector> = RefCell::new(SpanCollector::new());
    // Per-thread on/off switch read by `is_tracing_enabled`; initially `false`.
    static TRACING_ENABLED: RefCell<bool> = const { RefCell::new(false) };
}
375
376fn wall_clock_ms() -> u64 {
381 if let Some(mock) = crate::clock_mock::active_mock_clock() {
382 return mock.now_wall_ms() as u64;
383 }
384 SystemTime::now()
385 .duration_since(UNIX_EPOCH)
386 .map(|d| d.as_millis() as u64)
387 .unwrap_or(0)
388}
389
390fn mock_monotonic_ms() -> Option<u64> {
397 crate::clock_mock::active_mock_clock().map(|mock| mock.now_monotonic_ms() as u64)
398}
399
400pub fn set_tracing_enabled(enabled: bool) {
402 TRACING_ENABLED.with(|e| *e.borrow_mut() = enabled);
403 if enabled {
404 COLLECTOR.with(|c| c.borrow_mut().reset());
405 }
406}
407
408pub fn is_tracing_enabled() -> bool {
410 TRACING_ENABLED.with(|e| *e.borrow())
411}
412
413pub fn span_start(kind: SpanKind, name: String) -> u64 {
415 if !is_tracing_enabled() {
416 return 0;
417 }
418 COLLECTOR.with(|c| c.borrow_mut().start(kind, name))
419}
420
421pub fn span_start_with_links(kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
423 if !is_tracing_enabled() {
424 return 0;
425 }
426 COLLECTOR.with(|c| c.borrow_mut().start_with_links(kind, name, links))
427}
428
429pub fn span_start_detached_with_links(kind: SpanKind, name: String, links: Vec<SpanLink>) -> u64 {
431 if !is_tracing_enabled() {
432 return 0;
433 }
434 COLLECTOR.with(|c| c.borrow_mut().start_detached_with_links(kind, name, links))
435}
436
437pub fn span_set_metadata(span_id: u64, key: &str, value: serde_json::Value) {
439 if span_id == 0 {
440 return;
441 }
442 COLLECTOR.with(|c| c.borrow_mut().set_metadata(span_id, key, value));
443}
444
445pub fn span_end(span_id: u64) -> Option<Span> {
448 if span_id == 0 {
449 return None;
450 }
451 COLLECTOR.with(|c| c.borrow_mut().end(span_id))
452}
453
454pub fn span_start_user_timing(
459 name: String,
460 attrs: BTreeMap<String, serde_json::Value>,
461) -> (u64, String, Option<u64>, u64) {
462 COLLECTOR.with(|c| {
463 let mut c = c.borrow_mut();
464 let id = c.start(SpanKind::UserTiming, name);
465 for (key, value) in attrs {
466 c.set_metadata(id, &key, value);
467 }
468 let parent = c.open.get(&id).and_then(|span| span.parent_id);
469 let trace_id = c
470 .open
471 .get(&id)
472 .map(|span| span.trace_id.clone())
473 .unwrap_or_default();
474 let start_unix_ms = c.open_start_unix_ms(id).unwrap_or(0);
475 (id, trace_id, parent, start_unix_ms)
476 })
477}
478
479pub fn span_record_event(
483 span_id: u64,
484 name: String,
485 attributes: BTreeMap<String, serde_json::Value>,
486) -> bool {
487 if span_id == 0 {
488 return false;
489 }
490 COLLECTOR.with(|c| c.borrow_mut().record_event(span_id, name, attributes))
491}
492
493pub fn span_attach_metadata(span_id: u64, key: &str, value: serde_json::Value) {
496 if span_id == 0 {
497 return;
498 }
499 COLLECTOR.with(|c| c.borrow_mut().set_metadata(span_id, key, value));
500}
501
502pub fn current_span_id() -> Option<u64> {
504 if !is_tracing_enabled() {
505 return None;
506 }
507 COLLECTOR.with(|c| c.borrow().current_span_id())
508}
509
510pub fn span_link(span_id: u64) -> Option<SpanLink> {
512 if span_id == 0 || !is_tracing_enabled() {
513 return None;
514 }
515 COLLECTOR.with(|c| c.borrow().span_link(span_id))
516}
517
518pub fn current_span_link() -> Option<SpanLink> {
520 if !is_tracing_enabled() {
521 return None;
522 }
523 COLLECTOR.with(|c| c.borrow().current_span_link())
524}
525
526pub fn take_spans() -> Vec<Span> {
528 COLLECTOR.with(|c| c.borrow_mut().take_spans())
529}
530
531pub fn peek_spans() -> Vec<Span> {
533 COLLECTOR.with(|c| c.borrow().spans().to_vec())
534}
535
536pub fn reset_tracing() {
538 COLLECTOR.with(|c| c.borrow_mut().reset());
539}
540
541pub fn span_to_vm_value(span: &Span) -> VmValue {
543 let mut d = BTreeMap::new();
544 d.insert(
545 "trace_id".into(),
546 VmValue::String(Rc::from(span.trace_id.as_str())),
547 );
548 d.insert("span_id".into(), VmValue::Int(span.span_id as i64));
549 d.insert(
550 "parent_id".into(),
551 span.parent_id
552 .map(|id| VmValue::Int(id as i64))
553 .unwrap_or(VmValue::Nil),
554 );
555 d.insert("kind".into(), VmValue::String(Rc::from(span.kind.as_str())));
556 d.insert("name".into(), VmValue::String(Rc::from(span.name.as_str())));
557 d.insert("start_ms".into(), VmValue::Int(span.start_ms as i64));
558 d.insert(
559 "start_unix_ms".into(),
560 VmValue::Int(span.start_unix_ms as i64),
561 );
562 d.insert("duration_ms".into(), VmValue::Int(span.duration_ms as i64));
563
564 if !span.metadata.is_empty() {
565 let meta: BTreeMap<String, VmValue> = span
566 .metadata
567 .iter()
568 .map(|(k, v)| (k.clone(), crate::stdlib::json_to_vm_value(v)))
569 .collect();
570 d.insert("metadata".into(), VmValue::Dict(Rc::new(meta)));
571 }
572 if !span.links.is_empty() {
573 d.insert(
574 "links".into(),
575 crate::stdlib::json_to_vm_value(&serde_json::json!(span.links)),
576 );
577 }
578 if !span.events.is_empty() {
579 d.insert(
580 "events".into(),
581 crate::stdlib::json_to_vm_value(&serde_json::json!(span.events)),
582 );
583 }
584
585 VmValue::Dict(Rc::new(d))
586}
587
588pub fn format_summary() -> String {
590 let spans = peek_spans();
591 if spans.is_empty() {
592 return "No spans recorded.".into();
593 }
594
595 let mut lines = Vec::new();
596 let total_ms: u64 = spans
597 .iter()
598 .filter(|s| s.parent_id.is_none())
599 .map(|s| s.duration_ms)
600 .sum();
601
602 lines.push(format!("Trace: {} spans, {total_ms}ms total", spans.len()));
603 lines.push(String::new());
604
605 fn print_tree(spans: &[Span], parent_id: Option<u64>, depth: usize, lines: &mut Vec<String>) {
606 let children: Vec<&Span> = spans.iter().filter(|s| s.parent_id == parent_id).collect();
607 for span in children {
608 let indent = " ".repeat(depth);
609 let meta_str = if span.metadata.is_empty() {
610 String::new()
611 } else {
612 let parts: Vec<String> = span
613 .metadata
614 .iter()
615 .map(|(k, v)| format!("{k}={v}"))
616 .collect();
617 format!(" ({})", parts.join(", "))
618 };
619 lines.push(format!(
620 "{indent}{} {} {}ms{meta_str}",
621 span.kind.as_str(),
622 span.name,
623 span.duration_ms,
624 ));
625 print_tree(spans, Some(span.span_id), depth + 1, lines);
626 }
627 }
628
629 print_tree(&spans, None, 0, &mut lines);
630 lines.join("\n")
631}
632
#[cfg(test)]
mod tests {
    use super::*;

    // A single span gets id 1, is the current span while open, and ends up
    // as the only completed span with no parent.
    #[test]
    fn test_span_collector_basic() {
        let mut c = SpanCollector::new();
        let id = c.start(SpanKind::Pipeline, "main".into());
        assert_eq!(id, 1);
        assert_eq!(c.current_span_id(), Some(1));
        assert!(c.span_link(id).is_some());
        c.end(id);
        assert_eq!(c.current_span_id(), None);
        assert_eq!(c.spans().len(), 1);
        assert_eq!(c.spans()[0].name, "main");
        assert_eq!(c.spans()[0].parent_id, None);
    }

    // A span started while another is open is parented to it; completed
    // spans are stored in end order (child first).
    #[test]
    fn test_span_parent_child() {
        let mut c = SpanCollector::new();
        let parent = c.start(SpanKind::Pipeline, "main".into());
        let child = c.start(SpanKind::FnCall, "helper".into());
        c.end(child);
        c.end(parent);
        assert_eq!(c.spans().len(), 2);
        assert_eq!(c.spans()[0].parent_id, Some(parent));
        assert_eq!(c.spans()[1].parent_id, None);
    }

    // Metadata set while the span is open is kept on the completed span.
    #[test]
    fn test_span_metadata() {
        let mut c = SpanCollector::new();
        let id = c.start(SpanKind::LlmCall, "gpt-4".into());
        c.set_metadata(id, "tokens", serde_json::json!(100));
        c.end(id);
        assert_eq!(c.spans()[0].metadata["tokens"], serde_json::json!(100));
    }

    // A link taken from an open span can be attached to a later span after
    // the first one has ended; the later span has no parent but keeps the link.
    #[test]
    fn test_span_links_are_preserved() {
        let mut c = SpanCollector::new();
        let parent = c.start(SpanKind::Suspension, "suspend worker".into());
        let link = c.span_link(parent).expect("link for open span");
        c.end(parent);

        let child = c.start_with_links(SpanKind::Resume, "resume worker".into(), vec![link]);
        c.end(child);

        assert_eq!(c.spans().len(), 2);
        assert_eq!(c.spans()[1].parent_id, None);
        assert_eq!(c.spans()[1].links.len(), 1);
        assert_eq!(c.spans()[1].links[0].span_id, parent.to_string());
    }

    // A detached span started while another span is active gets no parent,
    // yet still carries the link it was given.
    #[test]
    fn test_detached_span_links_do_not_inherit_active_parent() {
        let mut c = SpanCollector::new();
        let pipeline = c.start(SpanKind::Pipeline, "pipeline".into());
        let link = c.span_link(pipeline).expect("pipeline link");
        let drain = c.start_detached_with_links(SpanKind::Drain, "drain".into(), vec![link]);
        c.end(drain);
        c.end(pipeline);

        let drain = c
            .spans()
            .iter()
            .find(|span| span.kind == SpanKind::Drain)
            .expect("drain span");
        assert_eq!(drain.parent_id, None);
        assert_eq!(drain.links.len(), 1);
        assert_eq!(drain.links[0].span_id, pipeline.to_string());
    }

    // With tracing disabled the thread-local API hands out id 0, and ending
    // id 0 yields nothing.
    #[test]
    fn test_noop_when_disabled() {
        set_tracing_enabled(false);
        let id = span_start(SpanKind::Pipeline, "test".into());
        assert_eq!(id, 0);
        assert!(span_end(id).is_none());
    }

    // User-timing spans are recorded through the thread-local collector even
    // though tracing is disabled.
    #[test]
    fn test_user_timing_records_when_tracing_disabled() {
        set_tracing_enabled(false);
        // Disabling does not reset the collector, so clear it explicitly.
        reset_tracing();
        let mut attrs = BTreeMap::new();
        attrs.insert("phase".into(), serde_json::json!("warmup"));
        let (id, trace_id, parent, start_unix_ms) =
            span_start_user_timing("script.work".into(), attrs);
        assert!(id != 0);
        assert!(!trace_id.is_empty());
        assert_eq!(parent, None);
        assert!(start_unix_ms > 0);

        assert!(span_record_event(id, "checkpoint".into(), BTreeMap::new()));

        let closed = span_end(id).expect("user timing always records");
        assert_eq!(closed.kind, SpanKind::UserTiming);
        assert_eq!(closed.events.len(), 1);
        assert_eq!(closed.events[0].name, "checkpoint");
        assert_eq!(closed.metadata["phase"], serde_json::json!("warmup"));

        // The finished span is also visible in the collector's completed list.
        let snapshot = peek_spans();
        assert!(snapshot
            .iter()
            .any(|span| span.kind == SpanKind::UserTiming && span.name == "script.work"));
    }

    // Event offsets never go backwards: a later event has an offset at least
    // as large as an earlier one.
    #[test]
    fn test_span_event_offset_is_monotonic() {
        let mut c = SpanCollector::new();
        let id = c.start(SpanKind::UserTiming, "outer".into());
        assert!(c.record_event(id, "before".into(), BTreeMap::new()));
        std::thread::sleep(std::time::Duration::from_millis(2));
        // Recorded after the sleep, so its offset cannot be smaller.
        assert!(c.record_event(id, "after".into(), BTreeMap::new()));
        let closed = c.end(id).expect("open span");
        assert_eq!(closed.events.len(), 2);
        assert!(closed.events[1].offset_ms >= closed.events[0].offset_ms);
    }
}