Skip to main content

nemo_flow/observability/
openinference.rs

// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! OpenInference subscriber support for NeMo Flow.
//!
//! This module adapts NeMo Flow lifecycle events into OpenInference trace spans:
//!
//! - scope/tool/LLM `Start` events open spans
//! - matching `End` events close spans
//! - `Mark` events become span events on the active parent span when possible
//! - orphan marks fall back to zero-duration spans so they still reach OTLP
//!
//! The public API is intentionally small:
//!
//! - [`OpenInferenceConfig`] configures the OTLP exporter and OpenInference metadata
//! - [`OpenInferenceSubscriber`] exposes a NeMo Flow [`EventSubscriberFn`] and
//!   convenience `register` / `deregister` / `force_flush` / `shutdown` methods
18
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::{Event, ScopeCategory};
24use crate::api::runtime::EventSubscriberFn;
25use crate::api::scope::ScopeType;
26use crate::api::subscriber::{deregister_subscriber, register_subscriber};
27use crate::error::FlowError;
28use chrono::{DateTime, Utc};
29use openinference_semantic_conventions::SpanKind as OpenInferenceSpanKind;
30use openinference_semantic_conventions::attributes as oi;
31use opentelemetry::trace::{
32    Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
33};
34use opentelemetry::{Context, KeyValue};
35use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
36use opentelemetry_sdk::Resource;
37use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
38use serde::Serialize;
39use uuid::Uuid;
40
41#[cfg(target_arch = "wasm32")]
42use async_trait::async_trait;
43#[cfg(target_arch = "wasm32")]
44use opentelemetry_http::{
45    Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
46};
47#[cfg(not(target_arch = "wasm32"))]
48use opentelemetry_otlp::WithTonicConfig;
49#[cfg(not(target_arch = "wasm32"))]
50use tokio::runtime::Handle;
51#[cfg(not(target_arch = "wasm32"))]
52use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
53#[cfg(target_arch = "wasm32")]
54use wasm_bindgen::{JsCast, JsValue};
55#[cfg(target_arch = "wasm32")]
56use wasm_bindgen_futures::{JsFuture, spawn_local};
57#[cfg(target_arch = "wasm32")]
58use web_sys::{Request as WebRequest, RequestInit};
59
60/// Result type for the OpenInference subscriber crate.
61pub type Result<T> = std::result::Result<T, OpenInferenceError>;
62
63/// Errors produced while configuring or operating the OpenInference subscriber.
64#[derive(Debug, thiserror::Error)]
65pub enum OpenInferenceError {
66    /// The tonic gRPC exporter requires an active Tokio runtime.
67    #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
68    MissingTokioRuntime,
69    /// The requested transport is not available on this target.
70    #[error("the OTLP {transport} transport is not supported on this target")]
71    UnsupportedTransport {
72        /// Human-readable transport label used in the error message.
73        transport: &'static str,
74    },
75    /// Failed to parse a configured gRPC metadata header.
76    #[error("invalid OTLP gRPC header {key:?}: {message}")]
77    InvalidGrpcHeader {
78        /// Header name that failed to parse.
79        key: String,
80        /// Parser failure message.
81        message: String,
82    },
83    /// Failed to build the OTLP exporter.
84    #[error("failed to build the OTLP exporter: {0}")]
85    ExporterBuild(String),
86    /// The underlying tracer provider returned an error.
87    #[error("OpenInference tracer provider error: {0}")]
88    Provider(String),
89    /// Registration errors from the core runtime.
90    #[error(transparent)]
91    Core(#[from] FlowError),
92}
93
/// Wire protocols the subscriber can use to ship traces over OTLP.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum OtlpTransport {
    /// Protobuf over HTTP (the default); endpoints usually look like
    /// `http://host:4318/v1/traces`.
    #[default]
    HttpBinary,
    /// gRPC; endpoints usually look like `http://host:4317`.
    Grpc,
}
103
104/// Configuration for the OpenInference subscriber.
105#[derive(Debug, Clone)]
106pub struct OpenInferenceConfig {
107    endpoint: Option<String>,
108    headers: HashMap<String, String>,
109    resource_attributes: HashMap<String, String>,
110    service_name: String,
111    service_namespace: Option<String>,
112    service_version: Option<String>,
113    instrumentation_scope: String,
114    timeout: Duration,
115    transport: OtlpTransport,
116}
117
118impl Default for OpenInferenceConfig {
119    fn default() -> Self {
120        Self {
121            endpoint: None,
122            headers: HashMap::new(),
123            resource_attributes: HashMap::new(),
124            service_name: "nemo-flow".to_string(),
125            service_namespace: None,
126            service_version: None,
127            instrumentation_scope: "nemo-flow-openinference".to_string(),
128            timeout: Duration::from_secs(3),
129            transport: OtlpTransport::HttpBinary,
130        }
131    }
132}
133
134impl OpenInferenceConfig {
135    /// Creates a config with sensible defaults.
136    pub fn new() -> Self {
137        Self::default()
138    }
139
140    /// Selects the OTLP transport.
141    pub fn with_transport(mut self, transport: OtlpTransport) -> Self {
142        self.transport = transport;
143        self
144    }
145
146    /// Sets the `service.name` resource attribute.
147    pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
148        self.service_name = service_name.into();
149        self
150    }
151
152    /// Overrides the OTLP endpoint. If unset, exporter defaults and OTEL_* env vars apply.
153    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
154        self.endpoint = Some(endpoint.into());
155        self
156    }
157
158    /// Adds a header/metadata entry for the exporter.
159    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
160        self.headers.insert(key.into(), value.into());
161        self
162    }
163
164    /// Adds a resource attribute as a string key/value pair.
165    pub fn with_resource_attribute(
166        mut self,
167        key: impl Into<String>,
168        value: impl Into<String>,
169    ) -> Self {
170        self.resource_attributes.insert(key.into(), value.into());
171        self
172    }
173
174    /// Sets the OTLP request timeout.
175    pub fn with_timeout(mut self, timeout: Duration) -> Self {
176        self.timeout = timeout;
177        self
178    }
179
180    /// Sets the service namespace resource attribute.
181    pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
182        self.service_namespace = Some(namespace.into());
183        self
184    }
185
186    /// Sets the service version resource attribute.
187    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
188        self.service_version = Some(version.into());
189        self
190    }
191
192    /// Sets the instrumentation scope name used for emitted spans.
193    pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
194        self.instrumentation_scope = scope.into();
195        self
196    }
197}
198
199/// OpenInference-backed NeMo Flow subscriber.
200#[derive(Clone)]
201pub struct OpenInferenceSubscriber {
202    inner: Arc<Inner>,
203}
204
205struct Inner {
206    processor: Arc<Mutex<OpenInferenceEventProcessor>>,
207    subscriber: EventSubscriberFn,
208}
209
210impl OpenInferenceSubscriber {
211    /// Builds a subscriber backed by a new OTLP tracer provider.
212    pub fn new(config: OpenInferenceConfig) -> Result<Self> {
213        #[cfg(not(target_arch = "wasm32"))]
214        if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
215        {
216            return Err(OpenInferenceError::MissingTokioRuntime);
217        }
218        #[cfg(target_arch = "wasm32")]
219        if config.transport == OtlpTransport::Grpc {
220            return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
221        }
222
223        let provider = build_tracer_provider(&config)?;
224        Ok(Self::from_tracer_provider_with_scope(
225            provider,
226            config.instrumentation_scope,
227        ))
228    }
229
230    /// Builds a subscriber from an already-configured tracer provider.
231    pub fn from_tracer_provider(
232        provider: SdkTracerProvider,
233        instrumentation_scope: impl Into<String>,
234    ) -> Self {
235        Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
236    }
237
238    fn from_tracer_provider_with_scope(
239        provider: SdkTracerProvider,
240        instrumentation_scope: String,
241    ) -> Self {
242        let processor = Arc::new(Mutex::new(OpenInferenceEventProcessor::new(
243            provider,
244            instrumentation_scope,
245        )));
246        let processor_for_callback = Arc::clone(&processor);
247        let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
248            let Ok(mut guard) = processor_for_callback.lock() else {
249                // Observability should not take down the host process if the
250                // subscriber state was previously poisoned.
251                return;
252            };
253            guard.process(event);
254        });
255
256        Self {
257            inner: Arc::new(Inner {
258                processor,
259                subscriber,
260            }),
261        }
262    }
263
264    /// Returns the raw NeMo Flow subscriber callback for custom registration flows.
265    pub fn subscriber(&self) -> EventSubscriberFn {
266        Arc::clone(&self.inner.subscriber)
267    }
268
269    /// Registers this subscriber globally with the NeMo Flow runtime.
270    pub fn register(&self, name: &str) -> Result<()> {
271        register_subscriber(name, self.subscriber()).map_err(Into::into)
272    }
273
274    /// Deregisters a previously-registered global subscriber by name.
275    pub fn deregister(&self, name: &str) -> Result<bool> {
276        deregister_subscriber(name).map_err(Into::into)
277    }
278
279    /// Flushes finished spans through the underlying tracer provider.
280    pub fn force_flush(&self) -> Result<()> {
281        let guard = self.inner.processor.lock().map_err(|_| {
282            OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
283        })?;
284        guard.force_flush()
285    }
286
287    /// Shuts down the underlying tracer provider.
288    ///
289    /// Call `deregister(...)` first if the subscriber is still registered with NeMo Flow.
290    pub fn shutdown(&self) -> Result<()> {
291        let guard = self.inner.processor.lock().map_err(|_| {
292            OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
293        })?;
294        guard.shutdown()
295    }
296}
297
/// Fetch-based OTLP/HTTP client used when compiling for wasm32.
#[cfg(target_arch = "wasm32")]
#[derive(Clone, Copy, Debug, Default)]
struct WasmHttpClient;

