1use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::Event;
24use crate::api::event::ScopeCategory;
25use crate::api::runtime::EventSubscriberFn;
26use crate::api::scope::ScopeType;
27use crate::api::subscriber::{deregister_subscriber, register_subscriber};
28use crate::error::FlowError;
29use chrono::{DateTime, Utc};
30use opentelemetry::trace::{
31 Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
32};
33use opentelemetry::{Context, KeyValue};
34use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
35use opentelemetry_sdk::Resource;
36use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
37use serde::Serialize;
38use uuid::Uuid;
39
40#[cfg(target_arch = "wasm32")]
41use async_trait::async_trait;
42#[cfg(target_arch = "wasm32")]
43use opentelemetry_http::{
44 Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
45};
46#[cfg(not(target_arch = "wasm32"))]
47use opentelemetry_otlp::WithTonicConfig;
48#[cfg(not(target_arch = "wasm32"))]
49use tokio::runtime::Handle;
50#[cfg(not(target_arch = "wasm32"))]
51use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
52#[cfg(target_arch = "wasm32")]
53use wasm_bindgen::{JsCast, JsValue};
54#[cfg(target_arch = "wasm32")]
55use wasm_bindgen_futures::{JsFuture, spawn_local};
56#[cfg(target_arch = "wasm32")]
57use web_sys::{Request as WebRequest, RequestInit};
58
59pub type Result<T> = std::result::Result<T, OpenTelemetryError>;
61
62#[derive(Debug, thiserror::Error)]
64pub enum OpenTelemetryError {
65 #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
67 MissingTokioRuntime,
68 #[error("the OTLP {transport} transport is not supported on this target")]
70 UnsupportedTransport {
71 transport: &'static str,
73 },
74 #[error("invalid OTLP gRPC header {key:?}: {message}")]
76 InvalidGrpcHeader {
77 key: String,
79 message: String,
81 },
82 #[error("failed to build the OTLP exporter: {0}")]
84 ExporterBuild(String),
85 #[error("OpenTelemetry tracer provider error: {0}")]
87 Provider(String),
88 #[error(transparent)]
90 Core(#[from] FlowError),
91}
92
/// Transport used to deliver spans to an OTLP endpoint.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum OtlpTransport {
    /// OTLP over HTTP using the `HttpBinary` protocol; this is the default.
    #[default]
    HttpBinary,
    /// OTLP over gRPC.
    Grpc,
}
102
103#[derive(Debug, Clone)]
105pub struct OpenTelemetryConfig {
106 endpoint: Option<String>,
107 headers: HashMap<String, String>,
108 resource_attributes: HashMap<String, String>,
109 service_name: String,
110 service_namespace: Option<String>,
111 service_version: Option<String>,
112 instrumentation_scope: String,
113 timeout: Duration,
114 transport: OtlpTransport,
115}
116
117impl Default for OpenTelemetryConfig {
118 fn default() -> Self {
119 Self {
120 endpoint: None,
121 headers: HashMap::new(),
122 resource_attributes: HashMap::new(),
123 service_name: "nemo-flow".to_string(),
124 service_namespace: None,
125 service_version: None,
126 instrumentation_scope: "nemo-flow-otel".to_string(),
127 timeout: Duration::from_secs(3),
128 transport: OtlpTransport::HttpBinary,
129 }
130 }
131}
132
133impl OpenTelemetryConfig {
134 pub fn http_binary(service_name: impl Into<String>) -> Self {
136 Self {
137 service_name: service_name.into(),
138 transport: OtlpTransport::HttpBinary,
139 ..Self::default()
140 }
141 }
142
143 pub fn grpc(service_name: impl Into<String>) -> Self {
145 Self {
146 service_name: service_name.into(),
147 transport: OtlpTransport::Grpc,
148 ..Self::default()
149 }
150 }
151
152 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
154 self.endpoint = Some(endpoint.into());
155 self
156 }
157
158 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
160 self.headers.insert(key.into(), value.into());
161 self
162 }
163
164 pub fn with_resource_attribute(
166 mut self,
167 key: impl Into<String>,
168 value: impl Into<String>,
169 ) -> Self {
170 self.resource_attributes.insert(key.into(), value.into());
171 self
172 }
173
174 pub fn with_timeout(mut self, timeout: Duration) -> Self {
176 self.timeout = timeout;
177 self
178 }
179
180 pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
182 self.service_namespace = Some(namespace.into());
183 self
184 }
185
186 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
188 self.service_version = Some(version.into());
189 self
190 }
191
192 pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
194 self.instrumentation_scope = scope.into();
195 self
196 }
197}
198
199#[derive(Clone)]
201pub struct OpenTelemetrySubscriber {
202 inner: Arc<Inner>,
203}
204
205struct Inner {
206 processor: Arc<Mutex<OtelEventProcessor>>,
207 subscriber: EventSubscriberFn,
208}
209
210impl OpenTelemetrySubscriber {
211 pub fn new(config: OpenTelemetryConfig) -> Result<Self> {
213 #[cfg(not(target_arch = "wasm32"))]
214 if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
215 {
216 return Err(OpenTelemetryError::MissingTokioRuntime);
217 }
218 #[cfg(target_arch = "wasm32")]
219 if config.transport == OtlpTransport::Grpc {
220 return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
221 }
222
223 let provider = build_tracer_provider(&config)?;
224 Ok(Self::from_tracer_provider_with_scope(
225 provider,
226 config.instrumentation_scope,
227 ))
228 }
229
230 pub fn from_tracer_provider(
232 provider: SdkTracerProvider,
233 instrumentation_scope: impl Into<String>,
234 ) -> Self {
235 Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
236 }
237
238 fn from_tracer_provider_with_scope(
239 provider: SdkTracerProvider,
240 instrumentation_scope: String,
241 ) -> Self {
242 let processor = Arc::new(Mutex::new(OtelEventProcessor::new(
243 provider,
244 instrumentation_scope,
245 )));
246 let processor_for_callback = Arc::clone(&processor);
247 let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
248 let Ok(mut guard) = processor_for_callback.lock() else {
249 return;
252 };
253 guard.process(event);
254 });
255
256 Self {
257 inner: Arc::new(Inner {
258 processor,
259 subscriber,
260 }),
261 }
262 }
263
264 pub fn subscriber(&self) -> EventSubscriberFn {
266 Arc::clone(&self.inner.subscriber)
267 }
268
269 pub fn register(&self, name: &str) -> Result<()> {
271 register_subscriber(name, self.subscriber()).map_err(Into::into)
272 }
273
274 pub fn deregister(&self, name: &str) -> Result<bool> {
276 deregister_subscriber(name).map_err(Into::into)
277 }
278
279 pub fn force_flush(&self) -> Result<()> {
281 let guard = self.inner.processor.lock().map_err(|_| {
282 OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
283 })?;
284 guard.force_flush()
285 }
286
287 pub fn shutdown(&self) -> Result<()> {
291 let guard = self.inner.processor.lock().map_err(|_| {
292 OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
293 })?;
294 guard.shutdown()
295 }
296}
297
/// HTTP client for `wasm32` builds; its `HttpClient` implementation sends requests via `fetch`.
#[cfg(target_arch = "wasm32")]
#[derive(Clone, Copy, Debug, Default)]
struct WasmHttpClient;
301
#[cfg(target_arch = "wasm32")]
#[async_trait]
impl HttpClient for WasmHttpClient {
    /// Dispatches `request` through the JavaScript `fetch` function without waiting for it.
    ///
    /// The fetch promise is handed to `spawn_local`, and a synthetic `202` response with an
    /// empty body is returned immediately. A rejected promise is only logged with
    /// `console.warn`; it is never surfaced to the caller.
    ///
    /// # Errors
    ///
    /// Fails when the web request cannot be constructed, a header value is not valid text,
    /// a header cannot be set, or no callable `fetch` is found on the global object.
    async fn send_bytes(
        &self,
        request: HttpRequest<Bytes>,
    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
        let (head, payload) = request.into_parts();

        let web_request = {
            let init = RequestInit::new();
            init.set_method(head.method.as_str());
            // A body is attached only when there is something to send.
            if !payload.is_empty() {
                let view = js_sys::Uint8Array::from(payload.as_ref());
                init.set_body_opt_u8_array(Some(&view));
            }

            let target = head.uri.to_string();
            let built = WebRequest::new_with_str_and_init(&target, &init).map_err(js_error)?;

            let outgoing_headers = built.headers();
            for (name, value) in &head.headers {
                let text = value
                    .to_str()
                    .map_err(|e| http_error(format!("invalid OTLP HTTP header {name}: {e}")))?;
                outgoing_headers
                    .set(name.as_str(), text)
                    .map_err(js_error)?;
            }
            built
        };

        // Use `window.fetch` when a window exists; otherwise look `fetch` up on the global
        // object.
        let pending = match web_sys::window() {
            Some(window) => window.fetch_with_request(&web_request),
            None => {
                let global = js_sys::global();
                let fetch = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
                    .map_err(js_error)?
                    .dyn_into::<js_sys::Function>()
                    .map_err(js_error)?;
                fetch
                    .call1(&global, &web_request)
                    .map_err(js_error)?
                    .into()
            }
        };

        spawn_local(async move {
            if let Err(error) = JsFuture::from(pending).await {
                web_sys::console::warn_1(&JsValue::from_str(&format!(
                    "OpenTelemetry OTLP/HTTP export failed: {error:?}"
                )));
            }
        });

        HttpResponse::builder()
            .status(202)
            .body(Bytes::new())
            .map_err(|e| http_error(e.to_string()))
    }
}
360
/// Converts a JavaScript value into an `HttpError`.
///
/// A string value is used verbatim; any other value is rendered with its debug
/// representation.
#[cfg(target_arch = "wasm32")]
fn js_error(value: JsValue) -> HttpError {
    let message = match value.as_string() {
        Some(text) => text,
        None => format!("JavaScript error: {value:?}"),
    };
    http_error(message)
}
369
/// Wraps `message` in a boxed `std::io::Error` so it can be returned as an `HttpError`.
#[cfg(target_arch = "wasm32")]
fn http_error(message: impl Into<String>) -> HttpError {
    let text: String = message.into();
    Box::new(std::io::Error::other(text))
}
374
375fn build_tracer_provider(config: &OpenTelemetryConfig) -> Result<SdkTracerProvider> {
376 let exporter = match config.transport {
377 OtlpTransport::HttpBinary => {
378 let mut builder = SpanExporter::builder()
379 .with_http()
380 .with_protocol(Protocol::HttpBinary)
381 .with_timeout(config.timeout);
382 if let Some(endpoint) = &config.endpoint {
383 builder = builder.with_endpoint(endpoint.clone());
384 }
385 if !config.headers.is_empty() {
386 builder = builder.with_headers(config.headers.clone());
387 }
388 #[cfg(target_arch = "wasm32")]
389 {
390 builder = builder.with_http_client(WasmHttpClient);
391 }
392 builder
393 .build()
394 .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
395 }
396 #[cfg(not(target_arch = "wasm32"))]
397 OtlpTransport::Grpc => {
398 let mut builder = SpanExporter::builder()
399 .with_tonic()
400 .with_protocol(Protocol::Grpc)
401 .with_timeout(config.timeout);
402 if let Some(endpoint) = &config.endpoint {
403 builder = builder.with_endpoint(endpoint.clone());
404 }
405 if !config.headers.is_empty() {
406 builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
407 }
408 builder
409 .build()
410 .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
411 }
412 #[cfg(target_arch = "wasm32")]
413 OtlpTransport::Grpc => {
414 return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
415 }
416 };
417
418 let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
419 if let Some(service_namespace) = &config.service_namespace {
420 resource_attributes.push(KeyValue::new(
421 "service.namespace",
422 service_namespace.clone(),
423 ));
424 }
425 if let Some(service_version) = &config.service_version {
426 resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
427 }
428 for (key, value) in &config.resource_attributes {
429 resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
430 }
431
432 let builder = SdkTracerProvider::builder()
436 .with_resource(
437 Resource::builder_empty()
438 .with_attributes(resource_attributes)
439 .build(),
440 )
441 .with_max_attributes_per_span(u32::MAX)
442 .with_max_attributes_per_event(u32::MAX);
443
444 #[cfg(not(target_arch = "wasm32"))]
445 {
446 if Handle::try_current().is_ok() {
447 Ok(builder.with_batch_exporter(exporter).build())
448 } else {
449 Ok(builder.with_simple_exporter(exporter).build())
450 }
451 }
452 #[cfg(target_arch = "wasm32")]
453 {
454 Ok(builder.with_simple_exporter(exporter).build())
455 }
456}
457
458#[cfg(not(target_arch = "wasm32"))]
459fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
460 let mut metadata = MetadataMap::new();
461 for (key, value) in headers {
462 let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
463 OpenTelemetryError::InvalidGrpcHeader {
464 key: key.clone(),
465 message: e.to_string(),
466 }
467 })?;
468 let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
469 OpenTelemetryError::InvalidGrpcHeader {
470 key: key.clone(),
471 message: e.to_string(),
472 }
473 })?;
474 metadata.insert(metadata_key, metadata_value);
475 }
476 Ok(metadata)
477}
478
479struct ActiveSpan {
480 span: Span,
481 span_context: SpanContext,
482}
483
484struct OtelEventProcessor {
485 active_spans: HashMap<Uuid, ActiveSpan>,
486 provider: SdkTracerProvider,
487 tracer: SdkTracer,
488}
489
490impl OtelEventProcessor {
491 fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
492 let tracer = provider.tracer(instrumentation_scope);
493 Self {
494 active_spans: HashMap::new(),
495 provider,
496 tracer,
497 }
498 }
499
500 fn process(&mut self, event: &Event) {
501 match event.scope_category() {
502 Some(ScopeCategory::Start) => self.process_start(event),
503 Some(ScopeCategory::End) => self.process_end(event),
504 None => self.process_mark(event),
505 }
506 }
507
508 fn force_flush(&self) -> Result<()> {
509 self.provider
510 .force_flush()
511 .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
512 }
513
514 fn shutdown(&self) -> Result<()> {
515 self.provider
516 .shutdown()
517 .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
518 }
519
520 fn process_start(&mut self, event: &Event) {
521 let mut span = self
522 .tracer
523 .span_builder(span_name(event))
524 .with_kind(span_kind(event))
525 .with_start_time(to_system_time(*event.timestamp()))
526 .start_with_context(&self.tracer, &self.parent_context(event));
527 span.set_attributes(start_attributes(event));
528 let span_context = local_parent_span_context(span.span_context());
529 self.active_spans
530 .insert(event.uuid(), ActiveSpan { span, span_context });
531 }
532
533 fn process_end(&mut self, event: &Event) {
534 let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
535 return;
536 };
537 active_span.span.set_attributes(end_attributes(event));
538 active_span
539 .span
540 .end_with_timestamp(to_system_time(*event.timestamp()));
541 }
542
543 fn process_mark(&mut self, event: &Event) {
544 let mark_name = event.name().to_string();
545 let timestamp = to_system_time(*event.timestamp());
546 let attributes = mark_attributes(event);
547
548 if let Some(parent_span) = self.find_parent_span_mut(event) {
549 parent_span
550 .span
551 .add_event_with_timestamp(mark_name, timestamp, attributes);
552 return;
553 }
554
555 let mut span = self
556 .tracer
557 .span_builder(format!("mark:{mark_name}"))
558 .with_kind(SpanKind::Internal)
559 .with_start_time(timestamp)
560 .start_with_context(&self.tracer, &self.parent_context(event));
561 let mut span_attributes = attributes;
562 span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
563 span.set_attributes(span_attributes);
564 span.end_with_timestamp(timestamp);
565 }
566
567 fn parent_context(&self, event: &Event) -> Context {
568 self.find_parent_span(event)
569 .map(|active_span| {
570 Context::new().with_remote_span_context(active_span.span_context.clone())
571 })
572 .unwrap_or_default()
573 }
574
575 fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
576 event
577 .parent_uuid()
578 .filter(|uuid| self.active_spans.contains_key(uuid))
579 }
580
581 fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
582 self.parent_span_uuid(event)
583 .and_then(|uuid| self.active_spans.get(&uuid))
584 }
585
586 fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
587 self.parent_span_uuid(event)
588 .and_then(|uuid| self.active_spans.get_mut(&uuid))
589 }
590}
591
592fn span_kind(event: &Event) -> SpanKind {
593 match semantic_scope_type(event) {
594 Some(ScopeType::Llm) => SpanKind::Client,
595 Some(
596 ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
597 ) => SpanKind::Client,
598 _ => SpanKind::Internal,
599 }
600}
601
602fn span_name(event: &Event) -> String {
603 event.name().to_string()
604}
605
606fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
607 event.scope_type()
608}
609
610fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
611 match scope_type {
612 Some(ScopeType::Agent) => "agent",
613 Some(ScopeType::Function) => "function",
614 Some(ScopeType::Tool) => "tool",
615 Some(ScopeType::Llm) => "llm",
616 Some(ScopeType::Retriever) => "retriever",
617 Some(ScopeType::Embedder) => "embedder",
618 Some(ScopeType::Reranker) => "reranker",
619 Some(ScopeType::Guardrail) => "guardrail",
620 Some(ScopeType::Evaluator) => "evaluator",
621 Some(ScopeType::Custom) => "custom",
622 Some(ScopeType::Unknown) | None => "unknown",
623 }
624}
625
626fn start_attributes(event: &Event) -> Vec<KeyValue> {
627 let mut attributes = common_attributes(event);
628 let handle_attributes = event.attributes();
629 push_serialized(
630 &mut attributes,
631 "nemo_flow.handle_attributes_json",
632 handle_attributes,
633 );
634 push_serialized(&mut attributes, "nemo_flow.start.data_json", event.data());
635 push_serialized(
636 &mut attributes,
637 "nemo_flow.start.metadata_json",
638 event.metadata(),
639 );
640 push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
641 attributes
642}
643
644fn end_attributes(event: &Event) -> Vec<KeyValue> {
645 let mut attributes = Vec::new();
646 push_serialized(&mut attributes, "nemo_flow.end.data_json", event.data());
647 push_serialized(
648 &mut attributes,
649 "nemo_flow.end.metadata_json",
650 event.metadata(),
651 );
652 push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
653 attributes
654}
655
656fn mark_attributes(event: &Event) -> Vec<KeyValue> {
657 let handle_attributes = event.attributes();
658 let mut attributes = vec![
659 KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
660 KeyValue::new(
661 "nemo_flow.mark.parent_uuid",
662 event
663 .parent_uuid()
664 .map(|uuid| uuid.to_string())
665 .unwrap_or_default(),
666 ),
667 ];
668 push_serialized(
669 &mut attributes,
670 "nemo_flow.mark.attributes_json",
671 handle_attributes,
672 );
673 push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
674 push_serialized(
675 &mut attributes,
676 "nemo_flow.mark.metadata_json",
677 event.metadata(),
678 );
679 attributes
680}
681
682fn common_attributes(event: &Event) -> Vec<KeyValue> {
683 let mut attributes = vec![
684 KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
685 KeyValue::new(
686 "nemo_flow.parent_uuid",
687 event
688 .parent_uuid()
689 .map(|uuid| uuid.to_string())
690 .unwrap_or_default(),
691 ),
692 KeyValue::new(
693 "nemo_flow.scope_type",
694 scope_type_name(semantic_scope_type(event)),
695 ),
696 ];
697
698 if let Some(model_name) = event.model_name() {
699 attributes.push(KeyValue::new(
700 "nemo_flow.model_name",
701 model_name.to_string(),
702 ));
703 }
704 if let Some(tool_call_id) = event.tool_call_id() {
705 attributes.push(KeyValue::new(
706 "nemo_flow.tool_call_id",
707 tool_call_id.to_string(),
708 ));
709 }
710
711 attributes
712}
713
714fn push_serialized<T: Serialize + ?Sized>(
715 attributes: &mut Vec<KeyValue>,
716 key: &'static str,
717 value: Option<&T>,
718) {
719 if let Some(value) = value
720 && let Ok(json) = serde_json::to_string(value)
721 {
722 attributes.push(KeyValue::new(key, json));
723 }
724}
725
726fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
727 SpanContext::new(
728 span_context.trace_id(),
729 span_context.span_id(),
730 span_context.trace_flags(),
731 false,
732 span_context.trace_state().clone(),
733 )
734}
735
736fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
737 let seconds = timestamp.timestamp();
738 let nanos = timestamp.timestamp_subsec_nanos();
739 if seconds >= 0 {
740 UNIX_EPOCH + Duration::new(seconds as u64, nanos)
741 } else if nanos == 0 {
742 UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
743 } else {
744 UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
745 }
746}
747
748#[cfg(test)]
749#[path = "../../tests/unit/observability/otel_tests.rs"]
750mod tests;