1use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::Event;
24use crate::api::event::ScopeCategory;
25use crate::api::runtime::EventSubscriberFn;
26use crate::api::scope::ScopeType;
27use crate::api::subscriber::{deregister_subscriber, register_subscriber};
28use crate::error::FlowError;
29use chrono::{DateTime, Utc};
30use opentelemetry::trace::{
31 Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
32};
33use opentelemetry::{Context, KeyValue};
34use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
35use opentelemetry_sdk::Resource;
36use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
37use serde::Serialize;
38use uuid::Uuid;
39
40#[cfg(target_arch = "wasm32")]
41use async_trait::async_trait;
42#[cfg(target_arch = "wasm32")]
43use opentelemetry_http::{
44 Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
45};
46#[cfg(not(target_arch = "wasm32"))]
47use opentelemetry_otlp::WithTonicConfig;
48#[cfg(not(target_arch = "wasm32"))]
49use tokio::runtime::Handle;
50#[cfg(not(target_arch = "wasm32"))]
51use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
52#[cfg(target_arch = "wasm32")]
53use wasm_bindgen::{JsCast, JsValue};
54#[cfg(target_arch = "wasm32")]
55use wasm_bindgen_futures::{JsFuture, spawn_local};
56#[cfg(target_arch = "wasm32")]
57use web_sys::{Request as WebRequest, RequestInit};
58
/// Result type returned by the OpenTelemetry subscriber APIs in this module.
pub type Result<T> = std::result::Result<T, OpenTelemetryError>;

/// Errors produced while configuring, building, or driving the OpenTelemetry subscriber.
#[derive(Debug, thiserror::Error)]
pub enum OpenTelemetryError {
    /// A gRPC exporter was requested on a native target while no Tokio runtime was
    /// active on the calling thread.
    #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
    MissingTokioRuntime,
    /// The requested OTLP transport cannot be used on the current compilation target
    /// (gRPC on `wasm32`).
    #[error("the OTLP {transport} transport is not supported on this target")]
    UnsupportedTransport {
        /// Human-readable transport name, interpolated into the error message.
        transport: &'static str,
    },
    /// A configured header could not be converted into gRPC metadata.
    #[error("invalid OTLP gRPC header {key:?}: {message}")]
    InvalidGrpcHeader {
        /// The offending header key exactly as the caller supplied it.
        key: String,
        /// Why the key or the value was rejected.
        message: String,
    },
    /// The OTLP exporter builder failed; holds the builder error rendered as text.
    #[error("failed to build the OTLP exporter: {0}")]
    ExporterBuild(String),
    /// A tracer-provider flush/shutdown failed, or the subscriber's internal state lock
    /// was poisoned.
    #[error("OpenTelemetry tracer provider error: {0}")]
    Provider(String),
    /// An error forwarded from the core event API (e.g. subscriber (de)registration).
    #[error(transparent)]
    Core(#[from] FlowError),
}
92
/// Wire transport used by the OTLP span exporter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// OTLP over HTTP with binary protobuf payloads. This is the default and the only
    /// transport accepted on `wasm32`.
    #[default]
    HttpBinary,
    /// OTLP over gRPC (tonic). Native targets only; `OpenTelemetrySubscriber::new`
    /// additionally requires an active Tokio runtime for it.
    Grpc,
}
102
/// Configuration consumed by `OpenTelemetrySubscriber::new`.
///
/// Start from `OpenTelemetryConfig::http_binary`, `OpenTelemetryConfig::grpc`, or
/// `Default`, then refine it with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct OpenTelemetryConfig {
    /// Collector endpoint; when `None` the exporter builder's own default is used.
    endpoint: Option<String>,
    /// Extra entries sent with every export: HTTP headers for the HTTP transport,
    /// gRPC metadata for the gRPC transport.
    headers: HashMap<String, String>,
    /// Additional resource attributes, appended after the `service.*` attributes.
    resource_attributes: HashMap<String, String>,
    /// Value of the `service.name` resource attribute.
    service_name: String,
    /// Optional `service.namespace` resource attribute.
    service_namespace: Option<String>,
    /// Optional `service.version` resource attribute.
    service_version: Option<String>,
    /// Instrumentation scope name the tracer is obtained with.
    instrumentation_scope: String,
    /// Export timeout handed to the OTLP exporter builder.
    timeout: Duration,
    /// Which OTLP transport to export with.
    transport: OtlpTransport,
}
116
117impl Default for OpenTelemetryConfig {
118 fn default() -> Self {
119 Self {
120 endpoint: None,
121 headers: HashMap::new(),
122 resource_attributes: HashMap::new(),
123 service_name: "nemo-flow".to_string(),
124 service_namespace: None,
125 service_version: None,
126 instrumentation_scope: "nemo-flow-otel".to_string(),
127 timeout: Duration::from_secs(3),
128 transport: OtlpTransport::HttpBinary,
129 }
130 }
131}
132
133impl OpenTelemetryConfig {
134 pub fn http_binary(service_name: impl Into<String>) -> Self {
136 Self {
137 service_name: service_name.into(),
138 transport: OtlpTransport::HttpBinary,
139 ..Self::default()
140 }
141 }
142
143 pub fn grpc(service_name: impl Into<String>) -> Self {
145 Self {
146 service_name: service_name.into(),
147 transport: OtlpTransport::Grpc,
148 ..Self::default()
149 }
150 }
151
152 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
154 self.endpoint = Some(endpoint.into());
155 self
156 }
157
158 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
160 self.headers.insert(key.into(), value.into());
161 self
162 }
163
164 pub fn with_resource_attribute(
166 mut self,
167 key: impl Into<String>,
168 value: impl Into<String>,
169 ) -> Self {
170 self.resource_attributes.insert(key.into(), value.into());
171 self
172 }
173
174 pub fn with_timeout(mut self, timeout: Duration) -> Self {
176 self.timeout = timeout;
177 self
178 }
179
180 pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
182 self.service_namespace = Some(namespace.into());
183 self
184 }
185
186 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
188 self.service_version = Some(version.into());
189 self
190 }
191
192 pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
194 self.instrumentation_scope = scope.into();
195 self
196 }
197}
198
/// Event subscriber that converts scope start/end events and marks into OpenTelemetry
/// spans and exports them through an SDK tracer provider.
///
/// Clones share the same state: the processor and callback sit behind an `Arc`.
#[derive(Clone)]
pub struct OpenTelemetrySubscriber {
    inner: Arc<Inner>,
}

