Skip to main content

simple_agents_workflow/observability/
tracing.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Once, OnceLock};
3
4use opentelemetry::global;
5use opentelemetry::propagation::{Extractor, TextMapPropagator};
6use opentelemetry::trace::{Span as OtelSpan, Tracer};
7use opentelemetry::{Context, KeyValue};
8use opentelemetry_otlp::WithExportConfig;
9use opentelemetry_sdk::trace::TracerProvider as SdkTracerProvider;
10use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
11
/// Trace context carrier that can be passed between spans in an OpenTelemetry-friendly form.
///
/// All fields are empty in the [`Default`] value.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TraceContext {
    /// Trace id of the span this context describes.
    pub trace_id: Option<String>,
    /// Span id of the span this context describes.
    pub span_id: Option<String>,
    /// Span id of the parent span, when known.
    pub parent_span_id: Option<String>,
    /// W3C `traceparent` header value for the span.
    pub traceparent: Option<String>,
    /// W3C `tracestate` header value.
    pub tracestate: Option<String>,
    /// Baggage key/value pairs; stored in a `BTreeMap`, so iteration is key-ordered.
    pub baggage: BTreeMap<String, String>,
}
22
/// Category of operation a span represents.
///
/// The tracers in this module accept a kind but do not currently vary their behaviour by it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Workflow-level span.
    Workflow,
    /// Node-level span.
    Node,
}
29
/// Mutable handle to an in-progress span.
///
/// Handles must be `Send`. `end` takes the boxed handle by value, so a span cannot be
/// touched again once it has been ended.
pub trait WorkflowSpan: Send {
    /// Attaches the string attribute `key = value` to the span.
    fn set_attribute(&mut self, key: &str, value: &str);

    /// Records an event called `name` on the span.
    fn add_event(&mut self, name: &str);

