Skip to main content

simple_agents_workflow/observability/
tracing.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Once, OnceLock};
3
4use opentelemetry::global;
5use opentelemetry::propagation::{Extractor, TextMapPropagator};
6use opentelemetry::trace::{Span as OtelSpan, Tracer};
7use opentelemetry::{Context, KeyValue};
8use opentelemetry_otlp::{Protocol, WithExportConfig};
9use opentelemetry_sdk::trace::TracerProvider as SdkTracerProvider;
10use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
11use tonic::metadata::{AsciiMetadataKey, MetadataMap, MetadataValue};
12
/// Environment variable that switches workflow tracing on or off (boolean-like value).
pub const ENV_TRACING_ENABLED: &str = "SIMPLE_AGENTS_TRACING_ENABLED";
/// Environment variable holding the OTLP collector endpoint.
pub const ENV_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
/// Environment variable selecting the OTLP transport (`grpc` or `http/protobuf`).
pub const ENV_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
/// Environment variable holding comma-separated `key=value` exporter headers.
pub const ENV_OTLP_HEADERS: &str = "OTEL_EXPORTER_OTLP_HEADERS";
/// Environment variable overriding the service name reported with exported spans.
pub const ENV_SERVICE_NAME: &str = "OTEL_SERVICE_NAME";