/// Shared state behind an `OpenTelemetrySubscriber`.
struct Inner {
    /// Span-tracking state; the callback locks it once per event.
    processor: Arc<Mutex<OtelEventProcessor>>,
    /// Callback handed to `register_subscriber`; it forwards each event to `processor`.
    subscriber: EventSubscriberFn,
}
209
210impl OpenTelemetrySubscriber {
211 pub fn new(config: OpenTelemetryConfig) -> Result<Self> {
213 #[cfg(not(target_arch = "wasm32"))]
214 if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
215 {
216 return Err(OpenTelemetryError::MissingTokioRuntime);
217 }
218 #[cfg(target_arch = "wasm32")]
219 if config.transport == OtlpTransport::Grpc {
220 return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
221 }
222
223 let provider = build_tracer_provider(&config)?;
224 Ok(Self::from_tracer_provider_with_scope(
225 provider,
226 config.instrumentation_scope,
227 ))
228 }
229
230 pub fn from_tracer_provider(
232 provider: SdkTracerProvider,
233 instrumentation_scope: impl Into<String>,
234 ) -> Self {
235 Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
236 }
237
238 fn from_tracer_provider_with_scope(
239 provider: SdkTracerProvider,
240 instrumentation_scope: String,
241 ) -> Self {
242 let processor = Arc::new(Mutex::new(OtelEventProcessor::new(
243 provider,
244 instrumentation_scope,
245 )));
246 let processor_for_callback = Arc::clone(&processor);
247 let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
248 let Ok(mut guard) = processor_for_callback.lock() else {
249 return;
252 };
253 guard.process(event);
254 });
255
256 Self {
257 inner: Arc::new(Inner {
258 processor,
259 subscriber,
260 }),
261 }
262 }
263
264 pub fn subscriber(&self) -> EventSubscriberFn {
266 Arc::clone(&self.inner.subscriber)
267 }
268
269 pub fn register(&self, name: &str) -> Result<()> {
271 register_subscriber(name, self.subscriber()).map_err(Into::into)
272 }
273
274 pub fn deregister(&self, name: &str) -> Result<bool> {
276 deregister_subscriber(name).map_err(Into::into)
277 }
278
279 pub fn force_flush(&self) -> Result<()> {
281 let guard = self.inner.processor.lock().map_err(|_| {
282 OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
283 })?;
284 guard.force_flush()
285 }
286
287 pub fn shutdown(&self) -> Result<()> {
291 let guard = self.inner.processor.lock().map_err(|_| {
292 OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
293 })?;
294 guard.shutdown()
295 }
296}
297
/// Minimal OTLP/HTTP client for `wasm32` targets that sends requests through the
/// JavaScript `fetch` API.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy, Default)]
struct WasmHttpClient;

