langgraph_tracing/
store.rs1use crate::types::*;
2use std::collections::HashMap;
3use std::sync::{Arc, RwLock};
4
5pub trait TracingStore: Send + Sync {
7 fn create_trace(&self, trace: Trace);
8 fn update_trace(&self, trace: Trace);
9 fn get_trace(&self, trace_id: &str) -> Option<TraceDetail>;
10 fn list_traces(&self, filter: &TraceFilter) -> Vec<TraceSummary>;
11 fn add_span(&self, span: Span);
12 fn update_span(&self, span: Span);
13 fn clear(&self);
14}
15
16#[derive(Debug, Clone, Default)]
18pub struct TraceFilter {
19 pub status: Option<TraceStatus>,
20 pub name_contains: Option<String>,
21 pub limit: Option<usize>,
22 pub offset: Option<usize>,
23}
24
25#[derive(Clone)]
27pub struct InMemoryTracingStore {
28 traces: Arc<RwLock<HashMap<String, Trace>>>,
29 spans: Arc<RwLock<HashMap<String, Vec<Span>>>>,
30}
31
32impl InMemoryTracingStore {
33 pub fn new() -> Self {
34 Self {
35 traces: Arc::new(RwLock::new(HashMap::new())),
36 spans: Arc::new(RwLock::new(HashMap::new())),
37 }
38 }
39}
40
41impl Default for InMemoryTracingStore {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl TracingStore for InMemoryTracingStore {
48 fn create_trace(&self, trace: Trace) {
49 let mut traces = self.traces.write().unwrap();
50 traces.insert(trace.id.clone(), trace);
51 }
52
53 fn update_trace(&self, trace: Trace) {
54 let mut traces = self.traces.write().unwrap();
55 traces.insert(trace.id.clone(), trace);
56 }
57
58 fn get_trace(&self, trace_id: &str) -> Option<TraceDetail> {
59 let traces = self.traces.read().unwrap();
60 let spans = self.spans.read().unwrap();
61 traces.get(trace_id).map(|trace| TraceDetail {
62 trace: trace.clone(),
63 spans: spans.get(trace_id).cloned().unwrap_or_default(),
64 })
65 }
66
67 fn list_traces(&self, filter: &TraceFilter) -> Vec<TraceSummary> {
68 let traces = self.traces.read().unwrap();
69 let spans = self.spans.read().unwrap();
70
71 let mut summaries: Vec<TraceSummary> = traces
72 .values()
73 .filter(|t| {
74 if let Some(ref status) = filter.status {
75 if &t.status != status {
76 return false;
77 }
78 }
79 if let Some(ref name) = filter.name_contains {
80 if !t.name.contains(name.as_str()) {
81 return false;
82 }
83 }
84 true
85 })
86 .map(|t| {
87 let mut summary = TraceSummary::from(t);
88 summary.span_count = spans.get(&t.id).map(|s| s.len()).unwrap_or(0);
89 summary
90 })
91 .collect();
92
93 summaries.sort_by(|a, b| b.start_time.cmp(&a.start_time));
95
96 let offset = filter.offset.unwrap_or(0);
97 let limit = filter.limit.unwrap_or(summaries.len());
98
99 summaries
100 .into_iter()
101 .skip(offset)
102 .take(limit)
103 .collect()
104 }
105
106 fn add_span(&self, span: Span) {
107 let mut spans = self.spans.write().unwrap();
108 spans
109 .entry(span.trace_id.clone())
110 .or_default()
111 .push(span);
112 }
113
114 fn update_span(&self, span: Span) {
115 let mut spans = self.spans.write().unwrap();
116 if let Some(trace_spans) = spans.get_mut(&span.trace_id) {
117 if let Some(existing) = trace_spans.iter_mut().find(|s| s.id == span.id) {
118 *existing = span;
119 }
120 }
121 }
122
123 fn clear(&self) {
124 self.traces.write().unwrap().clear();
125 self.spans.write().unwrap().clear();
126 }
127}