langgraph_tracing/
observer.rs1use crate::event_bus::{EventBus, TracingEvent};
2use crate::store::TracingStore;
3use crate::types::*;
4use serde_json::Value as JsonValue;
5use std::sync::Arc;
6use uuid::Uuid;
7
8pub struct TracingGraphObserver {
13 store: Arc<dyn TracingStore>,
14 event_bus: EventBus,
15 current_trace_id: Option<String>,
17}
18
19impl TracingGraphObserver {
20 pub fn new(store: Arc<dyn TracingStore>, event_bus: EventBus) -> Self {
21 Self {
22 store,
23 event_bus,
24 current_trace_id: None,
25 }
26 }
27
28 pub fn on_graph_start(&mut self, name: &str, input: JsonValue) -> String {
30 let trace_id = Uuid::new_v4().to_string();
31 let trace = Trace::new(trace_id.clone(), name.to_string(), input);
32
33 let summary = TraceSummary::from(&trace);
34 self.store.create_trace(trace);
35 self.event_bus.publish(TracingEvent::TraceCreated { trace: summary });
36
37 self.current_trace_id = Some(trace_id.clone());
38 trace_id
39 }
40
41 pub fn on_graph_end(&self, trace_id: &str, output: JsonValue, status: TraceStatus) {
43 if let Some(mut detail) = self.store.get_trace(trace_id) {
44 detail.trace.finish(output, status);
45 let summary = TraceSummary::from(&detail.trace);
46 self.store.update_trace(detail.trace);
47 self.event_bus.publish(TracingEvent::TraceUpdated { trace: summary });
48 }
49 }
50
51 pub fn on_node_start(
53 &self,
54 trace_id: &str,
55 parent_span_id: Option<&str>,
56 node_name: &str,
57 input: JsonValue,
58 ) -> String {
59 let span_id = Uuid::new_v4().to_string();
60 let span = Span::new(
61 span_id.clone(),
62 trace_id.to_string(),
63 parent_span_id.map(|s| s.to_string()),
64 node_name.to_string(),
65 SpanType::GraphNode,
66 input,
67 );
68
69 self.store.add_span(span.clone());
70 self.event_bus.publish(TracingEvent::SpanCreated { span });
71 span_id
72 }
73
74 pub fn on_node_end(&self, span_id: &str, trace_id: &str, output: JsonValue, success: bool) {
76 if let Some(detail) = self.store.get_trace(trace_id) {
77 if let Some(mut span) = detail.spans.into_iter().find(|s| s.id == span_id) {
78 let status = if success { SpanStatus::Success } else { SpanStatus::Error };
79 span.finish(output, status);
80 self.store.update_span(span.clone());
81 self.event_bus.publish(TracingEvent::SpanUpdated { span });
82 }
83 }
84 }
85
86 pub fn on_llm_start(
88 &self,
89 trace_id: &str,
90 parent_span_id: Option<&str>,
91 model: &str,
92 input: JsonValue,
93 ) -> String {
94 let span_id = Uuid::new_v4().to_string();
95 let mut span = Span::new(
96 span_id.clone(),
97 trace_id.to_string(),
98 parent_span_id.map(|s| s.to_string()),
99 model.to_string(),
100 SpanType::LlmGeneration,
101 input,
102 );
103 span.metadata.model = Some(model.to_string());
104
105 self.store.add_span(span.clone());
106 self.event_bus.publish(TracingEvent::SpanCreated { span });
107 span_id
108 }
109
110 pub fn on_llm_end(
112 &self,
113 span_id: &str,
114 trace_id: &str,
115 output: JsonValue,
116 success: bool,
117 tokens_in: Option<u32>,
118 tokens_out: Option<u32>,
119 ) {
120 if let Some(detail) = self.store.get_trace(trace_id) {
121 if let Some(mut span) = detail.spans.into_iter().find(|s| s.id == span_id) {
122 let status = if success { SpanStatus::Success } else { SpanStatus::Error };
123 span.finish(output, status);
124 span.metadata.tokens_in = tokens_in;
125 span.metadata.tokens_out = tokens_out;
126 span.metadata.total_tokens = match (tokens_in, tokens_out) {
127 (Some(a), Some(b)) => Some(a + b),
128 _ => None,
129 };
130 self.store.update_span(span.clone());
131 self.event_bus.publish(TracingEvent::SpanUpdated { span });
132 }
133 }
134 }
135
136 pub fn on_tool_start(
138 &self,
139 trace_id: &str,
140 parent_span_id: Option<&str>,
141 tool_name: &str,
142 input: JsonValue,
143 ) -> String {
144 let span_id = Uuid::new_v4().to_string();
145 let mut span = Span::new(
146 span_id.clone(),
147 trace_id.to_string(),
148 parent_span_id.map(|s| s.to_string()),
149 tool_name.to_string(),
150 SpanType::ToolCall,
151 input,
152 );
153 span.metadata.tool_name = Some(tool_name.to_string());
154
155 self.store.add_span(span.clone());
156 self.event_bus.publish(TracingEvent::SpanCreated { span });
157 span_id
158 }
159
160 pub fn on_tool_end(&self, span_id: &str, trace_id: &str, output: JsonValue, success: bool) {
162 if let Some(detail) = self.store.get_trace(trace_id) {
163 if let Some(mut span) = detail.spans.into_iter().find(|s| s.id == span_id) {
164 let status = if success { SpanStatus::Success } else { SpanStatus::Error };
165 span.finish(output, status);
166 self.store.update_span(span.clone());
167 self.event_bus.publish(TracingEvent::SpanUpdated { span });
168 }
169 }
170 }
171
172 pub fn store(&self) -> &dyn TracingStore {
173 self.store.as_ref()
174 }
175
176 pub fn event_bus(&self) -> &EventBus {
177 &self.event_bus
178 }
179}