#[cfg(target_arch = "wasm32")]
#[async_trait]
impl HttpClient for WasmHttpClient {
    /// Dispatches `request` with `fetch` and returns without waiting for the response.
    ///
    /// The returned response is always `202` with an empty body. The actual fetch is
    /// awaited on a detached local task that only logs a rejected promise to the console.
    /// As a result, export failures never reach the exporter. NOTE(review): a non-2xx HTTP
    /// status presumably is not reported either, since it does not reject the promise.
    ///
    /// # Errors
    ///
    /// Returns an error only when the request cannot be built or dispatched. That covers a
    /// header value rejected by `to_str`, a JavaScript exception, or a missing or
    /// non-callable global `fetch`.
    async fn send_bytes(
        &self,
        request: HttpRequest<Bytes>,
    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
        let (parts, body) = request.into_parts();

        // Translate the `http::Request` parts into a web `Request`.
        let request = {
            let request_url = parts.uri.to_string();
            let init = RequestInit::new();
            init.set_method(parts.method.as_str());
            // Attach a body only when there are bytes to send.
            if !body.is_empty() {
                let body_bytes = js_sys::Uint8Array::from(body.as_ref());
                init.set_body_opt_u8_array(Some(&body_bytes));
            }

            let request =
                WebRequest::new_with_str_and_init(&request_url, &init).map_err(js_error)?;
            let request_headers = request.headers();
            // Copy every header; a value that `to_str` rejects aborts the send.
            for (name, value) in &parts.headers {
                let value = value
                    .to_str()
                    .map_err(|e| http_error(format!("invalid OTLP HTTP header {name}: {e}")))?;
                request_headers
                    .set(name.as_str(), value)
                    .map_err(js_error)?;
            }
            request
        };

        // Use `window.fetch` when a window exists; otherwise call a global `fetch`
        // function (NOTE(review): presumably for worker / non-browser hosts — confirm).
        let fetch_promise = if let Some(window) = web_sys::window() {
            window.fetch_with_request(&request)
        } else {
            let global = js_sys::global();
            let fetch = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
                .map_err(js_error)?
                .dyn_into::<js_sys::Function>()
                .map_err(js_error)?;
            fetch.call1(&global, &request).map_err(js_error)?.into()
        };
        // Fire-and-forget: await the fetch on a detached local task and only log a
        // rejected promise.
        spawn_local(async move {
            if let Err(error) = JsFuture::from(fetch_promise).await {
                web_sys::console::warn_1(&JsValue::from_str(&format!(
                    "OpenTelemetry OTLP/HTTP export failed: {error:?}"
                )));
            }
        });

        // The real status is not known yet, so report `202` unconditionally.
        HttpResponse::builder()
            .status(202)
            .body(Bytes::new())
            .map_err(|e| http_error(e.to_string()))
    }
}
360
/// Converts a JavaScript exception value into an `HttpError`.
///
/// If the value is a JS string, that string becomes the message. Otherwise the message is
/// the value's `Debug` rendering prefixed with `JavaScript error: `.
#[cfg(target_arch = "wasm32")]
fn js_error(value: JsValue) -> HttpError {
    let message = match value.as_string() {
        Some(text) => text,
        None => format!("JavaScript error: {value:?}"),
    };
    http_error(message)
}

