simple_agents_workflow/observability/
tracing.rs1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Once, OnceLock};
3
4use opentelemetry::global;
5use opentelemetry::propagation::{Extractor, TextMapPropagator};
6use opentelemetry::trace::{Span as OtelSpan, Tracer};
7use opentelemetry::{Context, KeyValue};
8use opentelemetry_otlp::WithExportConfig;
9use opentelemetry_sdk::trace::TracerProvider as SdkTracerProvider;
10use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
11
/// Plain-data description of where a span sits in a trace.
///
/// Every identifier is optional so that a context can be partially populated
/// (for example only a `traceparent` header received from a caller). The
/// `Default` value has every identifier unset and an empty baggage map.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TraceContext {
    /// Identifier of the trace, as text.
    pub trace_id: Option<String>,
    /// Identifier of the span this context describes, as text.
    pub span_id: Option<String>,
    /// Identifier of the span's parent, when one is known.
    pub parent_span_id: Option<String>,
    /// `traceparent` header value; the OpenTelemetry tracer in this file
    /// writes it as `00-<trace_id>-<span_id>-<flags>`.
    pub traceparent: Option<String>,
    /// `tracestate` header value carried alongside `traceparent`.
    pub tracestate: Option<String>,
    /// Key/value pairs carried with the context, kept in sorted key order.
    pub baggage: BTreeMap<String, String>,
}
22
/// Category of unit that a span measures.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SpanKind {
    /// Span for a workflow.
    Workflow,
    /// Span for a node.
    Node,
}
29
/// Handle to a span that has been started by a [`WorkflowTracer`].
///
/// Implementations must be `Send`. The trait is used through
/// `Box<dyn WorkflowSpan>`, which is why `end` takes `self: Box<Self>`.
pub trait WorkflowSpan: Send {
    /// Attaches a string attribute `key = value` to the span.
    fn set_attribute(&mut self, key: &str, value: &str);

    /// Records an event called `name` on the span.
    fn add_event(&mut self, name: &str);