    /// Finishes the span, consuming the handle.
    fn end(self: Box<Self>);
}
36
37/// Workflow-level tracing adapter surface.
38pub trait WorkflowTracer: Send + Sync {
39    fn start_span(
40        &self,
41        name: &str,
42        kind: SpanKind,
43        parent: Option<&TraceContext>,
44    ) -> (TraceContext, Box<dyn WorkflowSpan>);
45}
46
47static WORKFLOW_TRACER: OnceLock<Box<dyn WorkflowTracer>> = OnceLock::new();
48static OTEL_PROVIDER: OnceLock<Result<SdkTracerProvider, String>> = OnceLock::new();
49static OTEL_GLOBAL_SET: Once = Once::new();
50
51pub fn workflow_tracer() -> &'static dyn WorkflowTracer {
52    WORKFLOW_TRACER
53        .get_or_init(|| {
54            if OtelWorkflowTracer::init_from_env().is_ok() {
55                Box::new(OtelWorkflowTracer)
56            } else {
57                Box::new(NoopWorkflowTracer)
58            }
59        })
60        .as_ref()
61}
62
63pub fn flush_workflow_tracer() {
64    if let Some(Ok(provider)) = OTEL_PROVIDER.get() {
65        let _ = provider.force_flush();
66    }
67}
68
69/// No-op span for deployments without tracing backends.
70#[derive(Debug, Default)]
71pub struct NoopWorkflowSpan;
72
73impl WorkflowSpan for NoopWorkflowSpan {
74    fn set_attribute(&mut self, _key: &str, _value: &str) {}
75
76    fn add_event(&mut self, _name: &str) {}
77
78    fn end(self: Box<Self>) {}
79}
80
81/// No-op tracer for deployments without OpenTelemetry.
82#[derive(Debug, Default)]
83pub struct NoopWorkflowTracer;
84
85impl WorkflowTracer for NoopWorkflowTracer {
86    fn start_span(
87        &self,
88        _name: &str,
89        _kind: SpanKind,
90        parent: Option<&TraceContext>,
91    ) -> (TraceContext, Box<dyn WorkflowSpan>) {
92        (
93            parent.cloned().unwrap_or_default(),
94            Box::<NoopWorkflowSpan>::default(),
95        )
96    }
97}
98
/// Tracer that opens spans through the global OpenTelemetry tracer provider.
#[derive(Debug)]
pub struct OtelWorkflowTracer;
101
102#[derive(Debug)]
103pub struct OtelWorkflowSpan {
104    inner: opentelemetry::global::BoxedSpan,
105}
106
107impl WorkflowSpan for OtelWorkflowSpan {
108    fn set_attribute(&mut self, key: &str, value: &str) {
109        self.inner
110            .set_attribute(KeyValue::new(key.to_string(), value.to_string()));
111    }
112
113    fn add_event(&mut self, name: &str) {
114        self.inner.add_event(name.to_string(), Vec::new());
115    }
116
117    fn end(self: Box<Self>) {
118        let mut inner = self.inner;
119        inner.end();
120    }
121}
122
123impl OtelWorkflowTracer {
124    fn init_from_env() -> Result<(), String> {
125        if !std::env::var("SIMPLE_AGENTS_OTEL_ENABLED")
126            .map(|value| value == "1" || value.eq_ignore_ascii_case("true"))
127            .unwrap_or(false)
128        {
129            return Err("otel disabled".to_string());
130        }
131
132        let provider_result = OTEL_PROVIDER.get_or_init(|| {
133            let endpoint = std::env::var("SIMPLE_AGENTS_OTEL_ENDPOINT")
134                .unwrap_or_else(|_| "http://localhost:4317".to_string());
135            let service_name = std::env::var("SIMPLE_AGENTS_SERVICE_NAME")
136                .unwrap_or_else(|_| "simple-agents-workflow".to_string());
137
138            let exporter = opentelemetry_otlp::new_exporter()
139                .tonic()
140                .with_endpoint(endpoint)
141                .build_span_exporter()
142                .map_err(|error| error.to_string())?;
143
144            let provider = SdkTracerProvider::builder()
145                .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
146                .with_config(opentelemetry_sdk::trace::Config::default().with_resource(
147                    Resource::new(vec![KeyValue::new("service.name", service_name)]),
148                ))
149                .build();
150
151            Ok(provider)
152        });
153
154        match provider_result {
155            Ok(provider) => {
156                OTEL_GLOBAL_SET.call_once(|| {
157                    global::set_tracer_provider(provider.clone());
158                });
159                Ok(())
160            }
161            Err(error) => Err(error.clone()),
162        }
163    }
164
165    fn parse_parent_context(parent: &TraceContext) -> Option<Context> {
166        let mut headers = HashMap::new();
167        if let Some(traceparent) = parent.traceparent.as_ref() {
168            headers.insert("traceparent".to_string(), traceparent.clone());
169        }
170        if let Some(tracestate) = parent.tracestate.as_ref() {
171            headers.insert("tracestate".to_string(), tracestate.clone());
172        }
173        if !headers.contains_key("traceparent") {
174            if let (Some(trace_id), Some(span_id)) =
175                (parent.trace_id.as_ref(), parent.span_id.as_ref())
176            {
177                let value = format!("00-{trace_id}-{span_id}-01");
178                headers.insert("traceparent".to_string(), value);
179            }
180        }
181        if headers.is_empty() {
182            return None;
183        }
184
185        struct HeaderExtractor<'a> {
186            inner: &'a HashMap<String, String>,
187        }
188
189        impl Extractor for HeaderExtractor<'_> {
190            fn get(&self, key: &str) -> Option<&str> {
191                self.inner.get(key).map(String::as_str)
192            }
193
194            fn keys(&self) -> Vec<&str> {
195                self.inner.keys().map(String::as_str).collect()
196            }
197        }
198
199        let propagator = TraceContextPropagator::new();
200        let extractor = HeaderExtractor { inner: &headers };
201        Some(propagator.extract(&extractor))
202    }
203}
204
205impl WorkflowTracer for OtelWorkflowTracer {
206    fn start_span(
207        &self,
208        name: &str,
209        _kind: SpanKind,
210        parent: Option<&TraceContext>,
211    ) -> (TraceContext, Box<dyn WorkflowSpan>) {
212        let tracer = global::tracer("simple-agents-workflow");
213        let span = match parent.and_then(Self::parse_parent_context) {
214            Some(parent_context) => tracer.start_with_context(name.to_string(), &parent_context),
215            None => tracer.start(name.to_string()),
216        };
217        let span_context = span.span_context().clone();
218        let context = TraceContext {
219            trace_id: Some(span_context.trace_id().to_string()),
220            span_id: Some(span_context.span_id().to_string()),
221            parent_span_id: parent.and_then(|value| value.span_id.clone()),
222            traceparent: Some(format!(
223                "00-{}-{}-{:02x}",
224                span_context.trace_id(),
225                span_context.span_id(),
226                span_context.trace_flags().to_u8()
227            )),
228            tracestate: parent.and_then(|value| value.tracestate.clone()),
229            baggage: parent
230                .map(|value| value.baggage.clone())
231                .unwrap_or_default(),
232        };
233
234        (context, Box::new(OtelWorkflowSpan { inner: span }))
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::{NoopWorkflowTracer, SpanKind, TraceContext, WorkflowTracer};

    #[test]
    fn noop_tracer_supports_span_lifecycle() {
        let parent = TraceContext {
            trace_id: Some(String::from("trace-1")),
            ..Default::default()
        };

        let (context, mut span) =
            NoopWorkflowTracer.start_span("node.llm", SpanKind::Node, Some(&parent));
        assert_eq!(context.trace_id.as_deref(), Some("trace-1"));

        span.set_attribute("node.id", "llm");
        span.add_event("start");
        span.end();
    }
}