/// Service name used when none is configured.
const DEFAULT_SERVICE_NAME: &str = "simple-agents-workflow";
/// Endpoint used with the gRPC protocol when no endpoint is configured.
const DEFAULT_GRPC_ENDPOINT: &str = "http://localhost:4317";
/// Endpoint used with the HTTP/protobuf protocol when no endpoint is configured.
const DEFAULT_HTTP_ENDPOINT: &str = "http://localhost:4318";
22
/// OpenTelemetry-friendly trace context carrier.
///
/// Every identifier is an optional string, so the same structure serves tracers that produce
/// real identifiers as well as tracers that only pass a parent context through.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TraceContext {
    /// Trace identifier of the span this context describes.
    pub trace_id: Option<String>,
    /// Span identifier of the span this context describes.
    pub span_id: Option<String>,
    /// Span identifier of the parent context the span was started under, if any.
    pub parent_span_id: Option<String>,
    /// `traceparent` header value, laid out as `00-<trace_id>-<span_id>-<flags>`.
    pub traceparent: Option<String>,
    /// `tracestate` header value accompanying `traceparent`.
    pub tracestate: Option<String>,
    /// Key/value baggage entries, kept in sorted key order by the `BTreeMap`.
    pub baggage: BTreeMap<String, String>,
}
33
/// Span operation kind.
///
/// Passed to [`WorkflowTracer::start_span`] so an implementation can tell what the span covers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// The span covers a workflow.
    Workflow,
    /// The span covers a single node (presumably one step inside a workflow — confirm with callers).
    Node,
}
40
/// Trait for a mutable span handle.
///
/// Handles are `Send`, so a boxed span may be moved to another thread before it is ended.
pub trait WorkflowSpan: Send {
    /// Attaches the string attribute `key` = `value` to the span.
    fn set_attribute(&mut self, key: &str, value: &str);
    /// Records an event called `name` on the span.
    fn add_event(&mut self, name: &str);
    /// Finishes the span; takes the boxed handle by value so it cannot be used afterwards.
    fn end(self: Box<Self>);
}
47
/// Workflow-level tracing adapter surface.
///
/// Implementations are `Send + Sync` so a single tracer can be shared between threads.
pub trait WorkflowTracer: Send + Sync {
    /// Starts a span called `name`.
    ///
    /// * `kind` - what the span covers (workflow or node).
    /// * `parent` - context of the enclosing span, or `None` when there is no parent.
    ///
    /// Returns the context describing the new span together with the handle used to annotate
    /// and end it.
    fn start_span(
        &self,
        name: &str,
        kind: SpanKind,
        parent: Option<&TraceContext>,
    ) -> (TraceContext, Box<dyn WorkflowSpan>);
}
57
/// OTLP transports this module knows how to configure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OtlpProtocol {
    /// OTLP over gRPC (configured as `grpc`).
    Grpc,
    /// OTLP over HTTP with protobuf payloads (configured as `http/protobuf`).
    HttpProtobuf,
}
63
64impl OtlpProtocol {
65    fn parse(value: &str) -> Result<Self, String> {
66        let normalized = value.trim().to_ascii_lowercase();
67        match normalized.as_str() {
68            "grpc" => Ok(Self::Grpc),
69            "http/protobuf" => Ok(Self::HttpProtobuf),
70            _ => Err(format!(
71                "{ENV_OTLP_PROTOCOL} must be one of: grpc, http/protobuf"
72            )),
73        }
74    }
75
76    fn to_otlp_protocol(self) -> Protocol {
77        match self {
78            Self::Grpc => Protocol::Grpc,
79            Self::HttpProtobuf => Protocol::HttpBinary,
80        }
81    }
82}
83
/// Tracing settings resolved from configuration lookups (normally the process environment).
#[derive(Debug, Clone, PartialEq, Eq)]
struct TracingConfig {
    /// Whether tracing was requested; defaults to `false`.
    enabled: bool,
    /// Collector endpoint handed to the exporter.
    endpoint: String,
    /// Transport used to reach the collector; defaults to gRPC.
    protocol: OtlpProtocol,
    /// Extra headers (gRPC metadata or HTTP headers) sent with every export.
    headers: HashMap<String, String>,
    /// Value reported as the `service.name` resource attribute.
    service_name: String,
}
92
93impl TracingConfig {
94    fn from_env() -> Result<Self, String> {
95        Self::from_lookup(|key| std::env::var(key).ok())
96    }
97
98    fn from_lookup<F>(lookup: F) -> Result<Self, String>
99    where
100        F: Fn(&str) -> Option<String>,
101    {
102        let enabled = lookup(ENV_TRACING_ENABLED)
103            .as_deref()
104            .map(parse_bool)
105            .transpose()?
106            .unwrap_or(false);
107
108        let protocol = lookup(ENV_OTLP_PROTOCOL)
109            .as_deref()
110            .map(OtlpProtocol::parse)
111            .transpose()?
112            .unwrap_or(OtlpProtocol::Grpc);
113
114        let endpoint = lookup(ENV_OTLP_ENDPOINT).unwrap_or_else(|| match protocol {
115            OtlpProtocol::Grpc => DEFAULT_GRPC_ENDPOINT.to_string(),
116            OtlpProtocol::HttpProtobuf => DEFAULT_HTTP_ENDPOINT.to_string(),
117        });
118
119        let service_name = lookup(ENV_SERVICE_NAME)
120            .filter(|value| !value.trim().is_empty())
121            .unwrap_or_else(|| DEFAULT_SERVICE_NAME.to_string());
122
123        let headers = lookup(ENV_OTLP_HEADERS)
124            .as_deref()
125            .map(parse_headers)
126            .transpose()?
127            .unwrap_or_default();
128
129        Ok(Self {
130            enabled,
131            endpoint,
132            protocol,
133            headers,
134            service_name,
135        })
136    }
137}
138
139fn parse_bool(value: &str) -> Result<bool, String> {
140    let normalized = value.trim().to_ascii_lowercase();
141    match normalized.as_str() {
142        "1" | "true" | "yes" | "on" => Ok(true),
143        "0" | "false" | "no" | "off" => Ok(false),
144        _ => Err(format!(
145            "{ENV_TRACING_ENABLED} must be a boolean-like value (true/false/1/0/yes/no/on/off)"
146        )),
147    }
148}
149
150fn parse_headers(raw: &str) -> Result<HashMap<String, String>, String> {
151    let mut headers = HashMap::new();
152    let trimmed = raw.trim();
153    if trimmed.is_empty() {
154        return Ok(headers);
155    }
156
157    for part in trimmed.split(',') {
158        let entry = part.trim();
159        if entry.is_empty() {
160            continue;
161        }
162        let Some((name, value)) = entry.split_once('=') else {
163            return Err(format!(
164                "{ENV_OTLP_HEADERS} must contain comma-separated key=value entries"
165            ));
166        };
167        let key = name.trim();
168        let val = value.trim();
169        if key.is_empty() {
170            return Err(format!("{ENV_OTLP_HEADERS} contains an empty header name"));
171        }
172        if val.is_empty() {
173            return Err(format!(
174                "{ENV_OTLP_HEADERS} header '{key}' has an empty value"
175            ));
176        }
177        headers.insert(key.to_string(), val.to_string());
178    }
179
180    Ok(headers)
181}
182
183struct TracerProviderFactory;
184
185impl TracerProviderFactory {
186    fn build(config: &TracingConfig) -> Result<SdkTracerProvider, String> {
187        let exporter = OtlpExporterFactory::build_span_exporter(config)?;
188
189        let provider = SdkTracerProvider::builder()
190            .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
191            .with_config(
192                opentelemetry_sdk::trace::Config::default().with_resource(Resource::new(vec![
193                    KeyValue::new("service.name", config.service_name.clone()),
194                ])),
195            )
196            .build();
197
198        Ok(provider)
199    }
200}
201
202struct OtlpExporterFactory;
203
204impl OtlpExporterFactory {
205    fn build_span_exporter(
206        config: &TracingConfig,
207    ) -> Result<opentelemetry_otlp::SpanExporter, String> {
208        match config.protocol {
209            OtlpProtocol::Grpc => {
210                let builder = opentelemetry_otlp::new_exporter()
211                    .tonic()
212                    .with_protocol(config.protocol.to_otlp_protocol())
213                    .with_endpoint(config.endpoint.clone());
214                let builder = if config.headers.is_empty() {
215                    builder
216                } else {
217                    builder.with_metadata(grpc_metadata_from_headers(&config.headers)?)
218                };
219                builder
220                    .build_span_exporter()
221                    .map_err(|error| error.to_string())
222            }
223            OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter()
224                .http()
225                .with_protocol(config.protocol.to_otlp_protocol())
226                .with_endpoint(normalize_http_traces_endpoint(&config.endpoint))
227                .with_headers(config.headers.clone())
228                .build_span_exporter()
229                .map_err(|error| error.to_string()),
230        }
231    }
232}
233
/// Returns `endpoint` as a traces URL ending in `/v1/traces`.
///
/// Surrounding whitespace and trailing slashes are removed first; the path is appended only
/// when it is not already the suffix. A blank endpoint yields the bare path `/v1/traces`.
fn normalize_http_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";

    let base = endpoint.trim().trim_end_matches('/');

    let mut normalized = String::with_capacity(base.len() + TRACES_PATH.len());
    normalized.push_str(base);
    if !base.ends_with(TRACES_PATH) {
        normalized.push_str(TRACES_PATH);
    }
    normalized
}
245
246fn grpc_metadata_from_headers(headers: &HashMap<String, String>) -> Result<MetadataMap, String> {
247    let mut metadata = MetadataMap::new();
248    for (name, value) in headers {
249        let key = AsciiMetadataKey::from_bytes(name.as_bytes())
250            .map_err(|_| format!("{ENV_OTLP_HEADERS} has invalid gRPC header key '{name}'"))?;
251        let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|_| {
252            format!("{ENV_OTLP_HEADERS} has invalid gRPC header value for key '{name}'")
253        })?;
254        metadata.insert(key, metadata_value);
255    }
256    Ok(metadata)
257}
258
/// Process-wide tracer, chosen once on the first call to `workflow_tracer`.
static WORKFLOW_TRACER: OnceLock<Box<dyn WorkflowTracer>> = OnceLock::new();
/// Outcome of building the OTLP tracer provider; a failed build is stored as well, so the
/// build is attempted at most once per process.
static OTEL_PROVIDER: OnceLock<Result<SdkTracerProvider, String>> = OnceLock::new();
/// Guards the one-time registration of the provider as the global OpenTelemetry provider.
static OTEL_GLOBAL_SET: Once = Once::new();
262
263pub fn workflow_tracer() -> &'static dyn WorkflowTracer {
264    WORKFLOW_TRACER
265        .get_or_init(|| {
266            if OtelWorkflowTracer::init_from_env().is_ok() {
267                Box::new(OtelWorkflowTracer)
268            } else {
269                Box::new(NoopWorkflowTracer)
270            }
271        })
272        .as_ref()
273}
274
275pub fn flush_workflow_tracer() {
276    if let Some(Ok(provider)) = OTEL_PROVIDER.get() {
277        let _ = provider.force_flush();
278    }
279}
280
/// No-op span for deployments without tracing backends.
///
/// Every operation is accepted and discarded.
#[derive(Debug, Default)]
pub struct NoopWorkflowSpan;