    /// Finishes the span, consuming the boxed handle.
    fn end(self: Box<Self>);
}
36
37pub trait WorkflowTracer: Send + Sync {
39 fn start_span(
40 &self,
41 name: &str,
42 kind: SpanKind,
43 parent: Option<&TraceContext>,
44 ) -> (TraceContext, Box<dyn WorkflowSpan>);
45}
46
47static WORKFLOW_TRACER: OnceLock<Box<dyn WorkflowTracer>> = OnceLock::new();
48static OTEL_PROVIDER: OnceLock<Result<SdkTracerProvider, String>> = OnceLock::new();
49static OTEL_GLOBAL_SET: Once = Once::new();
50
51pub fn workflow_tracer() -> &'static dyn WorkflowTracer {
52 WORKFLOW_TRACER
53 .get_or_init(|| {
54 if OtelWorkflowTracer::init_from_env().is_ok() {
55 Box::new(OtelWorkflowTracer)
56 } else {
57 Box::new(NoopWorkflowTracer)
58 }
59 })
60 .as_ref()
61}
62
63pub fn flush_workflow_tracer() {
64 if let Some(Ok(provider)) = OTEL_PROVIDER.get() {
65 let _ = provider.force_flush();
66 }
67}
68
69#[derive(Debug, Default)]
71pub struct NoopWorkflowSpan;
72
73impl WorkflowSpan for NoopWorkflowSpan {
74 fn set_attribute(&mut self, _key: &str, _value: &str) {}
75
76 fn add_event(&mut self, _name: &str) {}
77
78 fn end(self: Box<Self>) {}
79}
80
81#[derive(Debug, Default)]
83pub struct NoopWorkflowTracer;
84
85impl WorkflowTracer for NoopWorkflowTracer {
86 fn start_span(
87 &self,
88 _name: &str,
89 _kind: SpanKind,
90 parent: Option<&TraceContext>,
91 ) -> (TraceContext, Box<dyn WorkflowSpan>) {
92 (
93 parent.cloned().unwrap_or_default(),
94 Box::<NoopWorkflowSpan>::default(),
95 )
96 }
97}
98
/// Tracer that creates spans through the global OpenTelemetry tracer.
///
/// The type carries no state; provider setup lives in the statics of this
/// file and is performed by `OtelWorkflowTracer::init_from_env`.
#[derive(Debug)]
pub struct OtelWorkflowTracer;
101
102#[derive(Debug)]
103pub struct OtelWorkflowSpan {
104 inner: opentelemetry::global::BoxedSpan,
105}
106
107impl WorkflowSpan for OtelWorkflowSpan {
108 fn set_attribute(&mut self, key: &str, value: &str) {
109 self.inner
110 .set_attribute(KeyValue::new(key.to_string(), value.to_string()));
111 }
112
113 fn add_event(&mut self, name: &str) {
114 self.inner.add_event(name.to_string(), Vec::new());
115 }
116
117 fn end(self: Box<Self>) {
118 let mut inner = self.inner;
119 inner.end();
120 }
121}
122
123impl OtelWorkflowTracer {
124 fn init_from_env() -> Result<(), String> {
125 if !std::env::var("SIMPLE_AGENTS_OTEL_ENABLED")
126 .map(|value| value == "1" || value.eq_ignore_ascii_case("true"))
127 .unwrap_or(false)
128 {
129 return Err("otel disabled".to_string());
130 }
131
132 let provider_result = OTEL_PROVIDER.get_or_init(|| {
133 let endpoint = std::env::var("SIMPLE_AGENTS_OTEL_ENDPOINT")
134 .unwrap_or_else(|_| "http://localhost:4317".to_string());
135 let service_name = std::env::var("SIMPLE_AGENTS_SERVICE_NAME")
136 .unwrap_or_else(|_| "simple-agents-workflow".to_string());
137
138 let exporter = opentelemetry_otlp::new_exporter()
139 .tonic()
140 .with_endpoint(endpoint)
141 .build_span_exporter()
142 .map_err(|error| error.to_string())?;
143
144 let provider = SdkTracerProvider::builder()
145 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
146 .with_config(opentelemetry_sdk::trace::Config::default().with_resource(
147 Resource::new(vec![KeyValue::new("service.name", service_name)]),
148 ))
149 .build();
150
151 Ok(provider)
152 });
153
154 match provider_result {
155 Ok(provider) => {
156 OTEL_GLOBAL_SET.call_once(|| {
157 global::set_tracer_provider(provider.clone());
158 });
159 Ok(())
160 }
161 Err(error) => Err(error.clone()),
162 }
163 }
164
165 fn parse_parent_context(parent: &TraceContext) -> Option<Context> {
166 let mut headers = HashMap::new();
167 if let Some(traceparent) = parent.traceparent.as_ref() {
168 headers.insert("traceparent".to_string(), traceparent.clone());
169 }
170 if let Some(tracestate) = parent.tracestate.as_ref() {
171 headers.insert("tracestate".to_string(), tracestate.clone());
172 }
173 if !headers.contains_key("traceparent") {
174 if let (Some(trace_id), Some(span_id)) =
175 (parent.trace_id.as_ref(), parent.span_id.as_ref())
176 {
177 let value = format!("00-{trace_id}-{span_id}-01");
178 headers.insert("traceparent".to_string(), value);
179 }
180 }
181 if headers.is_empty() {
182 return None;
183 }
184
185 struct HeaderExtractor<'a> {
186 inner: &'a HashMap<String, String>,
187 }
188
189 impl Extractor for HeaderExtractor<'_> {
190 fn get(&self, key: &str) -> Option<&str> {
191 self.inner.get(key).map(String::as_str)
192 }
193
194 fn keys(&self) -> Vec<&str> {
195 self.inner.keys().map(String::as_str).collect()
196 }
197 }
198
199 let propagator = TraceContextPropagator::new();
200 let extractor = HeaderExtractor { inner: &headers };
201 Some(propagator.extract(&extractor))
202 }
203}
204
205impl WorkflowTracer for OtelWorkflowTracer {
206 fn start_span(
207 &self,
208 name: &str,
209 _kind: SpanKind,
210 parent: Option<&TraceContext>,
211 ) -> (TraceContext, Box<dyn WorkflowSpan>) {
212 let tracer = global::tracer("simple-agents-workflow");
213 let span = match parent.and_then(Self::parse_parent_context) {
214 Some(parent_context) => tracer.start_with_context(name.to_string(), &parent_context),
215 None => tracer.start(name.to_string()),
216 };
217 let span_context = span.span_context().clone();
218 let context = TraceContext {
219 trace_id: Some(span_context.trace_id().to_string()),
220 span_id: Some(span_context.span_id().to_string()),
221 parent_span_id: parent.and_then(|value| value.span_id.clone()),
222 traceparent: Some(format!(
223 "00-{}-{}-{:02x}",
224 span_context.trace_id(),
225 span_context.span_id(),
226 span_context.trace_flags().to_u8()
227 )),
228 tracestate: parent.and_then(|value| value.tracestate.clone()),
229 baggage: parent
230 .map(|value| value.baggage.clone())
231 .unwrap_or_default(),
232 };
233
234 (context, Box::new(OtelWorkflowSpan { inner: span }))
235 }
236}
237
#[cfg(test)]
mod tests {
    use super::{NoopWorkflowTracer, SpanKind, TraceContext, WorkflowTracer};

    /// The no-op tracer echoes the parent context and its span accepts the
    /// full attribute / event / end sequence.
    #[test]
    fn noop_tracer_supports_span_lifecycle() {
        let inherited = TraceContext {
            trace_id: Some(String::from("trace-1")),
            ..Default::default()
        };

        let (returned, mut handle) =
            NoopWorkflowTracer.start_span("node.llm", SpanKind::Node, Some(&inherited));

        assert_eq!(returned.trace_id.as_deref(), Some("trace-1"));
        handle.set_attribute("node.id", "llm");
        handle.add_event("start");
        handle.end();
    }
}