/// Wraps `message` in a boxed `std::io::Error` (kind `Other`) usable as an `HttpError`.
#[cfg(target_arch = "wasm32")]
fn http_error(message: impl Into<String>) -> HttpError {
    let error = std::io::Error::other(message.into());
    Box::new(error)
}
374
375fn build_tracer_provider(config: &OpenTelemetryConfig) -> Result<SdkTracerProvider> {
376 let exporter = match config.transport {
377 OtlpTransport::HttpBinary => {
378 #[cfg(not(target_arch = "wasm32"))]
379 install_rustls_crypto_provider();
380 let mut builder = SpanExporter::builder()
381 .with_http()
382 .with_protocol(Protocol::HttpBinary)
383 .with_timeout(config.timeout);
384 if let Some(endpoint) = &config.endpoint {
385 builder = builder.with_endpoint(endpoint.clone());
386 }
387 if !config.headers.is_empty() {
388 builder = builder.with_headers(config.headers.clone());
389 }
390 #[cfg(target_arch = "wasm32")]
391 {
392 builder = builder.with_http_client(WasmHttpClient);
393 }
394 builder
395 .build()
396 .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
397 }
398 #[cfg(not(target_arch = "wasm32"))]
399 OtlpTransport::Grpc => {
400 let mut builder = SpanExporter::builder()
401 .with_tonic()
402 .with_protocol(Protocol::Grpc)
403 .with_timeout(config.timeout);
404 if let Some(endpoint) = &config.endpoint {
405 builder = builder.with_endpoint(endpoint.clone());
406 }
407 if !config.headers.is_empty() {
408 builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
409 }
410 builder
411 .build()
412 .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
413 }
414 #[cfg(target_arch = "wasm32")]
415 OtlpTransport::Grpc => {
416 return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
417 }
418 };
419
420 let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
421 if let Some(service_namespace) = &config.service_namespace {
422 resource_attributes.push(KeyValue::new(
423 "service.namespace",
424 service_namespace.clone(),
425 ));
426 }
427 if let Some(service_version) = &config.service_version {
428 resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
429 }
430 for (key, value) in &config.resource_attributes {
431 resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
432 }
433
434 let builder = SdkTracerProvider::builder()
438 .with_resource(
439 Resource::builder_empty()
440 .with_attributes(resource_attributes)
441 .build(),
442 )
443 .with_max_attributes_per_span(u32::MAX)
444 .with_max_attributes_per_event(u32::MAX);
445
446 #[cfg(not(target_arch = "wasm32"))]
447 {
448 if Handle::try_current().is_ok() {
449 Ok(builder.with_batch_exporter(exporter).build())
450 } else {
451 Ok(builder.with_simple_exporter(exporter).build())
452 }
453 }
454 #[cfg(target_arch = "wasm32")]
455 {
456 Ok(builder.with_simple_exporter(exporter).build())
457 }
458}
459
460#[cfg(not(target_arch = "wasm32"))]
461fn install_rustls_crypto_provider() {
462 let _ = rustls::crypto::ring::default_provider().install_default();
463}
464
465#[cfg(not(target_arch = "wasm32"))]
466fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
467 let mut metadata = MetadataMap::new();
468 for (key, value) in headers {
469 let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
470 OpenTelemetryError::InvalidGrpcHeader {
471 key: key.clone(),
472 message: e.to_string(),
473 }
474 })?;
475 let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
476 OpenTelemetryError::InvalidGrpcHeader {
477 key: key.clone(),
478 message: e.to_string(),
479 }
480 })?;
481 metadata.insert(metadata_key, metadata_value);
482 }
483 Ok(metadata)
484}
485
/// A span opened by a scope-start event whose matching end event has not arrived yet.
struct ActiveSpan {
    /// The live SDK span; end attributes and mark events are added to it.
    span: Span,
    /// Copy of the span's context with `is_remote` forced to `false`, used to parent
    /// child spans and orphan marks.
    span_context: SpanContext,
}

