1use std::collections::{BTreeMap, HashMap};
2use std::sync::{Once, OnceLock};
3
4use opentelemetry::global;
5use opentelemetry::propagation::{Extractor, TextMapPropagator};
6use opentelemetry::trace::{Span as OtelSpan, Tracer};
7use opentelemetry::{Context, KeyValue};
8use opentelemetry_otlp::{Protocol, WithExportConfig};
9use opentelemetry_sdk::trace::TracerProvider as SdkTracerProvider;
10use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
11use tonic::metadata::{AsciiMetadataKey, MetadataMap, MetadataValue};
12
13pub const ENV_TRACING_ENABLED: &str = "SIMPLE_AGENTS_TRACING_ENABLED";
14pub const ENV_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
15pub const ENV_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
16pub const ENV_OTLP_HEADERS: &str = "OTEL_EXPORTER_OTLP_HEADERS";
17pub const ENV_SERVICE_NAME: &str = "OTEL_SERVICE_NAME";
18
19const DEFAULT_SERVICE_NAME: &str = "simple-agents-workflow";
20const DEFAULT_GRPC_ENDPOINT: &str = "http://localhost:4317";
21const DEFAULT_HTTP_ENDPOINT: &str = "http://localhost:4318";
22
/// Trace-propagation state handed between workflow spans.
///
/// Every field is optional so that the no-op tracer can pass through whatever the
/// caller supplied, while the OpenTelemetry tracer fills in the W3C values for the
/// span it started.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TraceContext {
    /// Trace identifier (the OTel tracer writes the hex trace id here).
    pub trace_id: Option<String>,
    /// Identifier of the span this context describes.
    pub span_id: Option<String>,
    /// Identifier of that span's parent, when known.
    pub parent_span_id: Option<String>,
    /// W3C `traceparent` header value (`00-<trace>-<span>-<flags>`).
    pub traceparent: Option<String>,
    /// W3C `tracestate` header value.
    pub tracestate: Option<String>,
    /// Key/value baggage; both bundled tracers copy the parent's baggage through unchanged.
    pub baggage: BTreeMap<String, String>,
}
33
/// Granularity of the unit a span covers.
///
/// NOTE(review): both bundled tracers currently ignore this value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SpanKind {
    /// A workflow-level span.
    Workflow,
    /// A node-level span.
    Node,
}
40
/// Handle to a span started by a [`WorkflowTracer`].
///
/// Implementations must be `Send`. Ending a span consumes its boxed handle, so the
/// span cannot be touched after [`WorkflowSpan::end`].
pub trait WorkflowSpan: Send {
    /// Records the string attribute `key = value` on the span.
    fn set_attribute(&mut self, key: &str, value: &str);
    /// Records an event called `name` on the span.
    fn add_event(&mut self, name: &str);
    /// Finishes the span, consuming the handle.
    fn end(self: Box<Self>);
}
47
/// Factory for [`WorkflowSpan`]s; the process-wide instance comes from [`workflow_tracer`].
///
/// Implementations must be `Send + Sync`.
pub trait WorkflowTracer: Send + Sync {
    /// Starts a span called `name` of the given `kind`, optionally below `parent`.
    ///
    /// Returns the [`TraceContext`] for the started span together with its handle.
    /// The no-op tracer returns a clone of `parent` (or an empty context) unchanged.
    fn start_span(
        &self,
        name: &str,
        kind: SpanKind,
        parent: Option<&TraceContext>,
    ) -> (TraceContext, Box<dyn WorkflowSpan>);
}
57
/// OTLP transport selected through `OTEL_EXPORTER_OTLP_PROTOCOL`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum OtlpProtocol {
    /// OTLP over gRPC (`grpc`).
    Grpc,
    /// OTLP over HTTP with protobuf bodies (`http/protobuf`).
    HttpProtobuf,
}
63
64impl OtlpProtocol {
65 fn parse(value: &str) -> Result<Self, String> {
66 let normalized = value.trim().to_ascii_lowercase();
67 match normalized.as_str() {
68 "grpc" => Ok(Self::Grpc),
69 "http/protobuf" => Ok(Self::HttpProtobuf),
70 _ => Err(format!(
71 "{ENV_OTLP_PROTOCOL} must be one of: grpc, http/protobuf"
72 )),
73 }
74 }
75
76 fn to_otlp_protocol(self) -> Protocol {
77 match self {
78 Self::Grpc => Protocol::Grpc,
79 Self::HttpProtobuf => Protocol::HttpBinary,
80 }
81 }
82}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
85struct TracingConfig {
86 enabled: bool,
87 endpoint: String,
88 protocol: OtlpProtocol,
89 headers: HashMap<String, String>,
90 service_name: String,
91}
92
93impl TracingConfig {
94 fn from_env() -> Result<Self, String> {
95 Self::from_lookup(|key| std::env::var(key).ok())
96 }
97
98 fn from_lookup<F>(lookup: F) -> Result<Self, String>
99 where
100 F: Fn(&str) -> Option<String>,
101 {
102 let enabled = lookup(ENV_TRACING_ENABLED)
103 .as_deref()
104 .map(parse_bool)
105 .transpose()?
106 .unwrap_or(false);
107
108 let protocol = lookup(ENV_OTLP_PROTOCOL)
109 .as_deref()
110 .map(OtlpProtocol::parse)
111 .transpose()?
112 .unwrap_or(OtlpProtocol::Grpc);
113
114 let endpoint = lookup(ENV_OTLP_ENDPOINT).unwrap_or_else(|| match protocol {
115 OtlpProtocol::Grpc => DEFAULT_GRPC_ENDPOINT.to_string(),
116 OtlpProtocol::HttpProtobuf => DEFAULT_HTTP_ENDPOINT.to_string(),
117 });
118
119 let service_name = lookup(ENV_SERVICE_NAME)
120 .filter(|value| !value.trim().is_empty())
121 .unwrap_or_else(|| DEFAULT_SERVICE_NAME.to_string());
122
123 let headers = lookup(ENV_OTLP_HEADERS)
124 .as_deref()
125 .map(parse_headers)
126 .transpose()?
127 .unwrap_or_default();
128
129 Ok(Self {
130 enabled,
131 endpoint,
132 protocol,
133 headers,
134 service_name,
135 })
136 }
137}
138
139fn parse_bool(value: &str) -> Result<bool, String> {
140 let normalized = value.trim().to_ascii_lowercase();
141 match normalized.as_str() {
142 "1" | "true" | "yes" | "on" => Ok(true),
143 "0" | "false" | "no" | "off" => Ok(false),
144 _ => Err(format!(
145 "{ENV_TRACING_ENABLED} must be a boolean-like value (true/false/1/0/yes/no/on/off)"
146 )),
147 }
148}
149
150fn parse_headers(raw: &str) -> Result<HashMap<String, String>, String> {
151 let mut headers = HashMap::new();
152 let trimmed = raw.trim();
153 if trimmed.is_empty() {
154 return Ok(headers);
155 }
156
157 for part in trimmed.split(',') {
158 let entry = part.trim();
159 if entry.is_empty() {
160 continue;
161 }
162 let Some((name, value)) = entry.split_once('=') else {
163 return Err(format!(
164 "{ENV_OTLP_HEADERS} must contain comma-separated key=value entries"
165 ));
166 };
167 let key = name.trim();
168 let val = value.trim();
169 if key.is_empty() {
170 return Err(format!("{ENV_OTLP_HEADERS} contains an empty header name"));
171 }
172 if val.is_empty() {
173 return Err(format!(
174 "{ENV_OTLP_HEADERS} header '{key}' has an empty value"
175 ));
176 }
177 headers.insert(key.to_string(), val.to_string());
178 }
179
180 Ok(headers)
181}
182
183struct TracerProviderFactory;
184
185impl TracerProviderFactory {
186 fn build(config: &TracingConfig) -> Result<SdkTracerProvider, String> {
187 let exporter = OtlpExporterFactory::build_span_exporter(config)?;
188
189 let provider = SdkTracerProvider::builder()
190 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
191 .with_config(
192 opentelemetry_sdk::trace::Config::default().with_resource(Resource::new(vec![
193 KeyValue::new("service.name", config.service_name.clone()),
194 ])),
195 )
196 .build();
197
198 Ok(provider)
199 }
200}
201
202struct OtlpExporterFactory;
203
204impl OtlpExporterFactory {
205 fn build_span_exporter(
206 config: &TracingConfig,
207 ) -> Result<opentelemetry_otlp::SpanExporter, String> {
208 match config.protocol {
209 OtlpProtocol::Grpc => {
210 let builder = opentelemetry_otlp::new_exporter()
211 .tonic()
212 .with_protocol(config.protocol.to_otlp_protocol())
213 .with_endpoint(config.endpoint.clone());
214 let builder = if config.headers.is_empty() {
215 builder
216 } else {
217 builder.with_metadata(grpc_metadata_from_headers(&config.headers)?)
218 };
219 builder
220 .build_span_exporter()
221 .map_err(|error| error.to_string())
222 }
223 OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter()
224 .http()
225 .with_protocol(config.protocol.to_otlp_protocol())
226 .with_endpoint(normalize_http_traces_endpoint(&config.endpoint))
227 .with_headers(config.headers.clone())
228 .build_span_exporter()
229 .map_err(|error| error.to_string()),
230 }
231 }
232}
233
/// Turns an OTLP/HTTP base endpoint into the traces URL by appending `/v1/traces`.
///
/// Surrounding whitespace and trailing slashes are removed first; an endpoint that
/// already ends in `/v1/traces` is returned as-is. A blank input yields the bare path.
fn normalize_http_traces_endpoint(endpoint: &str) -> String {
    const TRACES_PATH: &str = "/v1/traces";

    let base = endpoint.trim();
    if base.is_empty() {
        return TRACES_PATH.to_string();
    }

    let base = base.trim_end_matches('/');
    match base.ends_with(TRACES_PATH) {
        true => base.to_string(),
        false => format!("{base}{TRACES_PATH}"),
    }
}
245
246fn grpc_metadata_from_headers(headers: &HashMap<String, String>) -> Result<MetadataMap, String> {
247 let mut metadata = MetadataMap::new();
248 for (name, value) in headers {
249 let key = AsciiMetadataKey::from_bytes(name.as_bytes())
250 .map_err(|_| format!("{ENV_OTLP_HEADERS} has invalid gRPC header key '{name}'"))?;
251 let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|_| {
252 format!("{ENV_OTLP_HEADERS} has invalid gRPC header value for key '{name}'")
253 })?;
254 metadata.insert(key, metadata_value);
255 }
256 Ok(metadata)
257}
258
// Process-wide tracer, chosen on the first call to `workflow_tracer`.
static WORKFLOW_TRACER: OnceLock<Box<dyn WorkflowTracer>> = OnceLock::new();
// Outcome of building the OTLP tracer provider. Only initialized when tracing is
// enabled; it is built at most once and a build error is cached as well.
// `flush_workflow_tracer` reads it to flush pending spans.
static OTEL_PROVIDER: OnceLock<Result<SdkTracerProvider, String>> = OnceLock::new();
// Ensures the provider is installed as the OpenTelemetry global provider only once.
static OTEL_GLOBAL_SET: Once = Once::new();
262
263pub fn workflow_tracer() -> &'static dyn WorkflowTracer {
264 WORKFLOW_TRACER
265 .get_or_init(|| {
266 if OtelWorkflowTracer::init_from_env().is_ok() {
267 Box::new(OtelWorkflowTracer)
268 } else {
269 Box::new(NoopWorkflowTracer)
270 }
271 })
272 .as_ref()
273}
274
275pub fn flush_workflow_tracer() {
276 if let Some(Ok(provider)) = OTEL_PROVIDER.get() {
277 let _ = provider.force_flush();
278 }
279}
280
281#[derive(Debug, Default)]
283pub struct NoopWorkflowSpan;
284
285impl WorkflowSpan for NoopWorkflowSpan {
286 fn set_attribute(&mut self, _key: &str, _value: &str) {}
287
288 fn add_event(&mut self, _name: &str) {}
289
290 fn end(self: Box<Self>) {}
291}
292
293#[derive(Debug, Default)]
295pub struct NoopWorkflowTracer;
296
297impl WorkflowTracer for NoopWorkflowTracer {
298 fn start_span(
299 &self,
300 _name: &str,
301 _kind: SpanKind,
302 parent: Option<&TraceContext>,
303 ) -> (TraceContext, Box<dyn WorkflowSpan>) {
304 (
305 parent.cloned().unwrap_or_default(),
306 Box::<NoopWorkflowSpan>::default(),
307 )
308 }
309}
310
/// [`WorkflowTracer`] backed by the OpenTelemetry global tracer provider.
///
/// [`workflow_tracer`] only selects it after the OTLP provider was built and
/// installed as the global provider.
#[derive(Debug)]
pub struct OtelWorkflowTracer;
313
314#[derive(Debug)]
315pub struct OtelWorkflowSpan {
316 inner: opentelemetry::global::BoxedSpan,
317}
318
319impl WorkflowSpan for OtelWorkflowSpan {
320 fn set_attribute(&mut self, key: &str, value: &str) {
321 self.inner
322 .set_attribute(KeyValue::new(key.to_string(), value.to_string()));
323 }
324
325 fn add_event(&mut self, name: &str) {
326 self.inner.add_event(name.to_string(), Vec::new());
327 }
328
329 fn end(self: Box<Self>) {
330 let mut inner = self.inner;
331 inner.end();
332 }
333}
334
335impl OtelWorkflowTracer {
336 fn init_from_env() -> Result<(), String> {
337 let config = TracingConfig::from_env()?;
338 if !config.enabled {
339 return Err("tracing disabled".to_string());
340 }
341
342 let provider_result = OTEL_PROVIDER.get_or_init(|| TracerProviderFactory::build(&config));
343
344 match provider_result {
345 Ok(provider) => {
346 OTEL_GLOBAL_SET.call_once(|| {
347 global::set_tracer_provider(provider.clone());
348 });
349 Ok(())
350 }
351 Err(error) => Err(error.clone()),
352 }
353 }
354
355 fn parse_parent_context(parent: &TraceContext) -> Option<Context> {
356 let mut headers = HashMap::new();
357 if let Some(traceparent) = parent.traceparent.as_ref() {
358 headers.insert("traceparent".to_string(), traceparent.clone());
359 }
360 if let Some(tracestate) = parent.tracestate.as_ref() {
361 headers.insert("tracestate".to_string(), tracestate.clone());
362 }
363 if !headers.contains_key("traceparent") {
364 if let (Some(trace_id), Some(span_id)) =
365 (parent.trace_id.as_ref(), parent.span_id.as_ref())
366 {
367 let value = format!("00-{trace_id}-{span_id}-01");
368 headers.insert("traceparent".to_string(), value);
369 }
370 }
371 if headers.is_empty() {
372 return None;
373 }
374
375 struct HeaderExtractor<'a> {
376 inner: &'a HashMap<String, String>,
377 }
378
379 impl Extractor for HeaderExtractor<'_> {
380 fn get(&self, key: &str) -> Option<&str> {
381 self.inner.get(key).map(String::as_str)
382 }
383
384 fn keys(&self) -> Vec<&str> {
385 self.inner.keys().map(String::as_str).collect()
386 }
387 }
388
389 let propagator = TraceContextPropagator::new();
390 let extractor = HeaderExtractor { inner: &headers };
391 Some(propagator.extract(&extractor))
392 }
393}
394
395impl WorkflowTracer for OtelWorkflowTracer {
396 fn start_span(
397 &self,
398 name: &str,
399 _kind: SpanKind,
400 parent: Option<&TraceContext>,
401 ) -> (TraceContext, Box<dyn WorkflowSpan>) {
402 let tracer = global::tracer("simple-agents-workflow");
403 let span = match parent.and_then(Self::parse_parent_context) {
404 Some(parent_context) => tracer.start_with_context(name.to_string(), &parent_context),
405 None => tracer.start(name.to_string()),
406 };
407 let span_context = span.span_context().clone();
408 let context = TraceContext {
409 trace_id: Some(span_context.trace_id().to_string()),
410 span_id: Some(span_context.span_id().to_string()),
411 parent_span_id: parent.and_then(|value| value.span_id.clone()),
412 traceparent: Some(format!(
413 "00-{}-{}-{:02x}",
414 span_context.trace_id(),
415 span_context.span_id(),
416 span_context.trace_flags().to_u8()
417 )),
418 tracestate: parent.and_then(|value| value.tracestate.clone()),
419 baggage: parent
420 .map(|value| value.baggage.clone())
421 .unwrap_or_default(),
422 };
423
424 (context, Box::new(OtelWorkflowSpan { inner: span }))
425 }
426}
427
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::{
        grpc_metadata_from_headers, normalize_http_traces_endpoint, parse_headers,
        NoopWorkflowTracer, OtlpProtocol, SpanKind, TraceContext, TracingConfig, WorkflowTracer,
        ENV_OTLP_ENDPOINT, ENV_OTLP_HEADERS, ENV_OTLP_PROTOCOL, ENV_TRACING_ENABLED,
    };

    /// Returns an environment lookup backed by the given `(name, value)` pairs.
    fn fake_env(pairs: &[(&str, &str)]) -> impl Fn(&str) -> Option<String> {
        let vars: HashMap<String, String> = pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect();
        move |name: &str| vars.get(name).cloned()
    }

    #[test]
    fn noop_tracer_supports_span_lifecycle() {
        let parent = TraceContext {
            trace_id: Some("trace-1".to_string()),
            ..TraceContext::default()
        };
        let (child, mut span) =
            NoopWorkflowTracer.start_span("node.llm", SpanKind::Node, Some(&parent));
        assert_eq!(child.trace_id.as_deref(), Some("trace-1"));
        span.set_attribute("node.id", "llm");
        span.add_event("start");
        span.end();
    }

    #[test]
    fn parse_headers_accepts_comma_separated_pairs() {
        let parsed = parse_headers("a=b, c=d").expect("headers should parse");
        for (name, expected) in [("a", "b"), ("c", "d")] {
            assert_eq!(parsed.get(name).map(String::as_str), Some(expected));
        }
    }

    #[test]
    fn parse_headers_rejects_invalid_entry() {
        let message = parse_headers("invalid").expect_err("invalid header should fail");
        assert!(message.contains(ENV_OTLP_HEADERS));
    }

    #[test]
    fn tracing_config_defaults_to_disabled_grpc() {
        let config = TracingConfig::from_lookup(fake_env(&[])).expect("config should parse");
        assert!(!config.enabled);
        assert_eq!(config.protocol, OtlpProtocol::Grpc);
        assert_eq!(config.endpoint, "http://localhost:4317");
        assert!(config.headers.is_empty());
    }

    #[test]
    fn tracing_config_parses_http_protocol_and_headers() {
        let config = TracingConfig::from_lookup(fake_env(&[
            (ENV_TRACING_ENABLED, "true"),
            (ENV_OTLP_PROTOCOL, "http/protobuf"),
            (ENV_OTLP_ENDPOINT, "https://cloud.langfuse.com/api/public/otel"),
            (
                ENV_OTLP_HEADERS,
                "Authorization=Basic test,x-langfuse-ingestion-version=4",
            ),
        ]))
        .expect("config should parse");

        assert!(config.enabled);
        assert_eq!(config.protocol, OtlpProtocol::HttpProtobuf);
        assert_eq!(config.endpoint, "https://cloud.langfuse.com/api/public/otel");
        let ingestion_version = config
            .headers
            .get("x-langfuse-ingestion-version")
            .map(String::as_str);
        assert_eq!(ingestion_version, Some("4"));
    }

    #[test]
    fn tracing_config_rejects_invalid_protocol() {
        let message = TracingConfig::from_lookup(fake_env(&[(ENV_OTLP_PROTOCOL, "http/json")]))
            .expect_err("invalid protocol should fail");
        assert!(message.contains(ENV_OTLP_PROTOCOL));
    }

    #[test]
    fn http_endpoint_normalizer_appends_v1_traces() {
        let cases = [
            ("http://localhost:4318", "http://localhost:4318/v1/traces"),
            ("http://localhost:4318/", "http://localhost:4318/v1/traces"),
            (
                "https://cloud.langfuse.com/api/public/otel",
                "https://cloud.langfuse.com/api/public/otel/v1/traces",
            ),
            ("https://example.com/v1/traces", "https://example.com/v1/traces"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_http_traces_endpoint(input), expected, "input: {input}");
        }
    }

    #[test]
    fn grpc_metadata_builder_rejects_invalid_keys() {
        let headers = HashMap::from([("Invalid Header".to_string(), "value".to_string())]);
        let message = grpc_metadata_from_headers(&headers)
            .expect_err("invalid metadata key should fail");
        assert!(message.contains(ENV_OTLP_HEADERS));
    }
}