Skip to main content

nemo_flow/observability/
otel.rs

// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! OpenTelemetry subscriber support for NeMo Flow.
//!
//! This module adapts NeMo Flow lifecycle events into OpenTelemetry trace spans:
//!
//! - scope/tool/LLM `Start` events open spans
//! - matching `End` events close spans
//! - `Mark` events become span events on the active parent span when possible
//! - orphan marks fall back to zero-duration spans so they still reach OTLP
//!
//! The public API is intentionally small:
//!
//! - [`OpenTelemetryConfig`] configures the OTLP exporter and resource metadata
//! - [`OpenTelemetrySubscriber`] exposes a NeMo Flow [`EventSubscriberFn`] and
//!   convenience `register` / `deregister` / `force_flush` / `shutdown` methods
18
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::Event;
24use crate::api::event::ScopeCategory;
25use crate::api::runtime::EventSubscriberFn;
26use crate::api::scope::ScopeType;
27use crate::api::subscriber::{deregister_subscriber, register_subscriber};
28use crate::error::FlowError;
29use chrono::{DateTime, Utc};
30use opentelemetry::trace::{
31    Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
32};
33use opentelemetry::{Context, KeyValue};
34use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
35use opentelemetry_sdk::Resource;
36use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
37use serde::Serialize;
38use uuid::Uuid;
39
40#[cfg(target_arch = "wasm32")]
41use async_trait::async_trait;
42#[cfg(target_arch = "wasm32")]
43use opentelemetry_http::{
44    Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
45};
46#[cfg(not(target_arch = "wasm32"))]
47use opentelemetry_otlp::WithTonicConfig;
48#[cfg(not(target_arch = "wasm32"))]
49use tokio::runtime::Handle;
50#[cfg(not(target_arch = "wasm32"))]
51use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
52#[cfg(target_arch = "wasm32")]
53use wasm_bindgen::{JsCast, JsValue};
54#[cfg(target_arch = "wasm32")]
55use wasm_bindgen_futures::{JsFuture, spawn_local};
56#[cfg(target_arch = "wasm32")]
57use web_sys::{Request as WebRequest, RequestInit};
58
59/// Result type for the OpenTelemetry subscriber crate.
60pub type Result<T> = std::result::Result<T, OpenTelemetryError>;
61
62/// Errors produced while configuring or operating the OpenTelemetry subscriber.
63#[derive(Debug, thiserror::Error)]
64pub enum OpenTelemetryError {
65    /// The tonic gRPC exporter requires an active Tokio runtime.
66    #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
67    MissingTokioRuntime,
68    /// The requested transport is not available on this target.
69    #[error("the OTLP {transport} transport is not supported on this target")]
70    UnsupportedTransport {
71        /// Human-readable transport label used in the error message.
72        transport: &'static str,
73    },
74    /// Failed to parse a configured gRPC metadata header.
75    #[error("invalid OTLP gRPC header {key:?}: {message}")]
76    InvalidGrpcHeader {
77        /// Header name that failed to parse.
78        key: String,
79        /// Parser failure message.
80        message: String,
81    },
82    /// Failed to build the OTLP exporter.
83    #[error("failed to build the OTLP exporter: {0}")]
84    ExporterBuild(String),
85    /// The underlying tracer provider returned an error.
86    #[error("OpenTelemetry tracer provider error: {0}")]
87    Provider(String),
88    /// Registration errors from the core runtime.
89    #[error(transparent)]
90    Core(#[from] FlowError),
91}
92
/// Wire transport used to ship traces over OTLP.
///
/// [`OtlpTransport::HttpBinary`] is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// Protobuf over HTTP; endpoints usually look like `http://host:4318/v1/traces`.
    #[default]
    HttpBinary,
    /// OTLP over gRPC; endpoints usually look like `http://host:4317`.
    Grpc,
}
102
103/// Configuration for the OpenTelemetry subscriber.
104#[derive(Debug, Clone)]
105pub struct OpenTelemetryConfig {
106    endpoint: Option<String>,
107    headers: HashMap<String, String>,
108    resource_attributes: HashMap<String, String>,
109    service_name: String,
110    service_namespace: Option<String>,
111    service_version: Option<String>,
112    instrumentation_scope: String,
113    timeout: Duration,
114    transport: OtlpTransport,
115}
116
117impl Default for OpenTelemetryConfig {
118    fn default() -> Self {
119        Self {
120            endpoint: None,
121            headers: HashMap::new(),
122            resource_attributes: HashMap::new(),
123            service_name: "nemo-flow".to_string(),
124            service_namespace: None,
125            service_version: None,
126            instrumentation_scope: "nemo-flow-otel".to_string(),
127            timeout: Duration::from_secs(3),
128            transport: OtlpTransport::HttpBinary,
129        }
130    }
131}
132
133impl OpenTelemetryConfig {
134    /// Creates an HTTP OTLP config for the given service name.
135    pub fn http_binary(service_name: impl Into<String>) -> Self {
136        Self {
137            service_name: service_name.into(),
138            transport: OtlpTransport::HttpBinary,
139            ..Self::default()
140        }
141    }
142
143    /// Creates a gRPC OTLP config for the given service name.
144    pub fn grpc(service_name: impl Into<String>) -> Self {
145        Self {
146            service_name: service_name.into(),
147            transport: OtlpTransport::Grpc,
148            ..Self::default()
149        }
150    }
151
152    /// Overrides the OTLP endpoint. If unset, exporter defaults and OTEL_* env vars apply.
153    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
154        self.endpoint = Some(endpoint.into());
155        self
156    }
157
158    /// Adds a header/metadata entry for the exporter.
159    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
160        self.headers.insert(key.into(), value.into());
161        self
162    }
163
164    /// Adds a resource attribute as a string key/value pair.
165    pub fn with_resource_attribute(
166        mut self,
167        key: impl Into<String>,
168        value: impl Into<String>,
169    ) -> Self {
170        self.resource_attributes.insert(key.into(), value.into());
171        self
172    }
173
174    /// Sets the OTLP request timeout.
175    pub fn with_timeout(mut self, timeout: Duration) -> Self {
176        self.timeout = timeout;
177        self
178    }
179
180    /// Sets the service namespace resource attribute.
181    pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
182        self.service_namespace = Some(namespace.into());
183        self
184    }
185
186    /// Sets the service version resource attribute.
187    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
188        self.service_version = Some(version.into());
189        self
190    }
191
192    /// Sets the instrumentation scope name used for emitted spans.
193    pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
194        self.instrumentation_scope = scope.into();
195        self
196    }
197}
198
199/// OpenTelemetry-backed NeMo Flow subscriber.
200#[derive(Clone)]
201pub struct OpenTelemetrySubscriber {
202    inner: Arc<Inner>,
203}
204
205struct Inner {
206    processor: Arc<Mutex<OtelEventProcessor>>,
207    subscriber: EventSubscriberFn,
208}
209
210impl OpenTelemetrySubscriber {
211    /// Builds a subscriber backed by a new OTLP tracer provider.
212    pub fn new(config: OpenTelemetryConfig) -> Result<Self> {
213        #[cfg(not(target_arch = "wasm32"))]
214        if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
215        {
216            return Err(OpenTelemetryError::MissingTokioRuntime);
217        }
218        #[cfg(target_arch = "wasm32")]
219        if config.transport == OtlpTransport::Grpc {
220            return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
221        }
222
223        let provider = build_tracer_provider(&config)?;
224        Ok(Self::from_tracer_provider_with_scope(
225            provider,
226            config.instrumentation_scope,
227        ))
228    }
229
230    /// Builds a subscriber from an already-configured tracer provider.
231    pub fn from_tracer_provider(
232        provider: SdkTracerProvider,
233        instrumentation_scope: impl Into<String>,
234    ) -> Self {
235        Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
236    }
237
238    fn from_tracer_provider_with_scope(
239        provider: SdkTracerProvider,
240        instrumentation_scope: String,
241    ) -> Self {
242        let processor = Arc::new(Mutex::new(OtelEventProcessor::new(
243            provider,
244            instrumentation_scope,
245        )));
246        let processor_for_callback = Arc::clone(&processor);
247        let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
248            let Ok(mut guard) = processor_for_callback.lock() else {
249                // Observability should not take down the host process if the
250                // subscriber state was previously poisoned.
251                return;
252            };
253            guard.process(event);
254        });
255
256        Self {
257            inner: Arc::new(Inner {
258                processor,
259                subscriber,
260            }),
261        }
262    }
263
264    /// Returns the raw NeMo Flow subscriber callback for custom registration flows.
265    pub fn subscriber(&self) -> EventSubscriberFn {
266        Arc::clone(&self.inner.subscriber)
267    }
268
269    /// Registers this subscriber globally with the NeMo Flow runtime.
270    pub fn register(&self, name: &str) -> Result<()> {
271        register_subscriber(name, self.subscriber()).map_err(Into::into)
272    }
273
274    /// Deregisters a previously-registered global subscriber by name.
275    pub fn deregister(&self, name: &str) -> Result<bool> {
276        deregister_subscriber(name).map_err(Into::into)
277    }
278
279    /// Flushes finished spans through the underlying tracer provider.
280    pub fn force_flush(&self) -> Result<()> {
281        let guard = self.inner.processor.lock().map_err(|_| {
282            OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
283        })?;
284        guard.force_flush()
285    }
286
287    /// Shuts down the underlying tracer provider.
288    ///
289    /// Call `deregister(...)` first if the subscriber is still registered with NeMo Flow.
290    pub fn shutdown(&self) -> Result<()> {
291        let guard = self.inner.processor.lock().map_err(|_| {
292            OpenTelemetryError::Provider("the subscriber state lock was poisoned".to_string())
293        })?;
294        guard.shutdown()
295    }
296}
297
/// Stateless HTTP client for wasm32 builds; requests are sent with the JavaScript `fetch` API.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy, Default)]
struct WasmHttpClient;
301
#[cfg(target_arch = "wasm32")]
#[async_trait]
impl HttpClient for WasmHttpClient {
    /// Dispatches `request` through `fetch` without waiting for the response.
    ///
    /// The request is translated into a web `Request`, handed to
    /// `window.fetch` (or the global `fetch` function when there is no
    /// `window`), and its promise is driven by a detached local task. A
    /// rejected promise is only reported through `console.warn`. Once the
    /// request has been dispatched the caller always receives an empty
    /// `202` response.
    ///
    /// # Errors
    ///
    /// Returns an error if the web request cannot be constructed, a header
    /// value is not valid text, or `fetch` cannot be located or invoked.
    async fn send_bytes(
        &self,
        request: HttpRequest<Bytes>,
    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
        let (head, payload) = request.into_parts();

        // Method and optional body go into the fetch init dictionary.
        let options = RequestInit::new();
        options.set_method(head.method.as_str());
        if !payload.is_empty() {
            let bytes = js_sys::Uint8Array::from(payload.as_ref());
            options.set_body_opt_u8_array(Some(&bytes));
        }

        let url = head.uri.to_string();
        let web_request = WebRequest::new_with_str_and_init(&url, &options).map_err(js_error)?;

        // Copy every header across; values must be representable as text.
        let outgoing_headers = web_request.headers();
        for (name, value) in &head.headers {
            let text = value
                .to_str()
                .map_err(|e| http_error(format!("invalid OTLP HTTP header {name}: {e}")))?;
            outgoing_headers
                .set(name.as_str(), text)
                .map_err(js_error)?;
        }

        let fetch_promise: js_sys::Promise = match web_sys::window() {
            Some(window) => window.fetch_with_request(&web_request),
            None => {
                // No `window` object: look `fetch` up on the global object instead.
                let global = js_sys::global();
                let fetch_fn = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
                    .map_err(js_error)?
                    .dyn_into::<js_sys::Function>()
                    .map_err(js_error)?;
                fetch_fn
                    .call1(&global, &web_request)
                    .map_err(js_error)?
                    .into()
            }
        };

        // Waiting on the fetch promise from a synchronous wasm call stack can deadlock
        // Node/browser event processing, so dispatch the request asynchronously.
        spawn_local(async move {
            let outcome = JsFuture::from(fetch_promise).await;
            if let Err(error) = outcome {
                let message = format!("OpenTelemetry OTLP/HTTP export failed: {error:?}");
                web_sys::console::warn_1(&JsValue::from_str(&message));
            }
        });

        // The real response is never observed, so report "accepted" with an empty body.
        HttpResponse::builder()
            .status(202)
            .body(Bytes::new())
            .map_err(|e| http_error(e.to_string()))
    }
}
360
/// Converts a JavaScript error value into an [`HttpError`].
///
/// String values are used verbatim; anything else is rendered with its
/// `Debug` representation behind a `JavaScript error: ` prefix.
#[cfg(target_arch = "wasm32")]
fn js_error(value: JsValue) -> HttpError {
    let message = match value.as_string() {
        Some(text) => text,
        None => format!("JavaScript error: {value:?}"),
    };
    http_error(message)
}
369
/// Wraps a plain message in a boxed `std::io::Error` of kind `Other` to serve as an [`HttpError`].
#[cfg(target_arch = "wasm32")]
fn http_error(message: impl Into<String>) -> HttpError {
    let text: String = message.into();
    Box::new(std::io::Error::new(std::io::ErrorKind::Other, text))
}
374
375fn build_tracer_provider(config: &OpenTelemetryConfig) -> Result<SdkTracerProvider> {
376    let exporter = match config.transport {
377        OtlpTransport::HttpBinary => {
378            #[cfg(not(target_arch = "wasm32"))]
379            install_rustls_crypto_provider();
380            let mut builder = SpanExporter::builder()
381                .with_http()
382                .with_protocol(Protocol::HttpBinary)
383                .with_timeout(config.timeout);
384            if let Some(endpoint) = &config.endpoint {
385                builder = builder.with_endpoint(endpoint.clone());
386            }
387            if !config.headers.is_empty() {
388                builder = builder.with_headers(config.headers.clone());
389            }
390            #[cfg(target_arch = "wasm32")]
391            {
392                builder = builder.with_http_client(WasmHttpClient);
393            }
394            builder
395                .build()
396                .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
397        }
398        #[cfg(not(target_arch = "wasm32"))]
399        OtlpTransport::Grpc => {
400            let mut builder = SpanExporter::builder()
401                .with_tonic()
402                .with_protocol(Protocol::Grpc)
403                .with_timeout(config.timeout);
404            if let Some(endpoint) = &config.endpoint {
405                builder = builder.with_endpoint(endpoint.clone());
406            }
407            if !config.headers.is_empty() {
408                builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
409            }
410            builder
411                .build()
412                .map_err(|e| OpenTelemetryError::ExporterBuild(e.to_string()))?
413        }
414        #[cfg(target_arch = "wasm32")]
415        OtlpTransport::Grpc => {
416            return Err(OpenTelemetryError::UnsupportedTransport { transport: "gRPC" });
417        }
418    };
419
420    let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
421    if let Some(service_namespace) = &config.service_namespace {
422        resource_attributes.push(KeyValue::new(
423            "service.namespace",
424            service_namespace.clone(),
425        ));
426    }
427    if let Some(service_version) = &config.service_version {
428        resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
429    }
430    for (key, value) in &config.resource_attributes {
431        resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
432    }
433
434    // Disable per-span attribute caps. Consumers may emit large attribute
435    // sets on long-running spans; the OTel SDK default (128) silently drops
436    // attributes added last in the span's lifecycle.
437    let builder = SdkTracerProvider::builder()
438        .with_resource(
439            Resource::builder_empty()
440                .with_attributes(resource_attributes)
441                .build(),
442        )
443        .with_max_attributes_per_span(u32::MAX)
444        .with_max_attributes_per_event(u32::MAX);
445
446    #[cfg(not(target_arch = "wasm32"))]
447    {
448        if Handle::try_current().is_ok() {
449            Ok(builder.with_batch_exporter(exporter).build())
450        } else {
451            Ok(builder.with_simple_exporter(exporter).build())
452        }
453    }
454    #[cfg(target_arch = "wasm32")]
455    {
456        Ok(builder.with_simple_exporter(exporter).build())
457    }
458}
459
460#[cfg(not(target_arch = "wasm32"))]
461fn install_rustls_crypto_provider() {
462    let _ = rustls::crypto::ring::default_provider().install_default();
463}
464
465#[cfg(not(target_arch = "wasm32"))]
466fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
467    let mut metadata = MetadataMap::new();
468    for (key, value) in headers {
469        let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
470            OpenTelemetryError::InvalidGrpcHeader {
471                key: key.clone(),
472                message: e.to_string(),
473            }
474        })?;
475        let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
476            OpenTelemetryError::InvalidGrpcHeader {
477                key: key.clone(),
478                message: e.to_string(),
479            }
480        })?;
481        metadata.insert(metadata_key, metadata_value);
482    }
483    Ok(metadata)
484}
485
486struct ActiveSpan {
487    span: Span,
488    span_context: SpanContext,
489}
490
491struct OtelEventProcessor {
492    active_spans: HashMap<Uuid, ActiveSpan>,
493    provider: SdkTracerProvider,
494    tracer: SdkTracer,
495}
496
497impl OtelEventProcessor {
498    fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
499        let tracer = provider.tracer(instrumentation_scope);
500        Self {
501            active_spans: HashMap::new(),
502            provider,
503            tracer,
504        }
505    }
506
507    fn process(&mut self, event: &Event) {
508        match event.scope_category() {
509            Some(ScopeCategory::Start) => self.process_start(event),
510            Some(ScopeCategory::End) => self.process_end(event),
511            None => self.process_mark(event),
512        }
513    }
514
515    fn force_flush(&self) -> Result<()> {
516        self.provider
517            .force_flush()
518            .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
519    }
520
521    fn shutdown(&self) -> Result<()> {
522        self.provider
523            .shutdown()
524            .map_err(|e| OpenTelemetryError::Provider(e.to_string()))
525    }
526
527    fn process_start(&mut self, event: &Event) {
528        let mut span = self
529            .tracer
530            .span_builder(span_name(event))
531            .with_kind(span_kind(event))
532            .with_start_time(to_system_time(*event.timestamp()))
533            .start_with_context(&self.tracer, &self.parent_context(event));
534        span.set_attributes(start_attributes(event));
535        let span_context = local_parent_span_context(span.span_context());
536        self.active_spans
537            .insert(event.uuid(), ActiveSpan { span, span_context });
538    }
539
540    fn process_end(&mut self, event: &Event) {
541        let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
542            return;
543        };
544        active_span.span.set_attributes(end_attributes(event));
545        active_span
546            .span
547            .end_with_timestamp(to_system_time(*event.timestamp()));
548    }
549
550    fn process_mark(&mut self, event: &Event) {
551        let mark_name = event.name().to_string();
552        let timestamp = to_system_time(*event.timestamp());
553        let attributes = mark_attributes(event);
554
555        if let Some(parent_span) = self.find_parent_span_mut(event) {
556            parent_span
557                .span
558                .add_event_with_timestamp(mark_name, timestamp, attributes);
559            return;
560        }
561
562        let mut span = self
563            .tracer
564            .span_builder(format!("mark:{mark_name}"))
565            .with_kind(SpanKind::Internal)
566            .with_start_time(timestamp)
567            .start_with_context(&self.tracer, &self.parent_context(event));
568        let mut span_attributes = attributes;
569        span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
570        span.set_attributes(span_attributes);
571        span.end_with_timestamp(timestamp);
572    }
573
574    fn parent_context(&self, event: &Event) -> Context {
575        self.find_parent_span(event)
576            .map(|active_span| {
577                Context::new().with_remote_span_context(active_span.span_context.clone())
578            })
579            .unwrap_or_default()
580    }
581
582    fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
583        event
584            .parent_uuid()
585            .filter(|uuid| self.active_spans.contains_key(uuid))
586    }
587
588    fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
589        self.parent_span_uuid(event)
590            .and_then(|uuid| self.active_spans.get(&uuid))
591    }
592
593    fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
594        self.parent_span_uuid(event)
595            .and_then(|uuid| self.active_spans.get_mut(&uuid))
596    }
597}
598
599fn span_kind(event: &Event) -> SpanKind {
600    match semantic_scope_type(event) {
601        Some(ScopeType::Llm) => SpanKind::Client,
602        Some(
603            ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
604        ) => SpanKind::Client,
605        _ => SpanKind::Internal,
606    }
607}
608
609fn span_name(event: &Event) -> String {
610    event.name().to_string()
611}
612
613fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
614    event.scope_type()
615}
616
617fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
618    match scope_type {
619        Some(ScopeType::Agent) => "agent",
620        Some(ScopeType::Function) => "function",
621        Some(ScopeType::Tool) => "tool",
622        Some(ScopeType::Llm) => "llm",
623        Some(ScopeType::Retriever) => "retriever",
624        Some(ScopeType::Embedder) => "embedder",
625        Some(ScopeType::Reranker) => "reranker",
626        Some(ScopeType::Guardrail) => "guardrail",
627        Some(ScopeType::Evaluator) => "evaluator",
628        Some(ScopeType::Custom) => "custom",
629        Some(ScopeType::Unknown) | None => "unknown",
630    }
631}
632
633fn start_attributes(event: &Event) -> Vec<KeyValue> {
634    let mut attributes = common_attributes(event);
635    let handle_attributes = event.attributes();
636    push_serialized(
637        &mut attributes,
638        "nemo_flow.handle_attributes_json",
639        handle_attributes,
640    );
641    push_serialized(&mut attributes, "nemo_flow.start.data_json", event.data());
642    push_serialized(
643        &mut attributes,
644        "nemo_flow.start.metadata_json",
645        event.metadata(),
646    );
647    push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
648    attributes
649}
650
651fn end_attributes(event: &Event) -> Vec<KeyValue> {
652    let mut attributes = Vec::new();
653    push_serialized(&mut attributes, "nemo_flow.end.data_json", event.data());
654    push_serialized(
655        &mut attributes,
656        "nemo_flow.end.metadata_json",
657        event.metadata(),
658    );
659    push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
660    attributes
661}
662
663fn mark_attributes(event: &Event) -> Vec<KeyValue> {
664    let handle_attributes = event.attributes();
665    let mut attributes = vec![
666        KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
667        KeyValue::new(
668            "nemo_flow.mark.parent_uuid",
669            event
670                .parent_uuid()
671                .map(|uuid| uuid.to_string())
672                .unwrap_or_default(),
673        ),
674    ];
675    push_serialized(
676        &mut attributes,
677        "nemo_flow.mark.attributes_json",
678        handle_attributes,
679    );
680    push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
681    push_serialized(
682        &mut attributes,
683        "nemo_flow.mark.metadata_json",
684        event.metadata(),
685    );
686    attributes
687}
688
689fn common_attributes(event: &Event) -> Vec<KeyValue> {
690    let mut attributes = vec![
691        KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
692        KeyValue::new(
693            "nemo_flow.parent_uuid",
694            event
695                .parent_uuid()
696                .map(|uuid| uuid.to_string())
697                .unwrap_or_default(),
698        ),
699        KeyValue::new(
700            "nemo_flow.scope_type",
701            scope_type_name(semantic_scope_type(event)),
702        ),
703    ];
704
705    if let Some(model_name) = event.model_name() {
706        attributes.push(KeyValue::new(
707            "nemo_flow.model_name",
708            model_name.to_string(),
709        ));
710    }
711    if let Some(tool_call_id) = event.tool_call_id() {
712        attributes.push(KeyValue::new(
713            "nemo_flow.tool_call_id",
714            tool_call_id.to_string(),
715        ));
716    }
717
718    attributes
719}
720
721fn push_serialized<T: Serialize + ?Sized>(
722    attributes: &mut Vec<KeyValue>,
723    key: &'static str,
724    value: Option<&T>,
725) {
726    if let Some(value) = value
727        && let Ok(json) = serde_json::to_string(value)
728    {
729        attributes.push(KeyValue::new(key, json));
730    }
731}
732
733fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
734    SpanContext::new(
735        span_context.trace_id(),
736        span_context.span_id(),
737        span_context.trace_flags(),
738        false,
739        span_context.trace_state().clone(),
740    )
741}
742
743fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
744    let seconds = timestamp.timestamp();
745    let nanos = timestamp.timestamp_subsec_nanos();
746    if seconds >= 0 {
747        UNIX_EPOCH + Duration::new(seconds as u64, nanos)
748    } else if nanos == 0 {
749        UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
750    } else {
751        UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
752    }
753}
754
755#[cfg(test)]
756#[path = "../../tests/unit/observability/otel_tests.rs"]
757mod tests;