impl WorkflowSpan for NoopWorkflowSpan {
    /// Ignores the attribute.
    fn set_attribute(&mut self, _key: &str, _value: &str) {}

    /// Ignores the event.
    fn add_event(&mut self, _name: &str) {}

    /// Drops the handle without recording anything.
    fn end(self: Box<Self>) {}
}
292
293/// No-op tracer for deployments without OpenTelemetry.
294#[derive(Debug, Default)]
295pub struct NoopWorkflowTracer;
296
297impl WorkflowTracer for NoopWorkflowTracer {
298    fn start_span(
299        &self,
300        _name: &str,
301        _kind: SpanKind,
302        parent: Option<&TraceContext>,
303    ) -> (TraceContext, Box<dyn WorkflowSpan>) {
304        (
305            parent.cloned().unwrap_or_default(),
306            Box::<NoopWorkflowSpan>::default(),
307        )
308    }
309}
310
/// Tracer that starts spans through the global OpenTelemetry tracer provider.
#[derive(Debug)]
pub struct OtelWorkflowTracer;
313
314#[derive(Debug)]
315pub struct OtelWorkflowSpan {
316    inner: opentelemetry::global::BoxedSpan,
317}
318
319impl WorkflowSpan for OtelWorkflowSpan {
320    fn set_attribute(&mut self, key: &str, value: &str) {
321        self.inner
322            .set_attribute(KeyValue::new(key.to_string(), value.to_string()));
323    }
324
325    fn add_event(&mut self, name: &str) {
326        self.inner.add_event(name.to_string(), Vec::new());
327    }
328
329    fn end(self: Box<Self>) {
330        let mut inner = self.inner;
331        inner.end();
332    }
333}
334
335impl OtelWorkflowTracer {
336    fn init_from_env() -> Result<(), String> {
337        let config = TracingConfig::from_env()?;
338        if !config.enabled {
339            return Err("tracing disabled".to_string());
340        }
341
342        let provider_result = OTEL_PROVIDER.get_or_init(|| TracerProviderFactory::build(&config));
343
344        match provider_result {
345            Ok(provider) => {
346                OTEL_GLOBAL_SET.call_once(|| {
347                    global::set_tracer_provider(provider.clone());
348                });
349                Ok(())
350            }
351            Err(error) => Err(error.clone()),
352        }
353    }
354
355    fn parse_parent_context(parent: &TraceContext) -> Option<Context> {
356        let mut headers = HashMap::new();
357        if let Some(traceparent) = parent.traceparent.as_ref() {
358            headers.insert("traceparent".to_string(), traceparent.clone());
359        }
360        if let Some(tracestate) = parent.tracestate.as_ref() {
361            headers.insert("tracestate".to_string(), tracestate.clone());
362        }
363        if !headers.contains_key("traceparent") {
364            if let (Some(trace_id), Some(span_id)) =
365                (parent.trace_id.as_ref(), parent.span_id.as_ref())
366            {
367                let value = format!("00-{trace_id}-{span_id}-01");
368                headers.insert("traceparent".to_string(), value);
369            }
370        }
371        if headers.is_empty() {
372            return None;
373        }
374
375        struct HeaderExtractor<'a> {
376            inner: &'a HashMap<String, String>,
377        }
378
379        impl Extractor for HeaderExtractor<'_> {
380            fn get(&self, key: &str) -> Option<&str> {
381                self.inner.get(key).map(String::as_str)
382            }
383
384            fn keys(&self) -> Vec<&str> {
385                self.inner.keys().map(String::as_str).collect()
386            }
387        }
388
389        let propagator = TraceContextPropagator::new();
390        let extractor = HeaderExtractor { inner: &headers };
391        Some(propagator.extract(&extractor))
392    }
393}
394
395impl WorkflowTracer for OtelWorkflowTracer {
396    fn start_span(
397        &self,
398        name: &str,
399        _kind: SpanKind,
400        parent: Option<&TraceContext>,
401    ) -> (TraceContext, Box<dyn WorkflowSpan>) {
402        let tracer = global::tracer("simple-agents-workflow");
403        let span = match parent.and_then(Self::parse_parent_context) {
404            Some(parent_context) => tracer.start_with_context(name.to_string(), &parent_context),
405            None => tracer.start(name.to_string()),
406        };
407        let span_context = span.span_context().clone();
408        let context = TraceContext {
409            trace_id: Some(span_context.trace_id().to_string()),
410            span_id: Some(span_context.span_id().to_string()),
411            parent_span_id: parent.and_then(|value| value.span_id.clone()),
412            traceparent: Some(format!(
413                "00-{}-{}-{:02x}",
414                span_context.trace_id(),
415                span_context.span_id(),
416                span_context.trace_flags().to_u8()
417            )),
418            tracestate: parent.and_then(|value| value.tracestate.clone()),
419            baggage: parent
420                .map(|value| value.baggage.clone())
421                .unwrap_or_default(),
422        };
423
424        (context, Box::new(OtelWorkflowSpan { inner: span }))
425    }
426}
427
// Unit tests for the configuration parsing helpers and the no-op tracer. Nothing here builds
// an exporter or contacts a collector.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::{
        grpc_metadata_from_headers, normalize_http_traces_endpoint, parse_headers,
        NoopWorkflowTracer, OtlpProtocol, SpanKind, TraceContext, TracingConfig, WorkflowTracer,
        ENV_OTLP_ENDPOINT, ENV_OTLP_HEADERS, ENV_OTLP_PROTOCOL, ENV_TRACING_ENABLED,
    };

    // The no-op tracer echoes the parent context and its span accepts every operation.
    #[test]
    fn noop_tracer_supports_span_lifecycle() {
        let tracer = NoopWorkflowTracer;
        let parent = TraceContext {
            trace_id: Some("trace-1".to_string()),
            ..TraceContext::default()
        };
        let (ctx, mut span) = tracer.start_span("node.llm", SpanKind::Node, Some(&parent));
        assert_eq!(ctx.trace_id.as_deref(), Some("trace-1"));
        span.set_attribute("node.id", "llm");
        span.add_event("start");
        span.end();
    }

    // Whitespace around entries is ignored.
    #[test]
    fn parse_headers_accepts_comma_separated_pairs() {
        let headers = parse_headers("a=b, c=d").expect("headers should parse");
        assert_eq!(headers.get("a").map(String::as_str), Some("b"));
        assert_eq!(headers.get("c").map(String::as_str), Some("d"));
    }

    // An entry without `=` is an error that names the headers variable.
    #[test]
    fn parse_headers_rejects_invalid_entry() {
        let error = parse_headers("invalid").expect_err("invalid header should fail");
        assert!(error.contains(ENV_OTLP_HEADERS));
    }

    // With nothing configured: tracing off, gRPC, localhost gRPC endpoint, no headers.
    #[test]
    fn tracing_config_defaults_to_disabled_grpc() {
        let config = TracingConfig::from_lookup(|_| None).expect("config should parse");
        assert!(!config.enabled);
        assert_eq!(config.protocol, OtlpProtocol::Grpc);
        assert_eq!(config.endpoint, "http://localhost:4317");
        assert!(config.headers.is_empty());
    }

    // A full HTTP/protobuf configuration is read back as given; the endpoint is stored
    // without the `/v1/traces` suffix.
    #[test]
    fn tracing_config_parses_http_protocol_and_headers() {
        let mut values = HashMap::new();
        values.insert(ENV_TRACING_ENABLED.to_string(), "true".to_string());
        values.insert(ENV_OTLP_PROTOCOL.to_string(), "http/protobuf".to_string());
        values.insert(
            ENV_OTLP_ENDPOINT.to_string(),
            "https://cloud.langfuse.com/api/public/otel".to_string(),
        );
        values.insert(
            ENV_OTLP_HEADERS.to_string(),
            "Authorization=Basic test,x-langfuse-ingestion-version=4".to_string(),
        );

        let config = TracingConfig::from_lookup(|key| values.get(key).cloned())
            .expect("config should parse");

        assert!(config.enabled);
        assert_eq!(config.protocol, OtlpProtocol::HttpProtobuf);
        assert_eq!(
            config.endpoint,
            "https://cloud.langfuse.com/api/public/otel"
        );
        assert_eq!(
            config
                .headers
                .get("x-langfuse-ingestion-version")
                .map(String::as_str),
            Some("4")
        );
    }

    // `http/json` is not a supported protocol value.
    #[test]
    fn tracing_config_rejects_invalid_protocol() {
        let mut values = HashMap::new();
        values.insert(ENV_OTLP_PROTOCOL.to_string(), "http/json".to_string());
        let error = TracingConfig::from_lookup(|key| values.get(key).cloned())
            .expect_err("invalid protocol should fail");
        assert!(error.contains(ENV_OTLP_PROTOCOL));
    }

    // The traces path is appended exactly once, with or without a trailing slash.
    #[test]
    fn http_endpoint_normalizer_appends_v1_traces() {
        assert_eq!(
            normalize_http_traces_endpoint("http://localhost:4318"),
            "http://localhost:4318/v1/traces"
        );
        assert_eq!(
            normalize_http_traces_endpoint("http://localhost:4318/"),
            "http://localhost:4318/v1/traces"
        );
        assert_eq!(
            normalize_http_traces_endpoint("https://cloud.langfuse.com/api/public/otel"),
            "https://cloud.langfuse.com/api/public/otel/v1/traces"
        );
        assert_eq!(
            normalize_http_traces_endpoint("https://example.com/v1/traces"),
            "https://example.com/v1/traces"
        );
    }

    // A key containing a space cannot be used as a gRPC metadata key.
    #[test]
    fn grpc_metadata_builder_rejects_invalid_keys() {
        let mut headers = HashMap::new();
        headers.insert("Invalid Header".to_string(), "value".to_string());
        let error =
            grpc_metadata_from_headers(&headers).expect_err("invalid metadata key should fail");
        assert!(error.contains(ENV_OTLP_HEADERS));
    }
}
543}