#[cfg(target_arch = "wasm32")]
#[async_trait]
impl HttpClient for WasmHttpClient {
    /// Translates the OTLP request into a `fetch` call and dispatches it
    /// without awaiting the outcome.
    ///
    /// The returned response is always an empty `202`. A failed fetch is only
    /// reported through `console.warn`. Errors are returned solely for problems
    /// detected while building the request or locating `fetch`.
    async fn send_bytes(
        &self,
        request: HttpRequest<Bytes>,
    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
        let (head, payload) = request.into_parts();

        // Method and (optional) body go into the init object.
        let init = RequestInit::new();
        init.set_method(head.method.as_str());
        if !payload.is_empty() {
            let bytes = js_sys::Uint8Array::from(payload.as_ref());
            init.set_body_opt_u8_array(Some(&bytes));
        }

        let url = head.uri.to_string();
        let web_request = WebRequest::new_with_str_and_init(&url, &init).map_err(js_error)?;

        // Copy every header across; non-textual header values are rejected.
        let header_sink = web_request.headers();
        for (name, value) in &head.headers {
            let text = match value.to_str() {
                Ok(text) => text,
                Err(e) => {
                    return Err(http_error(format!("invalid OTLP HTTP header {name}: {e}")));
                }
            };
            header_sink.set(name.as_str(), text).map_err(js_error)?;
        }

        // Use `window.fetch` when a window exists; otherwise look `fetch` up on
        // the global object.
        let pending: js_sys::Promise = match web_sys::window() {
            Some(window) => window.fetch_with_request(&web_request),
            None => {
                let global = js_sys::global();
                let fetch = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
                    .map_err(js_error)?
                    .dyn_into::<js_sys::Function>()
                    .map_err(js_error)?;
                fetch.call1(&global, &web_request).map_err(js_error)?.into()
            }
        };

        // Waiting on the fetch promise from a synchronous wasm call stack can deadlock
        // Node/browser event processing, so dispatch the request asynchronously.
        spawn_local(async move {
            let outcome = JsFuture::from(pending).await;
            if let Err(error) = outcome {
                let warning = format!("OpenInference OTLP/HTTP export failed: {error:?}");
                web_sys::console::warn_1(&JsValue::from_str(&warning));
            }
        });

        let accepted = HttpResponse::builder().status(202).body(Bytes::new());
        accepted.map_err(|e| http_error(e.to_string()))
    }
}
360
/// Converts a thrown JavaScript value into an [`HttpError`].
///
/// String values are used verbatim; anything else is rendered with its debug
/// representation.
#[cfg(target_arch = "wasm32")]
fn js_error(value: JsValue) -> HttpError {
    let message = match value.as_string() {
        Some(text) => text,
        None => format!("JavaScript error: {value:?}"),
    };
    http_error(message)
}
369
/// Boxes `message` as an `std::io::Error` so it can be returned as an [`HttpError`].
#[cfg(target_arch = "wasm32")]
fn http_error(message: impl Into<String>) -> HttpError {
    let text: String = message.into();
    let source = std::io::Error::other(text);
    Box::new(source)
}
374
375fn build_tracer_provider(config: &OpenInferenceConfig) -> Result<SdkTracerProvider> {
376    let exporter = match config.transport {
377        OtlpTransport::HttpBinary => {
378            let mut builder = SpanExporter::builder()
379                .with_http()
380                .with_protocol(Protocol::HttpBinary)
381                .with_timeout(config.timeout);
382            if let Some(endpoint) = &config.endpoint {
383                builder = builder.with_endpoint(endpoint.clone());
384            }
385            if !config.headers.is_empty() {
386                builder = builder.with_headers(config.headers.clone());
387            }
388            #[cfg(target_arch = "wasm32")]
389            {
390                builder = builder.with_http_client(WasmHttpClient);
391            }
392            builder
393                .build()
394                .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
395        }
396        #[cfg(not(target_arch = "wasm32"))]
397        OtlpTransport::Grpc => {
398            let mut builder = SpanExporter::builder()
399                .with_tonic()
400                .with_protocol(Protocol::Grpc)
401                .with_timeout(config.timeout);
402            if let Some(endpoint) = &config.endpoint {
403                builder = builder.with_endpoint(endpoint.clone());
404            }
405            if !config.headers.is_empty() {
406                builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
407            }
408            builder
409                .build()
410                .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
411        }
412        #[cfg(target_arch = "wasm32")]
413        OtlpTransport::Grpc => {
414            return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
415        }
416    };
417
418    let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
419    if let Some(service_namespace) = &config.service_namespace {
420        resource_attributes.push(KeyValue::new(
421            "service.namespace",
422            service_namespace.clone(),
423        ));
424    }
425    if let Some(service_version) = &config.service_version {
426        resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
427    }
428    for (key, value) in &config.resource_attributes {
429        resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
430    }
431
432    // Disable per-span attribute caps. OpenInference emits many flat
433    // `llm.input_messages.*` attributes on long conversations; the OTel SDK
434    // default (128) silently drops attributes added last in the span's
435    // lifecycle, notably `llm.token_count.*` emitted at span end.
436    let builder = SdkTracerProvider::builder()
437        .with_resource(
438            Resource::builder_empty()
439                .with_attributes(resource_attributes)
440                .build(),
441        )
442        .with_max_attributes_per_span(u32::MAX)
443        .with_max_attributes_per_event(u32::MAX);
444
445    #[cfg(not(target_arch = "wasm32"))]
446    {
447        if Handle::try_current().is_ok() {
448            Ok(builder.with_batch_exporter(exporter).build())
449        } else {
450            Ok(builder.with_simple_exporter(exporter).build())
451        }
452    }
453    #[cfg(target_arch = "wasm32")]
454    {
455        Ok(builder.with_simple_exporter(exporter).build())
456    }
457}
458
459#[cfg(not(target_arch = "wasm32"))]
460fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
461    let mut metadata = MetadataMap::new();
462    for (key, value) in headers {
463        let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
464            OpenInferenceError::InvalidGrpcHeader {
465                key: key.clone(),
466                message: e.to_string(),
467            }
468        })?;
469        let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
470            OpenInferenceError::InvalidGrpcHeader {
471                key: key.clone(),
472                message: e.to_string(),
473            }
474        })?;
475        metadata.insert(metadata_key, metadata_value);
476    }
477    Ok(metadata)
478}
479
480struct ActiveSpan {
481    span: Span,
482    span_context: SpanContext,
483}
484
485struct OpenInferenceEventProcessor {
486    active_spans: HashMap<Uuid, ActiveSpan>,
487    provider: SdkTracerProvider,
488    tracer: SdkTracer,
489}
490
491impl OpenInferenceEventProcessor {
492    fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
493        let tracer = provider.tracer(instrumentation_scope);
494        Self {
495            active_spans: HashMap::new(),
496            provider,
497            tracer,
498        }
499    }
500
501    fn process(&mut self, event: &Event) {
502        match event.scope_category() {
503            Some(ScopeCategory::Start) => self.process_start(event),
504            Some(ScopeCategory::End) => self.process_end(event),
505            None => self.process_mark(event),
506        }
507    }
508
509    fn force_flush(&self) -> Result<()> {
510        self.provider
511            .force_flush()
512            .map_err(|e| OpenInferenceError::Provider(e.to_string()))
513    }
514
515    fn shutdown(&self) -> Result<()> {
516        self.provider
517            .shutdown()
518            .map_err(|e| OpenInferenceError::Provider(e.to_string()))
519    }
520
521    fn process_start(&mut self, event: &Event) {
522        let mut span = self
523            .tracer
524            .span_builder(span_name(event))
525            .with_kind(span_kind(event))
526            .with_start_time(to_system_time(*event.timestamp()))
527            .start_with_context(&self.tracer, &self.parent_context(event));
528        span.set_attributes(start_attributes(event));
529        let span_context = local_parent_span_context(span.span_context());
530        self.active_spans
531            .insert(event.uuid(), ActiveSpan { span, span_context });
532    }
533
534    fn process_end(&mut self, event: &Event) {
535        let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
536            return;
537        };
538        active_span.span.set_attributes(end_attributes(event));
539        active_span
540            .span
541            .end_with_timestamp(to_system_time(*event.timestamp()));
542    }
543
544    fn process_mark(&mut self, event: &Event) {
545        let mark_name = event.name().to_string();
546        let timestamp = to_system_time(*event.timestamp());
547        let attributes = mark_attributes(event);
548
549        if let Some(parent_span) = self.find_parent_span_mut(event) {
550            parent_span
551                .span
552                .add_event_with_timestamp(mark_name, timestamp, attributes);
553            return;
554        }
555
556        let mut span = self
557            .tracer
558            .span_builder(format!("mark:{mark_name}"))
559            .with_kind(SpanKind::Internal)
560            .with_start_time(timestamp)
561            .start_with_context(&self.tracer, &self.parent_context(event));
562        let mut span_attributes = attributes;
563        span_attributes.push(KeyValue::new(
564            oi::OPENINFERENCE_SPAN_KIND,
565            OpenInferenceSpanKind::Chain,
566        ));
567        span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
568        span.set_attributes(span_attributes);
569        span.end_with_timestamp(timestamp);
570    }
571
572    fn parent_context(&self, event: &Event) -> Context {
573        self.find_parent_span(event)
574            .map(|active_span| {
575                Context::new().with_remote_span_context(active_span.span_context.clone())
576            })
577            .unwrap_or_default()
578    }
579
580    fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
581        event
582            .parent_uuid()
583            .filter(|uuid| self.active_spans.contains_key(uuid))
584    }
585
586    fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
587        self.parent_span_uuid(event)
588            .and_then(|uuid| self.active_spans.get(&uuid))
589    }
590
591    fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
592        self.parent_span_uuid(event)
593            .and_then(|uuid| self.active_spans.get_mut(&uuid))
594    }
595}
596
597fn span_kind(event: &Event) -> SpanKind {
598    match semantic_scope_type(event) {
599        Some(ScopeType::Llm) => SpanKind::Client,
600        Some(
601            ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
602        ) => SpanKind::Client,
603        _ => SpanKind::Internal,
604    }
605}
606
/// Returns the span name for an event, which is the event's own name as an
/// owned `String`.
fn span_name(event: &Event) -> String {
    event.name().to_string()
}
610
/// Returns the scope type the event reports for itself, if any.
///
/// Single lookup point used by the span-kind and attribute helpers.
fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
    event.scope_type()
}
614
615fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
616    match scope_type {
617        Some(ScopeType::Agent) => "agent",
618        Some(ScopeType::Function) => "function",
619        Some(ScopeType::Tool) => "tool",
620        Some(ScopeType::Llm) => "llm",
621        Some(ScopeType::Retriever) => "retriever",
622        Some(ScopeType::Embedder) => "embedder",
623        Some(ScopeType::Reranker) => "reranker",
624        Some(ScopeType::Guardrail) => "guardrail",
625        Some(ScopeType::Evaluator) => "evaluator",
626        Some(ScopeType::Custom) => "custom",
627        Some(ScopeType::Unknown) | None => "unknown",
628    }
629}
630
631fn start_attributes(event: &Event) -> Vec<KeyValue> {
632    let mut attributes = common_attributes(event);
633    let handle_attributes = event.attributes();
634    push_serialized(
635        &mut attributes,
636        "nemo_flow.handle_attributes_json",
637        handle_attributes,
638    );
639    push_serialized(&mut attributes, "nemo_flow.start.data_json", event.data());
640    push_serialized(
641        &mut attributes,
642        "nemo_flow.start.metadata_json",
643        event.metadata(),
644    );
645    push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
646    if event
647        .category()
648        .is_some_and(|category| category.as_str() == "tool")
649    {
650        attributes.push(KeyValue::new(oi::tool::NAME, event.name().to_string()));
651        attributes.push(KeyValue::new(
652            oi::tool_call::function::NAME,
653            event.name().to_string(),
654        ));
655    }
656
657    if let Some(input) = openinference_input_value(event) {
658        attributes.push(KeyValue::new(oi::input::VALUE, input.clone()));
659        attributes.push(KeyValue::new(oi::input::MIME_TYPE, "application/json"));
660
661        if event
662            .category()
663            .is_some_and(|category| category.as_str() == "tool")
664        {
665            attributes.push(KeyValue::new(oi::tool::PARAMETERS, input.clone()));
666            attributes.push(KeyValue::new(oi::tool_call::function::ARGUMENTS, input));
667        }
668    }
669    attributes
670}
671
672fn end_attributes(event: &Event) -> Vec<KeyValue> {
673    let mut attributes = Vec::new();
674    push_serialized(&mut attributes, "nemo_flow.end.data_json", event.data());
675    push_serialized(
676        &mut attributes,
677        "nemo_flow.end.metadata_json",
678        event.metadata(),
679    );
680    push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
681    if let Some(output) = event.output().and_then(to_json_string) {
682        attributes.push(KeyValue::new(oi::output::VALUE, output));
683        attributes.push(KeyValue::new(oi::output::MIME_TYPE, "application/json"));
684    }
685    if event
686        .category()
687        .is_some_and(|category| category.as_str() == "llm")
688        && let Some(usage) = event.annotated_response().and_then(|r| r.usage.as_ref())
689    {
690        if let Some(v) = usage.prompt_tokens {
691            attributes.push(KeyValue::new(oi::llm::token_count::PROMPT, v as i64));
692        }
693        if let Some(v) = usage.completion_tokens {
694            attributes.push(KeyValue::new(oi::llm::token_count::COMPLETION, v as i64));
695        }
696        if let Some(v) = usage.total_tokens {
697            attributes.push(KeyValue::new(oi::llm::token_count::TOTAL, v as i64));
698        }
699        if let Some(v) = usage.cache_read_tokens {
700            attributes.push(KeyValue::new(
701                oi::llm::token_count::prompt_details::CACHE_READ,
702                v as i64,
703            ));
704        }
705        if let Some(v) = usage.cache_write_tokens {
706            attributes.push(KeyValue::new(
707                oi::llm::token_count::prompt_details::CACHE_WRITE,
708                v as i64,
709            ));
710        }
711    }
712    attributes
713}
714
715fn mark_attributes(event: &Event) -> Vec<KeyValue> {
716    let handle_attributes = event.attributes();
717    let mut attributes = vec![
718        KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
719        KeyValue::new(
720            "nemo_flow.mark.parent_uuid",
721            event
722                .parent_uuid()
723                .map(|uuid| uuid.to_string())
724                .unwrap_or_default(),
725        ),
726    ];
727    push_serialized(
728        &mut attributes,
729        "nemo_flow.mark.attributes_json",
730        handle_attributes,
731    );
732    push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
733    push_serialized(
734        &mut attributes,
735        "nemo_flow.mark.metadata_json",
736        event.metadata(),
737    );
738    attributes
739}
740
741fn common_attributes(event: &Event) -> Vec<KeyValue> {
742    let mut attributes = vec![
743        KeyValue::new(
744            oi::OPENINFERENCE_SPAN_KIND,
745            openinference_span_kind(semantic_scope_type(event)),
746        ),
747        KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
748        KeyValue::new(
749            "nemo_flow.parent_uuid",
750            event
751                .parent_uuid()
752                .map(|uuid| uuid.to_string())
753                .unwrap_or_default(),
754        ),
755        KeyValue::new(
756            "nemo_flow.scope_type",
757            scope_type_name(semantic_scope_type(event)),
758        ),
759    ];
760
761    if let Some(model_name) = event.model_name() {
762        attributes.push(KeyValue::new(
763            "nemo_flow.model_name",
764            model_name.to_string(),
765        ));
766        attributes.push(KeyValue::new(oi::llm::MODEL_NAME, model_name.to_string()));
767    }
768    if let Some(tool_call_id) = event.tool_call_id() {
769        attributes.push(KeyValue::new(
770            "nemo_flow.tool_call_id",
771            tool_call_id.to_string(),
772        ));
773        attributes.push(KeyValue::new(oi::tool_call::ID, tool_call_id.to_string()));
774    }
775    if let Some(metadata) = event.metadata().and_then(to_json_string) {
776        attributes.push(KeyValue::new(oi::METADATA, metadata));
777    }
778
779    attributes
780}
781
782fn openinference_span_kind(scope_type: Option<ScopeType>) -> OpenInferenceSpanKind {
783    match scope_type {
784        Some(ScopeType::Agent) => OpenInferenceSpanKind::Agent,
785        Some(ScopeType::Tool) => OpenInferenceSpanKind::Tool,
786        Some(ScopeType::Llm) => OpenInferenceSpanKind::Llm,
787        Some(ScopeType::Retriever) => OpenInferenceSpanKind::Retriever,
788        Some(ScopeType::Embedder) => OpenInferenceSpanKind::Embedding,
789        Some(ScopeType::Reranker) => OpenInferenceSpanKind::Reranker,
790        Some(ScopeType::Guardrail) => OpenInferenceSpanKind::Guardrail,
791        Some(ScopeType::Evaluator) => OpenInferenceSpanKind::Evaluator,
792        Some(ScopeType::Function | ScopeType::Custom | ScopeType::Unknown) | None => {
793            OpenInferenceSpanKind::Chain
794        }
795    }
796}
797
798fn push_serialized<T: Serialize + ?Sized>(
799    attributes: &mut Vec<KeyValue>,
800    key: &'static str,
801    value: Option<&T>,
802) {
803    if let Some(value) = value
804        && let Ok(json) = serde_json::to_string(value)
805    {
806        attributes.push(KeyValue::new(key, json));
807    }
808}
809
810fn openinference_input_value(event: &Event) -> Option<String> {
811    let input = event.input()?;
812
813    if event
814        .category()
815        .is_some_and(|category| category.as_str() == "llm")
816    {
817        return match input {
818            serde_json::Value::Object(object) => {
819                if let Some(content) = object.get("content") {
820                    return to_json_string(content);
821                }
822
823                let mut sanitized = object.clone();
824                sanitized.remove("headers");
825                to_json_string(&serde_json::Value::Object(sanitized))
826            }
827            _ => to_json_string(input),
828        };
829    }
830
831    to_json_string(input)
832}
833
834fn to_json_string<T: Serialize>(value: &T) -> Option<String> {
835    serde_json::to_string(value).ok()
836}
837
838fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
839    SpanContext::new(
840        span_context.trace_id(),
841        span_context.span_id(),
842        span_context.trace_flags(),
843        false,
844        span_context.trace_state().clone(),
845    )
846}
847
848fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
849    let seconds = timestamp.timestamp();
850    let nanos = timestamp.timestamp_subsec_nanos();
851    if seconds >= 0 {
852        UNIX_EPOCH + Duration::new(seconds as u64, nanos)
853    } else if nanos == 0 {
854        UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
855    } else {
856        UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
857    }
858}
859
860#[cfg(test)]
861#[path = "../../tests/unit/observability/openinference_tests.rs"]
862mod tests;