Skip to main content

nemo_flow/observability/
otel.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! OpenTelemetry subscriber support for NeMo Flow.
5//!
6//! This crate adapts NeMo Flow lifecycle events into OpenTelemetry trace spans:
7//!
8//! - scope/tool/LLM `Start` events open spans
9//! - matching `End` events close spans
10//! - `Mark` events become span events on the active parent span when possible
11//! - orphan marks fall back to zero-duration spans so they still reach OTLP
12//!
13//! The public API is intentionally small:
14//!
15//! - [`OpenTelemetryConfig`] configures the OTLP exporter and resource metadata
16//! - [`OpenTelemetrySubscriber`] exposes a NeMo Flow [`EventSubscriberFn`] and
17//!   convenience `register` / `deregister` / `force_flush` / `shutdown` methods
18
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::Event;
24use crate::api::event::ScopeCategory;
25use crate::api::runtime::EventSubscriberFn;
26use crate::api::scope::ScopeType;
27use crate::api::subscriber::{deregister_subscriber, register_subscriber};
28use crate::error::FlowError;
29use chrono::{DateTime, Utc};
30use opentelemetry::trace::{
31    Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
32};
33use opentelemetry::{Context, KeyValue};
34use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
35use opentelemetry_sdk::Resource;
36use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
37use serde::Serialize;
38use uuid::Uuid;
39
40#[cfg(target_arch = "wasm32")]
41use async_trait::async_trait;
42#[cfg(target_arch = "wasm32")]
43use opentelemetry_http::{
44    Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
45};
46#[cfg(not(target_arch = "wasm32"))]
47use opentelemetry_otlp::WithTonicConfig;
48#[cfg(not(target_arch = "wasm32"))]
49use tokio::runtime::Handle;
50#[cfg(not(target_arch = "wasm32"))]
51use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
52#[cfg(target_arch = "wasm32")]
53use wasm_bindgen::{JsCast, JsValue};
54#[cfg(target_arch = "wasm32")]
55use wasm_bindgen_futures::{JsFuture, spawn_local};
56#[cfg(target_arch = "wasm32")]
57use web_sys::{Request as WebRequest, RequestInit};
58
59/// Result type for the OpenTelemetry subscriber crate.
60pub type Result<T> = std::result::Result<T, OpenTelemetryError>;
61
62/// Errors produced while configuring or operating the OpenTelemetry subscriber.
63#[derive(Debug, thiserror::Error)]
64pub enum OpenTelemetryError {
65    /// The tonic gRPC exporter requires an active Tokio runtime.
66    #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
67    MissingTokioRuntime,
68    /// The requested transport is not available on this target.
69    #[error("the OTLP {transport} transport is not supported on this target")]
70    UnsupportedTransport {
71        /// Human-readable transport label used in the error message.
72        transport: &'static str,
73    },
74    /// Failed to parse a configured gRPC metadata header.
75    #[error("invalid OTLP gRPC header {key:?}: {message}")]
76    InvalidGrpcHeader {
77        /// Header name that failed to parse.
78        key: String,
79        /// Parser failure message.
80        message: String,
81    },
82    /// Failed to build the OTLP exporter.
83    #[error("failed to build the OTLP exporter: {0}")]
84    ExporterBuild(String),
85    /// The underlying tracer provider returned an error.
86    #[error("OpenTelemetry tracer provider error: {0}")]
87    Provider(String),
88    /// Registration errors from the core runtime.
89    #[error(transparent)]
90    Core(#[from] FlowError),
91}
92
93/// Supported OTLP trace transports.
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
95pub enum OtlpTransport {
96    /// OTLP/HTTP protobuf, typically `http://host:4318/v1/traces`.
97    #[default]
98    HttpBinary,
99    /// OTLP/gRPC, typically `http://host:4317`.
100    Grpc,
101}
102
103/// Configuration for the OpenTelemetry subscriber.
104#[derive(Debug, Clone)]
105pub struct OpenTelemetryConfig {
106    endpoint: Option<String>,
107    headers: HashMap<String, String>,
108    resource_attributes: HashMap<String, String>,
109    service_name: String,
110    service_namespace: Option<String>,
111    service_version: Option<String>,
112    instrumentation_scope: String,
113    timeout: Duration,
114    transport: OtlpTransport,
115}
116
117impl Default for OpenTelemetryConfig {
118    fn default() -> Self {
119        Self {
120            endpoint: None,
121            headers: HashMap::new(),
122            resource_attributes: HashMap::new(),
123            service_name: "nemo-flow".to_string(),
124            service_namespace: None,
125            service_version: None,
126            instrumentation_scope: "nemo-flow-otel".to_string(),
127            timeout: Duration::from_secs(3),
128            transport: OtlpTransport::HttpBinary,
129        }
130    }
131}
132
133impl OpenTelemetryConfig {
134    /// Creates an HTTP OTLP config for the given service name.
135    pub fn http_binary(service_name: impl Into<String>) -> Self {
136        Self {
137            service_name: service_name.into(),
138            transport: OtlpTransport::HttpBinary,
139            ..Self::default()
140        }
141    }
142
143    /// Creates a gRPC OTLP config for the given service name.
144    pub fn grpc(service_name: impl Into<String>) -> Self {
145        Self {
146            service_name: service_name.into(),
147            transport: OtlpTransport::Grpc,
148            ..Self::default()
149        }
150    }
151
152    /// Overrides the OTLP endpoint. If unset, exporter defaults and OTEL_* env vars apply.
153    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
154        self.endpoint = Some(endpoint.into());
155        self
156    }
157
158    /// Adds a header/metadata entry for the exporter.
159    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
160        self.headers.insert(key.into(), value.into());
161        self
162    }
163
164    /// Adds a resource attribute as a string key/value pair.
165    pub fn with_resource_attribute(
166        mut self,
167        key: impl Into<String>,
168        value: impl Into<String>,
169    ) -> Self {
170        self.resource_attributes.insert(key.into(), value.into());
171        self
172    }
173
174    /// Sets the OTLP request timeout.
175    pub fn with_timeout(mut self, timeout: Duration) -> Self {
176        self.timeout = timeout;
177        self
178    }
179
180    /// Sets the service namespace resource attribute.
181    pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
182        self.service_namespace = Some(namespace.into());
183        self
184    }
185
186    /// Sets the service version resource attribute.
187    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
188        self.service_version = Some(version.into());
189        self
190    }
191
192    /// Sets the instrumentation scope name used for emitted spans.
193    pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
194        self.instrumentation_scope = scope.into();
195        self
196    }
197}
198
199/// OpenTelemetry-backed NeMo Flow subscriber.
200#[derive(Clone)]
201pub struct OpenTelemetrySubscriber {
202    inner: Arc<Inner>,
203}
204
205struct Inner {
206    processor: Arc<Mutex<OtelEventProcessor>>,
207    subscriber: EventSubscriberFn,
208}
209
210impl OpenTelemetrySubscriber {
211    /// Builds a subscriber backed by a new OTLP tracer provider.
212    pub fn new(config: OpenTelemetryConfig) -> Result<Self> {
213        #[cfg(not(target_arch = "wasm32"))]
214        if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
215        {
216            return Err(OpenTelemetryError::MissingTokioRuntime);
217        }
218        #[cfg(target_arch = "wasm32")]
219        if config.transport == OtlpTransport::Grpc {
220            return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
221        }
222
223        let provider = build_tracer_provider(&config)?;
224        Ok(Self::from_tracer_provider_with_scope(
225            provider,
226            config.instrumentation_scope,
227        ))
228    }
229
230    /// Builds a subscriber from an already-configured tracer provider.
231    pub fn from_tracer_provider(
232        provider: SdkTracerProvider,
233        instrumentation_scope: impl Into<String>,
234    ) -> Self {
235        Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
236    }
237
238    fn from_tracer_provider_with_scope(
239        provider: SdkTracerProvider,
240        instrumentation_scope: String,
241    ) -> Self {
242        let processor = Arc::new(Mutex::new(OtelEventProcessor::new(
243            provider,
244            instrumentation_scope,
245        )));
246        let processor_for_callback = Arc::clone(&processor);
247        let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
248            let Ok(mut guard) = processor_for_callback.lock() else {
249                // Observability should not take down the host process if the
250                // subscriber state was previously poisoned.
251                return;
252            };
253            guard.process(event);
254        });
255
256        Self {
257            inner: Arc::new(Inner {
258                processor,
259                subscriber,
260            }),
261        }
262    }
263
264    /// Returns the raw NeMo Flow subscriber callback for custom registration flows.
265    pub fn subscriber(&self) -> EventSubscriberFn {
266        Arc::clone(&self.inner.subscriber)
267    }
268
269    /// Registers this subscriber globally with the NeMo Flow runtime.
270    pub fn register(&self, name: &str) -> Result<()> {
271        register_subscriber(name, self.subscriber()).map_err(Into::into)
272    }
273
274    /// Deregisters a previously-registered global subscriber by name.
275    pub fn deregister(&self, name: &str) -> Result<bool> {
276        deregister_subscriber(name).map_err(Into::into)
277    }
278
279    /// Flushes finished spans through the underlying tracer provider.
280    pub fn force_flush(&self) -> Result<()> {
281        let guard = self.inner.processor.lock().map_err(|_| {
282            OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
283        })?;
284        guard.force_flush()
285    }
286
287    /// Shuts down the underlying tracer provider.
288    ///
289    /// Call `deregister(...)` first if the subscriber is still registered with NeMo Flow.
290    pub fn shutdown(&self) -> Result<()> {
291        let guard = self.inner.processor.lock().map_err(|_| {
292            OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
293        })?;
294        guard.shutdown()
295    }
296}
297
298#[cfg(target_arch = "wasm32")]
299#[derive(Debug, Clone, Copy, Default)]
300struct WasmHttpClient;
301
302#[cfg(target_arch = "wasm32")]
303#[async_trait]
304impl HttpClient for WasmHttpClient {
305    async fn send_bytes(
306        &self,
307        request: HttpRequest<Bytes>,
308    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
309        let (parts, body) = request.into_parts();
310
311        let request = {
312            let request_url = parts.uri.to_string();
313            let init = RequestInit::new();
314            init.set_method(parts.method.as_str());
315            if !body.is_empty() {
316                let body_bytes = js_sys::Uint8Array::from(body.as_ref());
317                init.set_body_opt_u8_array(Some(&body_bytes));
318            }
319
320            let request =
321                WebRequest::new_with_str_and_init(&request_url, &init).map_err(js_error)?;
322            let request_headers = request.headers();
323            for (name, value) in &parts.headers {
324                let value = value
325                    .to_str()
326                    .map_err(|e| http_error(format!("invalid OTLP HTTP header {name}: {e}")))?;
327                request_headers
328                    .set(name.as_str(), value)
329                    .map_err(js_error)?;
330            }
331            request
332        };
333
334        let fetch_promise = if let Some(window) = web_sys::window() {
335            window.fetch_with_request(&request)
336        } else {
337            let global = js_sys::global();
338            let fetch = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
339                .map_err(js_error)?
340                .dyn_into::<js_sys::Function>()
341                .map_err(js_error)?;
342            fetch.call1(&global, &request).map_err(js_error)?.into()
343        };
344        // Waiting on the fetch promise from a synchronous wasm call stack can deadlock
345        // Node/browser event processing, so dispatch the request asynchronously.
346        spawn_local(async move {
347            if let Err(error) = JsFuture::from(fetch_promise).await {
348                web_sys::console::warn_1(&JsValue::from_str(&format!(
349                    "OpenTelemetry OTLP/HTTP export failed: {error:?}"
350                )));
351            }
352        });
353
354        HttpResponse::builder()
355            .status(202)
356            .body(Bytes::new())
357            .map_err(|e| http_error(e.to_string()))
358    }
359}
360
361#[cfg(target_arch = "wasm32")]
362fn js_error(value: JsValue) -> HttpError {
363    http_error(
364        value
365            .as_string()
366            .unwrap_or_else(|| format!("JavaScript error: {value:?}")),
367    )
368}
369
370#[cfg(target_arch = "wasm32")]
371fn http_error(message: impl Into<String>) -> HttpError {
372    Box::new(std::io::Error::other(message.into()))
373}
374
375fn build_tracer_provider(config: &OpenTelemetryConfig) -> Result<SdkTracerProvider> {
376    let exporter = match config.transport {
377        OtlpTransport::HttpBinary => {
378            let mut builder = SpanExporter::builder()
379                .with_http()
380                .with_protocol(Protocol::HttpBinary)
381                .with_timeout(config.timeout);
382            if let Some(endpoint) = &config.endpoint {
383                builder = builder.with_endpoint(endpoint.clone());
384            }
385            if !config.headers.is_empty() {
386                builder = builder.with_headers(config.headers.clone());
387            }
388            #[cfg(target_arch = "wasm32")]
389            {
390                builder = builder.with_http_client(WasmHttpClient);
391            }
392            builder
393                .build()
394                .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
395        }
396        #[cfg(not(target_arch = "wasm32"))]
397        OtlpTransport::Grpc => {
398            let mut builder = SpanExporter::builder()
399                .with_tonic()
400                .with_protocol(Protocol::Grpc)
401                .with_timeout(config.timeout);
402            if let Some(endpoint) = &config.endpoint {
403                builder = builder.with_endpoint(endpoint.clone());
404            }
405            if !config.headers.is_empty() {
406                builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
407            }
408            builder
409                .build()
410                .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
411        }
412        #[cfg(target_arch = "wasm32")]
413        OtlpTransport::Grpc => {
414            return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
415        }
416    };
417
418    let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
419    if let Some(service_namespace) = &config.service_namespace {
420        resource_attributes.push(KeyValue::new(
421            "service.namespace",
422            service_namespace.clone(),
423        ));
424    }
425    if let Some(service_version) = &config.service_version {
426        resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
427    }
428    for (key, value) in &config.resource_attributes {
429        resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
430    }
431
432    // Disable per-span attribute caps. Consumers may emit large attribute
433    // sets on long-running spans; the OTel SDK default (128) silently drops
434    // attributes added last in the span's lifecycle.
435    let builder = SdkTracerProvider::builder()
436        .with_resource(
437            Resource::builder_empty()
438                .with_attributes(resource_attributes)
439                .build(),
440        )
441        .with_max_attributes_per_span(u32::MAX)
442        .with_max_attributes_per_event(u32::MAX);
443
444    #[cfg(not(target_arch = "wasm32"))]
445    {
446        if Handle::try_current().is_ok() {
447            Ok(builder.with_batch_exporter(exporter).build())
448        } else {
449            Ok(builder.with_simple_exporter(exporter).build())
450        }
451    }
452    #[cfg(target_arch = "wasm32")]
453    {
454        Ok(builder.with_simple_exporter(exporter).build())
455    }
456}
457
458#[cfg(not(target_arch = "wasm32"))]
459fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
460    let mut metadata = MetadataMap::new();
461    for (key, value) in headers {
462        let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
463            OpenTelemetryError::InvalidGrpcHeader {
464                key: key.clone(),
465                message: e.to_string(),
466            }
467        })?;
468        let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
469            OpenTelemetryError::InvalidGrpcHeader {
470                key: key.clone(),
471                message: e.to_string(),
472            }
473        })?;
474        metadata.insert(metadata_key, metadata_value);
475    }
476    Ok(metadata)
477}
478
479struct ActiveSpan {
480    span: Span,
481    span_context: SpanContext,
482}
483
484struct OtelEventProcessor {
485    active_spans: HashMap<Uuid, ActiveSpan>,
486    provider: SdkTracerProvider,
487    tracer: SdkTracer,
488}
489
490impl OtelEventProcessor {
491    fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
492        let tracer = provider.tracer(instrumentation_scope);
493        Self {
494            active_spans: HashMap::new(),
495            provider,
496            tracer,
497        }
498    }
499
500    fn process(&mut self, event: &Event) {
501        match event.scope_category() {
502            Some(ScopeCategory::Start) => self.process_start(event),
503            Some(ScopeCategory::End) => self.process_end(event),
504            None => self.process_mark(event),
505        }
506    }
507
508    fn force_flush(&self) -> Result<()> {
509        self.provider
510            .force_flush()
511            .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
512    }
513
514    fn shutdown(&self) -> Result<()> {
515        self.provider
516            .shutdown()
517            .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
518    }
519
520    fn process_start(&mut self, event: &Event) {
521        let mut span = self
522            .tracer
523            .span_builder(span_name(event))
524            .with_kind(span_kind(event))
525            .with_start_time(to_system_time(*event.timestamp()))
526            .start_with_context(&self.tracer, &self.parent_context(event));
527        span.set_attributes(start_attributes(event));
528        let span_context = local_parent_span_context(span.span_context());
529        self.active_spans
530            .insert(event.uuid(), ActiveSpan { span, span_context });
531    }
532
533    fn process_end(&mut self, event: &Event) {
534        let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
535            return;
536        };
537        active_span.span.set_attributes(end_attributes(event));
538        active_span
539            .span
540            .end_with_timestamp(to_system_time(*event.timestamp()));
541    }
542
543    fn process_mark(&mut self, event: &Event) {
544        let mark_name = event.name().to_string();
545        let timestamp = to_system_time(*event.timestamp());
546        let attributes = mark_attributes(event);
547
548        if let Some(parent_span) = self.find_parent_span_mut(event) {
549            parent_span
550                .span
551                .add_event_with_timestamp(mark_name, timestamp, attributes);
552            return;
553        }
554
555        let mut span = self
556            .tracer
557            .span_builder(format!("mark:{mark_name}"))
558            .with_kind(SpanKind::Internal)
559            .with_start_time(timestamp)
560            .start_with_context(&self.tracer, &self.parent_context(event));
561        let mut span_attributes = attributes;
562        span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
563        span.set_attributes(span_attributes);
564        span.end_with_timestamp(timestamp);
565    }
566
567    fn parent_context(&self, event: &Event) -> Context {
568        self.find_parent_span(event)
569            .map(|active_span| {
570                Context::new().with_remote_span_context(active_span.span_context.clone())
571            })
572            .unwrap_or_default()
573    }
574
575    fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
576        event
577            .parent_uuid()
578            .filter(|uuid| self.active_spans.contains_key(uuid))
579    }
580
581    fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
582        self.parent_span_uuid(event)
583            .and_then(|uuid| self.active_spans.get(&uuid))
584    }
585
586    fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
587        self.parent_span_uuid(event)
588            .and_then(|uuid| self.active_spans.get_mut(&uuid))
589    }
590}
591
592fn span_kind(event: &Event) -> SpanKind {
593    match semantic_scope_type(event) {
594        Some(ScopeType::Llm) => SpanKind::Client,
595        Some(
596            ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
597        ) => SpanKind::Client,
598        _ => SpanKind::Internal,
599    }
600}
601
602fn span_name(event: &Event) -> String {
603    event.name().to_string()
604}
605
606fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
607    event.scope_type()
608}
609
610fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
611    match scope_type {
612        Some(ScopeType::Agent) => "agent",
613        Some(ScopeType::Function) => "function",
614        Some(ScopeType::Tool) => "tool",
615        Some(ScopeType::Llm) => "llm",
616        Some(ScopeType::Retriever) => "retriever",
617        Some(ScopeType::Embedder) => "embedder",
618        Some(ScopeType::Reranker) => "reranker",
619        Some(ScopeType::Guardrail) => "guardrail",
620        Some(ScopeType::Evaluator) => "evaluator",
621        Some(ScopeType::Custom) => "custom",
622        Some(ScopeType::Unknown) | None => "unknown",
623    }
624}
625
626fn start_attributes(event: &Event) -> Vec<KeyValue> {
627    let mut attributes = common_attributes(event);
628    let handle_attributes = event.attributes();
629    push_serialized(
630        &mut attributes,
631        "nemo_flow.handle_attributes_json",
632        handle_attributes,
633    );
634    push_serialized(&mut attributes, "nemo_flow.start.data_json", event.data());
635    push_serialized(
636        &mut attributes,
637        "nemo_flow.start.metadata_json",
638        event.metadata(),
639    );
640    push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
641    attributes
642}
643
644fn end_attributes(event: &Event) -> Vec<KeyValue> {
645    let mut attributes = Vec::new();
646    push_serialized(&mut attributes, "nemo_flow.end.data_json", event.data());
647    push_serialized(
648        &mut attributes,
649        "nemo_flow.end.metadata_json",
650        event.metadata(),
651    );
652    push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
653    attributes
654}
655
656fn mark_attributes(event: &Event) -> Vec<KeyValue> {
657    let handle_attributes = event.attributes();
658    let mut attributes = vec![
659        KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
660        KeyValue::new(
661            "nemo_flow.mark.parent_uuid",
662            event
663                .parent_uuid()
664                .map(|uuid| uuid.to_string())
665                .unwrap_or_default(),
666        ),
667    ];
668    push_serialized(
669        &mut attributes,
670        "nemo_flow.mark.attributes_json",
671        handle_attributes,
672    );
673    push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
674    push_serialized(
675        &mut attributes,
676        "nemo_flow.mark.metadata_json",
677        event.metadata(),
678    );
679    attributes
680}
681
682fn common_attributes(event: &Event) -> Vec<KeyValue> {
683    let mut attributes = vec![
684        KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
685        KeyValue::new(
686            "nemo_flow.parent_uuid",
687            event
688                .parent_uuid()
689                .map(|uuid| uuid.to_string())
690                .unwrap_or_default(),
691        ),
692        KeyValue::new(
693            "nemo_flow.scope_type",
694            scope_type_name(semantic_scope_type(event)),
695        ),
696    ];
697
698    if let Some(model_name) = event.model_name() {
699        attributes.push(KeyValue::new(
700            "nemo_flow.model_name",
701            model_name.to_string(),
702        ));
703    }
704    if let Some(tool_call_id) = event.tool_call_id() {
705        attributes.push(KeyValue::new(
706            "nemo_flow.tool_call_id",
707            tool_call_id.to_string(),
708        ));
709    }
710
711    attributes
712}
713
714fn push_serialized<T: Serialize + ?Sized>(
715    attributes: &mut Vec<KeyValue>,
716    key: &'static str,
717    value: Option<&T>,
718) {
719    if let Some(value) = value
720        && let Ok(json) = serde_json::to_string(value)
721    {
722        attributes.push(KeyValue::new(key, json));
723    }
724}
725
726fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
727    SpanContext::new(
728        span_context.trace_id(),
729        span_context.span_id(),
730        span_context.trace_flags(),
731        false,
732        span_context.trace_state().clone(),
733    )
734}
735
736fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
737    let seconds = timestamp.timestamp();
738    let nanos = timestamp.timestamp_subsec_nanos();
739    if seconds >= 0 {
740        UNIX_EPOCH + Duration::new(seconds as u64, nanos)
741    } else if nanos == 0 {
742        UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
743    } else {
744        UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
745    }
746}
747
748#[cfg(test)]
749#[path = "../../tests/unit/observability/otel_tests.rs"]
750mod tests;