// langgraph_tracing/observer.rs

1use crate::event_bus::{EventBus, TracingEvent};
2use crate::store::TracingStore;
3use crate::types::*;
4use serde_json::Value as JsonValue;
5use std::sync::Arc;
6use uuid::Uuid;
7
8/// Observer that records graph execution traces and spans.
9///
10/// Can be used manually or integrated with the streaming system
11/// to automatically capture graph runs and node executions.
12pub struct TracingGraphObserver {
13    store: Arc<dyn TracingStore>,
14    event_bus: EventBus,
15    /// The current active trace (if any)
16    current_trace_id: Option<String>,
17}
18
19impl TracingGraphObserver {
20    pub fn new(store: Arc<dyn TracingStore>, event_bus: EventBus) -> Self {
21        Self {
22            store,
23            event_bus,
24            current_trace_id: None,
25        }
26    }
27
28    /// Start observing a new graph run
29    pub fn on_graph_start(&mut self, name: &str, input: JsonValue) -> String {
30        let trace_id = Uuid::new_v4().to_string();
31        let trace = Trace::new(trace_id.clone(), name.to_string(), input);
32
33        let summary = TraceSummary::from(&trace);
34        self.store.create_trace(trace);
35        self.event_bus.publish(TracingEvent::TraceCreated { trace: summary });
36
37        self.current_trace_id = Some(trace_id.clone());
38        trace_id
39    }
40
41    /// Record graph run completion
42    pub fn on_graph_end(&self, trace_id: &str, output: JsonValue, status: TraceStatus) {
43        if let Some(mut detail) = self.store.get_trace(trace_id) {
44            detail.trace.finish(output, status);
45            let summary = TraceSummary::from(&detail.trace);
46            self.store.update_trace(detail.trace);
47            self.event_bus.publish(TracingEvent::TraceUpdated { trace: summary });
48        }
49    }
50
51    /// Start a node execution span
52    pub fn on_node_start(
53        &self,
54        trace_id: &str,
55        parent_span_id: Option<&str>,
56        node_name: &str,
57        input: JsonValue,
58    ) -> String {
59        let span_id = Uuid::new_v4().to_string();
60        let span = Span::new(
61            span_id.clone(),
62            trace_id.to_string(),
63            parent_span_id.map(|s| s.to_string()),
64            node_name.to_string(),
65            SpanType::GraphNode,
66            input,
67        );
68
69        self.store.add_span(span.clone());
70        self.event_bus.publish(TracingEvent::SpanCreated { span });
71        span_id
72    }
73
74    /// Record node execution completion
75    pub fn on_node_end(&self, span_id: &str, trace_id: &str, output: JsonValue, success: bool) {
76        if let Some(detail) = self.store.get_trace(trace_id) {
77            if let Some(mut span) = detail.spans.into_iter().find(|s| s.id == span_id) {
78                let status = if success { SpanStatus::Success } else { SpanStatus::Error };
79                span.finish(output, status);
80                self.store.update_span(span.clone());
81                self.event_bus.publish(TracingEvent::SpanUpdated { span });
82            }
83        }
84    }
85
86    /// Record an LLM generation span
87    pub fn on_llm_start(
88        &self,
89        trace_id: &str,
90        parent_span_id: Option<&str>,
91        model: &str,
92        input: JsonValue,
93    ) -> String {
94        let span_id = Uuid::new_v4().to_string();
95        let mut span = Span::new(
96            span_id.clone(),
97            trace_id.to_string(),
98            parent_span_id.map(|s| s.to_string()),
99            model.to_string(),
100            SpanType::LlmGeneration,
101            input,
102        );
103        span.metadata.model = Some(model.to_string());
104
105        self.store.add_span(span.clone());
106        self.event_bus.publish(TracingEvent::SpanCreated { span });
107        span_id
108    }
109
110    /// Record LLM generation completion with token usage
111    pub fn on_llm_end(
112        &self,
113        span_id: &str,
114        trace_id: &str,
115        output: JsonValue,
116        success: bool,
117        tokens_in: Option<u32>,
118        tokens_out: Option<u32>,
119    ) {
120        if let Some(detail) = self.store.get_trace(trace_id) {
121            if let Some(mut span) = detail.spans.into_iter().find(|s| s.id == span_id) {
122                let status = if success { SpanStatus::Success } else { SpanStatus::Error };
123                span.finish(output, status);
124                span.metadata.tokens_in = tokens_in;
125                span.metadata.tokens_out = tokens_out;
126                span.metadata.total_tokens = match (tokens_in, tokens_out) {
127                    (Some(a), Some(b)) => Some(a + b),
128                    _ => None,
129                };
130                self.store.update_span(span.clone());
131                self.event_bus.publish(TracingEvent::SpanUpdated { span });
132            }
133        }
134    }
135
136    /// Record a tool call span
137    pub fn on_tool_start(
138        &self,
139        trace_id: &str,
140        parent_span_id: Option<&str>,
141        tool_name: &str,
142        input: JsonValue,
143    ) -> String {
144        let span_id = Uuid::new_v4().to_string();
145        let mut span = Span::new(
146            span_id.clone(),
147            trace_id.to_string(),
148            parent_span_id.map(|s| s.to_string()),
149            tool_name.to_string(),
150            SpanType::ToolCall,
151            input,
152        );
153        span.metadata.tool_name = Some(tool_name.to_string());
154
155        self.store.add_span(span.clone());
156        self.event_bus.publish(TracingEvent::SpanCreated { span });
157        span_id
158    }
159
160    /// Record tool call completion
161    pub fn on_tool_end(&self, span_id: &str, trace_id: &str, output: JsonValue, success: bool) {
162        if let Some(detail) = self.store.get_trace(trace_id) {
163            if let Some(mut span) = detail.spans.into_iter().find(|s| s.id == span_id) {
164                let status = if success { SpanStatus::Success } else { SpanStatus::Error };
165                span.finish(output, status);
166                self.store.update_span(span.clone());
167                self.event_bus.publish(TracingEvent::SpanUpdated { span });
168            }
169        }
170    }
171
172    pub fn store(&self) -> &dyn TracingStore {
173        self.store.as_ref()
174    }
175
176    pub fn event_bus(&self) -> &EventBus {
177        &self.event_bus
178    }
179}