/// Turns a stream of events into OpenTelemetry spans.
struct OtelEventProcessor {
    /// Open spans keyed by the UUID of the event that started them.
    active_spans: HashMap<Uuid, ActiveSpan>,
    /// Provider the tracer came from; used for flush and shutdown.
    provider: SdkTracerProvider,
    /// Tracer obtained from `provider` for the configured instrumentation scope.
    tracer: SdkTracer,
}
496
497impl OtelEventProcessor {
498 fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
499 let tracer = provider.tracer(instrumentation_scope);
500 Self {
501 active_spans: HashMap::new(),
502 provider,
503 tracer,
504 }
505 }
506
507 fn process(&mut self, event: &Event) {
508 match event.scope_category() {
509 Some(ScopeCategory::Start) => self.process_start(event),
510 Some(ScopeCategory::End) => self.process_end(event),
511 None => self.process_mark(event),
512 }
513 }
514
515 fn force_flush(&self) -> Result<()> {
516 self.provider
517 .force_flush()
518 .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
519 }
520
521 fn shutdown(&self) -> Result<()> {
522 self.provider
523 .shutdown()
524 .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
525 }
526
527 fn process_start(&mut self, event: &Event) {
528 let mut span = self
529 .tracer
530 .span_builder(span_name(event))
531 .with_kind(span_kind(event))
532 .with_start_time(to_system_time(*event.timestamp()))
533 .start_with_context(&self.tracer, &self.parent_context(event));
534 span.set_attributes(start_attributes(event));
535 let span_context = local_parent_span_context(span.span_context());
536 self.active_spans
537 .insert(event.uuid(), ActiveSpan { span, span_context });
538 }
539
540 fn process_end(&mut self, event: &Event) {
541 let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
542 return;
543 };
544 active_span.span.set_attributes(end_attributes(event));
545 active_span
546 .span
547 .end_with_timestamp(to_system_time(*event.timestamp()));
548 }
549
550 fn process_mark(&mut self, event: &Event) {
551 let mark_name = event.name().to_string();
552 let timestamp = to_system_time(*event.timestamp());
553 let attributes = mark_attributes(event);
554
555 if let Some(parent_span) = self.find_parent_span_mut(event) {
556 parent_span
557 .span
558 .add_event_with_timestamp(mark_name, timestamp, attributes);
559 return;
560 }
561
562 let mut span = self
563 .tracer
564 .span_builder(format!("mark:{mark_name}"))
565 .with_kind(SpanKind::Internal)
566 .with_start_time(timestamp)
567 .start_with_context(&self.tracer, &self.parent_context(event));
568 let mut span_attributes = attributes;
569 span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
570 span.set_attributes(span_attributes);
571 span.end_with_timestamp(timestamp);
572 }
573
574 fn parent_context(&self, event: &Event) -> Context {
575 self.find_parent_span(event)
576 .map(|active_span| {
577 Context::new().with_remote_span_context(active_span.span_context.clone())
578 })
579 .unwrap_or_default()
580 }
581
582 fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
583 event
584 .parent_uuid()
585 .filter(|uuid| self.active_spans.contains_key(uuid))
586 }
587
588 fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
589 self.parent_span_uuid(event)
590 .and_then(|uuid| self.active_spans.get(&uuid))
591 }
592
593 fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
594 self.parent_span_uuid(event)
595 .and_then(|uuid| self.active_spans.get_mut(&uuid))
596 }
597}
598
599fn span_kind(event: &Event) -> SpanKind {
600 match semantic_scope_type(event) {
601 Some(ScopeType::Llm) => SpanKind::Client,
602 Some(
603 ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
604 ) => SpanKind::Client,
605 _ => SpanKind::Internal,
606 }
607}
608
609fn span_name(event: &Event) -> String {
610 event.name().to_string()
611}
612
613fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
614 event.scope_type()
615}
616
617fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
618 match scope_type {
619 Some(ScopeType::Agent) => "agent",
620 Some(ScopeType::Function) => "function",
621 Some(ScopeType::Tool) => "tool",
622 Some(ScopeType::Llm) => "llm",
623 Some(ScopeType::Retriever) => "retriever",
624 Some(ScopeType::Embedder) => "embedder",
625 Some(ScopeType::Reranker) => "reranker",
626 Some(ScopeType::Guardrail) => "guardrail",
627 Some(ScopeType::Evaluator) => "evaluator",
628 Some(ScopeType::Custom) => "custom",
629 Some(ScopeType::Unknown) | None => "unknown",
630 }
631}
632
633fn start_attributes(event: &Event) -> Vec<KeyValue> {
634 let mut attributes = common_attributes(event);
635 let handle_attributes = event.attributes();
636 push_serialized(
637 &mut attributes,
638 "nemo_flow.handle_attributes_json",
639 handle_attributes,
640 );
641 push_serialized(&mut attributes, "nemo_flow.start.data_json", event.data());
642 push_serialized(
643 &mut attributes,
644 "nemo_flow.start.metadata_json",
645 event.metadata(),
646 );
647 push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
648 attributes
649}
650
651fn end_attributes(event: &Event) -> Vec<KeyValue> {
652 let mut attributes = Vec::new();
653 push_serialized(&mut attributes, "nemo_flow.end.data_json", event.data());
654 push_serialized(
655 &mut attributes,
656 "nemo_flow.end.metadata_json",
657 event.metadata(),
658 );
659 push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
660 attributes
661}
662
663fn mark_attributes(event: &Event) -> Vec<KeyValue> {
664 let handle_attributes = event.attributes();
665 let mut attributes = vec![
666 KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
667 KeyValue::new(
668 "nemo_flow.mark.parent_uuid",
669 event
670 .parent_uuid()
671 .map(|uuid| uuid.to_string())
672 .unwrap_or_default(),
673 ),
674 ];
675 push_serialized(
676 &mut attributes,
677 "nemo_flow.mark.attributes_json",
678 handle_attributes,
679 );
680 push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
681 push_serialized(
682 &mut attributes,
683 "nemo_flow.mark.metadata_json",
684 event.metadata(),
685 );
686 attributes
687}
688
689fn common_attributes(event: &Event) -> Vec<KeyValue> {
690 let mut attributes = vec![
691 KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
692 KeyValue::new(
693 "nemo_flow.parent_uuid",
694 event
695 .parent_uuid()
696 .map(|uuid| uuid.to_string())
697 .unwrap_or_default(),
698 ),
699 KeyValue::new(
700 "nemo_flow.scope_type",
701 scope_type_name(semantic_scope_type(event)),
702 ),
703 ];
704
705 if let Some(model_name) = event.model_name() {
706 attributes.push(KeyValue::new(
707 "nemo_flow.model_name",
708 model_name.to_string(),
709 ));
710 }
711 if let Some(tool_call_id) = event.tool_call_id() {
712 attributes.push(KeyValue::new(
713 "nemo_flow.tool_call_id",
714 tool_call_id.to_string(),
715 ));
716 }
717
718 attributes
719}
720
721fn push_serialized<T: Serialize + ?Sized>(
722 attributes: &mut Vec<KeyValue>,
723 key: &'static str,
724 value: Option<&T>,
725) {
726 if let Some(value) = value
727 && let Ok(json) = serde_json::to_string(value)
728 {
729 attributes.push(KeyValue::new(key, json));
730 }
731}
732
733fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
734 SpanContext::new(
735 span_context.trace_id(),
736 span_context.span_id(),
737 span_context.trace_flags(),
738 false,
739 span_context.trace_state().clone(),
740 )
741}
742
743fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
744 let seconds = timestamp.timestamp();
745 let nanos = timestamp.timestamp_subsec_nanos();
746 if seconds >= 0 {
747 UNIX_EPOCH + Duration::new(seconds as u64, nanos)
748 } else if nanos == 0 {
749 UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
750 } else {
751 UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
752 }
753}
754
// Unit tests live outside `src/` and are compiled as this module's child `tests`
// module, which gives them access to its private items.
#[cfg(test)]
#[path = "../../tests/unit/observability/otel_tests.rs"]
mod tests;