Skip to main content

nemo_flow/observability/
openinference.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! OpenInference subscriber support for NeMo Flow.
5//!
//! This module adapts NeMo Flow lifecycle events into OpenInference trace spans:
7//!
8//! - scope/tool/LLM `Start` events open spans
9//! - matching `End` events close spans
10//! - `Mark` events become span events on the active parent span when possible
11//! - orphan marks fall back to zero-duration spans so they still reach OTLP
12//!
13//! The public API is intentionally small:
14//!
15//! - [`OpenInferenceConfig`] configures the OTLP exporter and OpenInference metadata
16//! - [`OpenInferenceSubscriber`] exposes a NeMo Flow [`EventSubscriberFn`] and
17//!   convenience `register` / `deregister` / `force_flush` / `shutdown` methods
18
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::{Event, ScopeCategory};
24use crate::api::runtime::EventSubscriberFn;
25use crate::api::scope::ScopeType;
26use crate::api::subscriber::{deregister_subscriber, register_subscriber};
27use crate::codec::response::Usage;
28use crate::error::FlowError;
29use crate::json::Json;
30use chrono::{DateTime, Utc};
31use openinference_semantic_conventions::SpanKind as OpenInferenceSpanKind;
32use openinference_semantic_conventions::attributes as oi;
33use opentelemetry::trace::{
34    Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
35};
36use opentelemetry::{Context, KeyValue};
37use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
38use opentelemetry_sdk::Resource;
39use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
40use serde::Serialize;
41use uuid::Uuid;
42
43#[cfg(target_arch = "wasm32")]
44use async_trait::async_trait;
45#[cfg(target_arch = "wasm32")]
46use opentelemetry_http::{
47    Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
48};
49#[cfg(not(target_arch = "wasm32"))]
50use opentelemetry_otlp::WithTonicConfig;
51#[cfg(not(target_arch = "wasm32"))]
52use tokio::runtime::Handle;
53#[cfg(not(target_arch = "wasm32"))]
54use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
55#[cfg(target_arch = "wasm32")]
56use wasm_bindgen::{JsCast, JsValue};
57#[cfg(target_arch = "wasm32")]
58use wasm_bindgen_futures::{JsFuture, spawn_local};
59#[cfg(target_arch = "wasm32")]
60use web_sys::{Request as WebRequest, RequestInit};
61
62/// Result type for the OpenInference subscriber crate.
63pub type Result<T> = std::result::Result<T, OpenInferenceError>;
64
65/// Errors produced while configuring or operating the OpenInference subscriber.
66#[derive(Debug, thiserror::Error)]
67pub enum OpenInferenceError {
68    /// The tonic gRPC exporter requires an active Tokio runtime.
69    #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
70    MissingTokioRuntime,
71    /// The requested transport is not available on this target.
72    #[error("the OTLP {transport} transport is not supported on this target")]
73    UnsupportedTransport {
74        /// Human-readable transport label used in the error message.
75        transport: &'static str,
76    },
77    /// Failed to parse a configured gRPC metadata header.
78    #[error("invalid OTLP gRPC header {key:?}: {message}")]
79    InvalidGrpcHeader {
80        /// Header name that failed to parse.
81        key: String,
82        /// Parser failure message.
83        message: String,
84    },
85    /// Failed to build the OTLP exporter.
86    #[error("failed to build the OTLP exporter: {0}")]
87    ExporterBuild(String),
88    /// The underlying tracer provider returned an error.
89    #[error("OpenInference tracer provider error: {0}")]
90    Provider(String),
91    /// Registration errors from the core runtime.
92    #[error(transparent)]
93    Core(#[from] FlowError),
94}
95
/// Supported OTLP trace transports.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// OTLP/HTTP protobuf, typically `http://host:4318/v1/traces`.
    #[default]
    HttpBinary,
    /// OTLP/gRPC, typically `http://host:4317`.
    Grpc,
}

/// Configuration for the OpenInference subscriber.
///
/// The `Debug` output lists exporter header *names* only. Header values
/// commonly carry API keys or bearer tokens, so they are redacted, which makes
/// the config safe to log.
#[derive(Clone)]
pub struct OpenInferenceConfig {
    /// Explicit OTLP endpoint; `None` leaves exporter defaults / `OTEL_*` env vars in effect.
    endpoint: Option<String>,
    /// Exporter headers (sent as HTTP headers or gRPC metadata depending on transport).
    headers: HashMap<String, String>,
    /// Extra string-valued resource attributes.
    resource_attributes: HashMap<String, String>,
    /// Value of the `service.name` resource attribute.
    service_name: String,
    /// Optional `service.namespace` resource attribute.
    service_namespace: Option<String>,
    /// Optional `service.version` resource attribute.
    service_version: Option<String>,
    /// Instrumentation scope name used for the tracer that emits spans.
    instrumentation_scope: String,
    /// OTLP request timeout.
    timeout: Duration,
    /// Which OTLP transport to use.
    transport: OtlpTransport,
}

impl Default for OpenInferenceConfig {
    fn default() -> Self {
        Self {
            endpoint: None,
            headers: HashMap::new(),
            resource_attributes: HashMap::new(),
            service_name: "nemo-flow".to_string(),
            service_namespace: None,
            service_version: None,
            instrumentation_scope: "nemo-flow-openinference".to_string(),
            timeout: Duration::from_secs(3),
            transport: OtlpTransport::HttpBinary,
        }
    }
}

/// `Debug` adapter that renders header names (sorted, for deterministic
/// output) while hiding their values.
struct RedactedHeaderValues<'a>(&'a HashMap<String, String>);

impl std::fmt::Debug for RedactedHeaderValues<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut names: Vec<&str> = self.0.keys().map(String::as_str).collect();
        names.sort_unstable();
        f.debug_map()
            .entries(names.into_iter().map(|name| (name, "<redacted>")))
            .finish()
    }
}

impl std::fmt::Debug for OpenInferenceConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenInferenceConfig")
            .field("endpoint", &self.endpoint)
            // Header values are frequently credentials; never print them.
            .field("headers", &RedactedHeaderValues(&self.headers))
            .field("resource_attributes", &self.resource_attributes)
            .field("service_name", &self.service_name)
            .field("service_namespace", &self.service_namespace)
            .field("service_version", &self.service_version)
            .field("instrumentation_scope", &self.instrumentation_scope)
            .field("timeout", &self.timeout)
            .field("transport", &self.transport)
            .finish()
    }
}

impl OpenInferenceConfig {
    /// Creates a config with sensible defaults (see the `Default` impl).
    pub fn new() -> Self {
        Self::default()
    }

    /// Selects the OTLP transport.
    pub fn with_transport(mut self, transport: OtlpTransport) -> Self {
        self.transport = transport;
        self
    }

    /// Sets the `service.name` resource attribute.
    pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
        self.service_name = service_name.into();
        self
    }

    /// Overrides the OTLP endpoint. If unset, exporter defaults and OTEL_* env vars apply.
    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.endpoint = Some(endpoint.into());
        self
    }

    /// Adds a header/metadata entry for the exporter; an existing key is overwritten.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }

    /// Adds a resource attribute as a string key/value pair; an existing key is overwritten.
    pub fn with_resource_attribute(
        mut self,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> Self {
        self.resource_attributes.insert(key.into(), value.into());
        self
    }

    /// Sets the OTLP request timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Sets the service namespace resource attribute.
    pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
        self.service_namespace = Some(namespace.into());
        self
    }

    /// Sets the service version resource attribute.
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = Some(version.into());
        self
    }

    /// Sets the instrumentation scope name used for emitted spans.
    pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
        self.instrumentation_scope = scope.into();
        self
    }
}
200
201/// OpenInference-backed NeMo Flow subscriber.
202#[derive(Clone)]
203pub struct OpenInferenceSubscriber {
204    inner: Arc<Inner>,
205}
206
207struct Inner {
208    processor: Arc<Mutex<OpenInferenceEventProcessor>>,
209    subscriber: EventSubscriberFn,
210}
211
212impl OpenInferenceSubscriber {
213    /// Builds a subscriber backed by a new OTLP tracer provider.
214    pub fn new(config: OpenInferenceConfig) -> Result<Self> {
215        #[cfg(not(target_arch = "wasm32"))]
216        if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
217        {
218            return Err(OpenInferenceError::MissingTokioRuntime);
219        }
220        #[cfg(target_arch = "wasm32")]
221        if config.transport == OtlpTransport::Grpc {
222            return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
223        }
224
225        let provider = build_tracer_provider(&config)?;
226        Ok(Self::from_tracer_provider_with_scope(
227            provider,
228            config.instrumentation_scope,
229        ))
230    }
231
232    /// Builds a subscriber from an already-configured tracer provider.
233    pub fn from_tracer_provider(
234        provider: SdkTracerProvider,
235        instrumentation_scope: impl Into<String>,
236    ) -> Self {
237        Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
238    }
239
240    fn from_tracer_provider_with_scope(
241        provider: SdkTracerProvider,
242        instrumentation_scope: String,
243    ) -> Self {
244        let processor = Arc::new(Mutex::new(OpenInferenceEventProcessor::new(
245            provider,
246            instrumentation_scope,
247        )));
248        let processor_for_callback = Arc::clone(&processor);
249        let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
250            let Ok(mut guard) = processor_for_callback.lock() else {
251                // Observability should not take down the host process if the
252                // subscriber state was previously poisoned.
253                return;
254            };
255            guard.process(event);
256        });
257
258        Self {
259            inner: Arc::new(Inner {
260                processor,
261                subscriber,
262            }),
263        }
264    }
265
266    /// Returns the raw NeMo Flow subscriber callback for custom registration flows.
267    pub fn subscriber(&self) -> EventSubscriberFn {
268        Arc::clone(&self.inner.subscriber)
269    }
270
271    /// Registers this subscriber globally with the NeMo Flow runtime.
272    pub fn register(&self, name: &str) -> Result<()> {
273        register_subscriber(name, self.subscriber()).map_err(Into::into)
274    }
275
276    /// Deregisters a previously-registered global subscriber by name.
277    pub fn deregister(&self, name: &str) -> Result<bool> {
278        deregister_subscriber(name).map_err(Into::into)
279    }
280
281    /// Flushes finished spans through the underlying tracer provider.
282    pub fn force_flush(&self) -> Result<()> {
283        let guard = self.inner.processor.lock().map_err(|_| {
284            OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
285        })?;
286        guard.force_flush()
287    }
288
289    /// Shuts down the underlying tracer provider.
290    ///
291    /// Call `deregister(...)` first if the subscriber is still registered with NeMo Flow.
292    pub fn shutdown(&self) -> Result<()> {
293        let guard = self.inner.processor.lock().map_err(|_| {
294            OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
295        })?;
296        guard.shutdown()
297    }
298}
299
/// `opentelemetry_http::HttpClient` for wasm32 targets, built on the
/// JavaScript `fetch` API.
///
/// Export is fire-and-forget: the request is dispatched via `spawn_local` and a
/// synthetic `202` response is returned immediately. The exporter therefore
/// never sees the real HTTP status. Only a rejected fetch promise is reported,
/// through `console.warn`. `fetch` resolves rather than rejects on HTTP error
/// statuses, so those go unreported.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy, Default)]
struct WasmHttpClient;

#[cfg(target_arch = "wasm32")]
#[async_trait]
impl HttpClient for WasmHttpClient {
    /// Sends an OTLP request with `fetch` and returns a placeholder `202` response.
    ///
    /// # Errors
    ///
    /// Returns an `HttpError` if the `web_sys::Request` cannot be built, a header
    /// value is not visible ASCII, no `fetch` function is reachable, or calling
    /// `fetch` throws synchronously. Failures of the request itself are not
    /// returned (see the type-level docs).
    async fn send_bytes(
        &self,
        request: HttpRequest<Bytes>,
    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
        let (parts, body) = request.into_parts();

        // Translate the `http::Request` parts into a `web_sys::Request`.
        let request = {
            let request_url = parts.uri.to_string();
            let init = RequestInit::new();
            init.set_method(parts.method.as_str());
            // Only attach a body when there is one to send.
            if !body.is_empty() {
                let body_bytes = js_sys::Uint8Array::from(body.as_ref());
                init.set_body_opt_u8_array(Some(&body_bytes));
            }

            let request =
                WebRequest::new_with_str_and_init(&request_url, &init).map_err(js_error)?;
            let request_headers = request.headers();
            for (name, value) in &parts.headers {
                // Headers must be representable as strings for the JS Headers API.
                let value = value
                    .to_str()
                    .map_err(|e| http_error(format!("invalid OTLP HTTP header {name}: {e}")))?;
                request_headers
                    .set(name.as_str(), value)
                    .map_err(js_error)?;
            }
            request
        };

        // Use `window.fetch` when a window exists; otherwise look up a global
        // `fetch` function (presumably Node or a worker scope — verify targets).
        let fetch_promise = if let Some(window) = web_sys::window() {
            window.fetch_with_request(&request)
        } else {
            let global = js_sys::global();
            let fetch = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
                .map_err(js_error)?
                .dyn_into::<js_sys::Function>()
                .map_err(js_error)?;
            fetch.call1(&global, &request).map_err(js_error)?.into()
        };
        // Waiting on the fetch promise from a synchronous wasm call stack can deadlock
        // Node/browser event processing, so dispatch the request asynchronously.
        spawn_local(async move {
            if let Err(error) = JsFuture::from(fetch_promise).await {
                web_sys::console::warn_1(&JsValue::from_str(&format!(
                    "OpenInference OTLP/HTTP export failed: {error:?}"
                )));
            }
        });

        // Synthetic "accepted" response: the real outcome is not awaited.
        HttpResponse::builder()
            .status(202)
            .body(Bytes::new())
            .map_err(|e| http_error(e.to_string()))
    }
}
362
/// Converts a JavaScript error value into an `HttpError`.
///
/// Uses the value's string form when it is a JS string; otherwise falls back
/// to its `Debug` rendering.
#[cfg(target_arch = "wasm32")]
fn js_error(value: JsValue) -> HttpError {
    let message = match value.as_string() {
        Some(text) => text,
        None => format!("JavaScript error: {value:?}"),
    };
    http_error(message)
}

/// Wraps `message` in a boxed `std::io::Error` so it can be returned as an `HttpError`.
#[cfg(target_arch = "wasm32")]
fn http_error(message: impl Into<String>) -> HttpError {
    let error = std::io::Error::other(message.into());
    Box::new(error)
}
376
/// Builds the OTLP span exporter and SDK tracer provider described by `config`.
///
/// - `HttpBinary`: OTLP/HTTP protobuf exporter. On native targets the rustls
///   `ring` crypto provider is installed first. On wasm32 the exporter uses the
///   `fetch`-based `WasmHttpClient`.
/// - `Grpc`: tonic exporter with `config.headers` converted to gRPC metadata
///   (native only). On wasm32 it returns `UnsupportedTransport`.
///
/// The endpoint and headers are applied only when configured; otherwise the
/// exporter defaults (and `OTEL_*` env vars) stay in effect. The resource
/// contains `service.name`, optionally `service.namespace` /
/// `service.version`, and then every custom resource attribute.
///
/// # Errors
///
/// - [`OpenInferenceError::ExporterBuild`] if the exporter fails to build.
/// - [`OpenInferenceError::InvalidGrpcHeader`] for headers that are not valid
///   gRPC metadata.
/// - [`OpenInferenceError::UnsupportedTransport`] for gRPC on wasm32.
fn build_tracer_provider(config: &OpenInferenceConfig) -> Result<SdkTracerProvider> {
    let exporter = match config.transport {
        OtlpTransport::HttpBinary => {
            #[cfg(not(target_arch = "wasm32"))]
            install_rustls_crypto_provider();
            let mut builder = SpanExporter::builder()
                .with_http()
                .with_protocol(Protocol::HttpBinary)
                .with_timeout(config.timeout);
            if let Some(endpoint) = &config.endpoint {
                builder = builder.with_endpoint(endpoint.clone());
            }
            if !config.headers.is_empty() {
                builder = builder.with_headers(config.headers.clone());
            }
            // wasm32 has no native HTTP stack; route requests through `fetch`.
            #[cfg(target_arch = "wasm32")]
            {
                builder = builder.with_http_client(WasmHttpClient);
            }
            builder
                .build()
                .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
        }
        // NOTE(review): unlike the HTTP arm, no rustls crypto provider is
        // installed here — confirm TLS gRPC endpoints do not need one.
        #[cfg(not(target_arch = "wasm32"))]
        OtlpTransport::Grpc => {
            let mut builder = SpanExporter::builder()
                .with_tonic()
                .with_protocol(Protocol::Grpc)
                .with_timeout(config.timeout);
            if let Some(endpoint) = &config.endpoint {
                builder = builder.with_endpoint(endpoint.clone());
            }
            if !config.headers.is_empty() {
                builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
            }
            builder
                .build()
                .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
        }
        #[cfg(target_arch = "wasm32")]
        OtlpTransport::Grpc => {
            return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
        }
    };

    // Standard service attributes first, then user-supplied resource attributes.
    let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
    if let Some(service_namespace) = &config.service_namespace {
        resource_attributes.push(KeyValue::new(
            "service.namespace",
            service_namespace.clone(),
        ));
    }
    if let Some(service_version) = &config.service_version {
        resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
    }
    for (key, value) in &config.resource_attributes {
        resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
    }

    // Disable per-span attribute caps. OpenInference emits many flat
    // `llm.input_messages.*` attributes on long conversations; the OTel SDK
    // default (128) silently drops attributes added last in the span's
    // lifecycle, notably `llm.token_count.*` emitted at span end.
    let builder = SdkTracerProvider::builder()
        .with_resource(
            Resource::builder_empty()
                .with_attributes(resource_attributes)
                .build(),
        )
        .with_max_attributes_per_span(u32::MAX)
        .with_max_attributes_per_event(u32::MAX);

    // Use the batch exporter inside a Tokio runtime; otherwise (and always on
    // wasm32) fall back to the simple exporter.
    #[cfg(not(target_arch = "wasm32"))]
    {
        if Handle::try_current().is_ok() {
            Ok(builder.with_batch_exporter(exporter).build())
        } else {
            Ok(builder.with_simple_exporter(exporter).build())
        }
    }
    #[cfg(target_arch = "wasm32")]
    {
        Ok(builder.with_simple_exporter(exporter).build())
    }
}
462
463#[cfg(not(target_arch = "wasm32"))]
464fn install_rustls_crypto_provider() {
465    let _ = rustls::crypto::ring::default_provider().install_default();
466}
467
468#[cfg(not(target_arch = "wasm32"))]
469fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
470    let mut metadata = MetadataMap::new();
471    for (key, value) in headers {
472        let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
473            OpenInferenceError::InvalidGrpcHeader {
474                key: key.clone(),
475                message: e.to_string(),
476            }
477        })?;
478        let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
479            OpenInferenceError::InvalidGrpcHeader {
480                key: key.clone(),
481                message: e.to_string(),
482            }
483        })?;
484        metadata.insert(metadata_key, metadata_value);
485    }
486    Ok(metadata)
487}
488
489struct ActiveSpan {
490    span: Span,
491    span_context: SpanContext,
492}
493
494struct OpenInferenceEventProcessor {
495    active_spans: HashMap<Uuid, ActiveSpan>,
496    provider: SdkTracerProvider,
497    tracer: SdkTracer,
498}
499
500impl OpenInferenceEventProcessor {
501    fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
502        let tracer = provider.tracer(instrumentation_scope);
503        Self {
504            active_spans: HashMap::new(),
505            provider,
506            tracer,
507        }
508    }
509
510    fn process(&mut self, event: &Event) {
511        match event.scope_category() {
512            Some(ScopeCategory::Start) => self.process_start(event),
513            Some(ScopeCategory::End) => self.process_end(event),
514            None => self.process_mark(event),
515        }
516    }
517
518    fn force_flush(&self) -> Result<()> {
519        self.provider
520            .force_flush()
521            .map_err(|e| OpenInferenceError::Provider(e.to_string()))
522    }
523
524    fn shutdown(&self) -> Result<()> {
525        self.provider
526            .shutdown()
527            .map_err(|e| OpenInferenceError::Provider(e.to_string()))
528    }
529
530    fn process_start(&mut self, event: &Event) {
531        let mut span = self
532            .tracer
533            .span_builder(span_name(event))
534            .with_kind(span_kind(event))
535            .with_start_time(to_system_time(*event.timestamp()))
536            .start_with_context(&self.tracer, &self.parent_context(event));
537        span.set_attributes(start_attributes(event));
538        let span_context = local_parent_span_context(span.span_context());
539        self.active_spans
540            .insert(event.uuid(), ActiveSpan { span, span_context });
541    }
542
543    fn process_end(&mut self, event: &Event) {
544        let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
545            return;
546        };
547        active_span.span.set_attributes(end_attributes(event));
548        active_span
549            .span
550            .end_with_timestamp(to_system_time(*event.timestamp()));
551    }
552
553    fn process_mark(&mut self, event: &Event) {
554        let mark_name = event.name().to_string();
555        let timestamp = to_system_time(*event.timestamp());
556        let attributes = mark_attributes(event);
557
558        if let Some(parent_span) = self.find_parent_span_mut(event) {
559            parent_span
560                .span
561                .add_event_with_timestamp(mark_name, timestamp, attributes);
562            return;
563        }
564
565        let mut span = self
566            .tracer
567            .span_builder(format!("mark:{mark_name}"))
568            .with_kind(SpanKind::Internal)
569            .with_start_time(timestamp)
570            .start_with_context(&self.tracer, &self.parent_context(event));
571        let mut span_attributes = attributes;
572        span_attributes.push(KeyValue::new(
573            oi::OPENINFERENCE_SPAN_KIND,
574            OpenInferenceSpanKind::Chain,
575        ));
576        span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
577        span.set_attributes(span_attributes);
578        span.end_with_timestamp(timestamp);
579    }
580
581    fn parent_context(&self, event: &Event) -> Context {
582        self.find_parent_span(event)
583            .map(|active_span| {
584                Context::new().with_remote_span_context(active_span.span_context.clone())
585            })
586            .unwrap_or_default()
587    }
588
589    fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
590        event
591            .parent_uuid()
592            .filter(|uuid| self.active_spans.contains_key(uuid))
593    }
594
595    fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
596        self.parent_span_uuid(event)
597            .and_then(|uuid| self.active_spans.get(&uuid))
598    }
599
600    fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
601        self.parent_span_uuid(event)
602            .and_then(|uuid| self.active_spans.get_mut(&uuid))
603    }
604}
605
606fn span_kind(event: &Event) -> SpanKind {
607    match semantic_scope_type(event) {
608        Some(ScopeType::Llm) => SpanKind::Client,
609        Some(
610            ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
611        ) => SpanKind::Client,
612        _ => SpanKind::Internal,
613    }
614}
615
616fn span_name(event: &Event) -> String {
617    event.name().to_string()
618}
619
620fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
621    event.scope_type()
622}
623
624fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
625    match scope_type {
626        Some(ScopeType::Agent) => "agent",
627        Some(ScopeType::Function) => "function",
628        Some(ScopeType::Tool) => "tool",
629        Some(ScopeType::Llm) => "llm",
630        Some(ScopeType::Retriever) => "retriever",
631        Some(ScopeType::Embedder) => "embedder",
632        Some(ScopeType::Reranker) => "reranker",
633        Some(ScopeType::Guardrail) => "guardrail",
634        Some(ScopeType::Evaluator) => "evaluator",
635        Some(ScopeType::Custom) => "custom",
636        Some(ScopeType::Unknown) | None => "unknown",
637    }
638}
639
640fn start_attributes(event: &Event) -> Vec<KeyValue> {
641    let mut attributes = common_attributes(event);
642    let handle_attributes = event.attributes();
643    if handle_attributes.is_some_and(|attributes| !attributes.is_empty()) {
644        push_serialized(
645            &mut attributes,
646            "nemo_flow.handle_attributes_json",
647            handle_attributes,
648        );
649    }
650    if event
651        .category()
652        .is_none_or(|category| category.as_str() != "llm")
653    {
654        push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
655    }
656    if event
657        .category()
658        .is_some_and(|category| category.as_str() == "tool")
659    {
660        attributes.push(KeyValue::new(oi::tool::NAME, event.name().to_string()));
661        attributes.push(KeyValue::new(
662            oi::tool_call::function::NAME,
663            event.name().to_string(),
664        ));
665    }
666
667    if let Some((input, mime_type)) = openinference_input_value(event) {
668        attributes.push(KeyValue::new(oi::input::VALUE, input.clone()));
669        attributes.push(KeyValue::new(oi::input::MIME_TYPE, mime_type));
670
671        if event
672            .category()
673            .is_some_and(|category| category.as_str() == "tool")
674        {
675            attributes.push(KeyValue::new(oi::tool::PARAMETERS, input.clone()));
676            attributes.push(KeyValue::new(oi::tool_call::function::ARGUMENTS, input));
677        }
678    }
679    attributes
680}
681
682fn end_attributes(event: &Event) -> Vec<KeyValue> {
683    let mut attributes = Vec::new();
684    push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
685    if let Some((output, mime_type)) = openinference_output_value(event) {
686        attributes.push(KeyValue::new(oi::output::VALUE, output));
687        attributes.push(KeyValue::new(oi::output::MIME_TYPE, mime_type));
688    }
689    let fallback_usage = if event
690        .category()
691        .is_some_and(|category| category.as_str() == "llm")
692    {
693        usage_from_manual_llm_output(event.output())
694    } else {
695        None
696    };
697    let usage = event
698        .annotated_response()
699        .and_then(|response| response.usage.as_ref())
700        .or(fallback_usage.as_ref());
701    if event
702        .category()
703        .is_some_and(|category| category.as_str() == "llm")
704        && let Some(usage) = usage
705    {
706        if let Some(v) = usage.prompt_tokens {
707            attributes.push(KeyValue::new(oi::llm::token_count::PROMPT, v as i64));
708        }
709        if let Some(v) = usage.completion_tokens {
710            attributes.push(KeyValue::new(oi::llm::token_count::COMPLETION, v as i64));
711        }
712        if let Some(v) = usage.total_tokens {
713            attributes.push(KeyValue::new(oi::llm::token_count::TOTAL, v as i64));
714        }
715        if let Some(v) = usage.cache_read_tokens {
716            attributes.push(KeyValue::new(
717                oi::llm::token_count::prompt_details::CACHE_READ,
718                v as i64,
719            ));
720        }
721        if let Some(v) = usage.cache_write_tokens {
722            attributes.push(KeyValue::new(
723                oi::llm::token_count::prompt_details::CACHE_WRITE,
724                v as i64,
725            ));
726        }
727    }
728    attributes
729}
730
731fn usage_from_manual_llm_output(output: Option<&Json>) -> Option<Usage> {
732    let object = output?.as_object()?;
733    let usage = object.get("usage").and_then(Json::as_object);
734    let token_usage = object.get("token_usage").and_then(Json::as_object);
735    if usage.is_none() && token_usage.is_none() {
736        return None;
737    }
738
739    let prompt_tokens = first_u64_from_manual_usage(
740        usage,
741        token_usage,
742        &["prompt_tokens", "input_tokens", "inputTokens", "input"],
743    );
744    let completion_tokens = first_u64_from_manual_usage(
745        usage,
746        token_usage,
747        &[
748            "completion_tokens",
749            "output_tokens",
750            "completionTokens",
751            "outputTokens",
752            "output",
753        ],
754    );
755    let reported_total_tokens = first_u64_from_manual_usage(
756        usage,
757        token_usage,
758        &["total_tokens", "totalTokens", "total"],
759    );
760    let cache_read_tokens = first_u64_from_manual_usage(
761        usage,
762        token_usage,
763        &[
764            "cache_read_tokens",
765            "cached_tokens",
766            "cache_read_input_tokens",
767            "cacheReadTokens",
768            "cachedTokens",
769            "cacheReadInputTokens",
770            "cacheRead",
771        ],
772    );
773    let cache_write_tokens = first_u64_from_manual_usage(
774        usage,
775        token_usage,
776        &[
777            "cache_write_tokens",
778            "cache_creation_input_tokens",
779            "cacheWriteTokens",
780            "cacheCreationInputTokens",
781            "cacheWrite",
782        ],
783    );
784
785    if prompt_tokens.is_none()
786        && completion_tokens.is_none()
787        && reported_total_tokens.is_none()
788        && cache_read_tokens.is_none()
789        && cache_write_tokens.is_none()
790    {
791        return None;
792    }
793    let total_tokens =
794        normalize_total_tokens(reported_total_tokens, prompt_tokens, completion_tokens);
795
796    Some(Usage {
797        prompt_tokens,
798        completion_tokens,
799        total_tokens,
800        cache_read_tokens,
801        cache_write_tokens,
802    })
803}
804
/// Keeps a reported total only if it is consistent with its parts.
///
/// Returns `total_tokens` unchanged when no prompt/completion counts are known
/// (their saturating sum is 0) or when the total is at least that sum.
/// Otherwise the total is treated as bogus and `None` is returned.
fn normalize_total_tokens(
    total_tokens: Option<u64>,
    prompt_tokens: Option<u64>,
    completion_tokens: Option<u64>,
) -> Option<u64> {
    let known_parts = [prompt_tokens, completion_tokens]
        .into_iter()
        .flatten()
        .fold(0u64, u64::saturating_add);
    total_tokens.filter(|&total| known_parts == 0 || total >= known_parts)
}
820
821fn first_u64_from_manual_usage(
822    usage: Option<&serde_json::Map<String, Json>>,
823    token_usage: Option<&serde_json::Map<String, Json>>,
824    keys: &[&str],
825) -> Option<u64> {
826    usage
827        .and_then(|value| first_u64(value, keys))
828        .or_else(|| token_usage.and_then(|value| first_u64(value, keys)))
829}
830
831fn first_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
832    keys.iter()
833        .find_map(|key| usage.get(*key).and_then(Json::as_u64))
834}
835
836fn mark_attributes(event: &Event) -> Vec<KeyValue> {
837    let handle_attributes = event.attributes();
838    let mut attributes = vec![
839        KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
840        KeyValue::new(
841            "nemo_flow.mark.parent_uuid",
842            event
843                .parent_uuid()
844                .map(|uuid| uuid.to_string())
845                .unwrap_or_default(),
846        ),
847    ];
848    push_serialized(
849        &mut attributes,
850        "nemo_flow.mark.attributes_json",
851        handle_attributes,
852    );
853    push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
854    push_serialized(
855        &mut attributes,
856        "nemo_flow.mark.metadata_json",
857        event.metadata(),
858    );
859    attributes
860}
861
862fn common_attributes(event: &Event) -> Vec<KeyValue> {
863    let mut attributes = vec![
864        KeyValue::new(
865            oi::OPENINFERENCE_SPAN_KIND,
866            openinference_span_kind(semantic_scope_type(event)),
867        ),
868        KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
869        KeyValue::new(
870            "nemo_flow.parent_uuid",
871            event
872                .parent_uuid()
873                .map(|uuid| uuid.to_string())
874                .unwrap_or_default(),
875        ),
876        KeyValue::new(
877            "nemo_flow.scope_type",
878            scope_type_name(semantic_scope_type(event)),
879        ),
880    ];
881
882    if let Some(model_name) = event.model_name() {
883        attributes.push(KeyValue::new(oi::llm::MODEL_NAME, model_name.to_string()));
884    }
885    if let Some(tool_call_id) = event.tool_call_id() {
886        attributes.push(KeyValue::new(oi::tool_call::ID, tool_call_id.to_string()));
887    }
888    if let Some(metadata) = event.metadata().and_then(to_json_string) {
889        attributes.push(KeyValue::new(oi::METADATA, metadata));
890    }
891
892    attributes
893}
894
895fn openinference_span_kind(scope_type: Option<ScopeType>) -> OpenInferenceSpanKind {
896    match scope_type {
897        Some(ScopeType::Agent) => OpenInferenceSpanKind::Agent,
898        Some(ScopeType::Tool) => OpenInferenceSpanKind::Tool,
899        Some(ScopeType::Llm) => OpenInferenceSpanKind::Llm,
900        Some(ScopeType::Retriever) => OpenInferenceSpanKind::Retriever,
901        Some(ScopeType::Embedder) => OpenInferenceSpanKind::Embedding,
902        Some(ScopeType::Reranker) => OpenInferenceSpanKind::Reranker,
903        Some(ScopeType::Guardrail) => OpenInferenceSpanKind::Guardrail,
904        Some(ScopeType::Evaluator) => OpenInferenceSpanKind::Evaluator,
905        Some(ScopeType::Function | ScopeType::Custom | ScopeType::Unknown) | None => {
906            OpenInferenceSpanKind::Chain
907        }
908    }
909}
910
911fn push_serialized<T: Serialize + ?Sized>(
912    attributes: &mut Vec<KeyValue>,
913    key: &'static str,
914    value: Option<&T>,
915) {
916    if let Some(value) = value
917        && let Ok(json) = serde_json::to_string(value)
918    {
919        attributes.push(KeyValue::new(key, json));
920    }
921}
922
923fn openinference_input_value(event: &Event) -> Option<(String, &'static str)> {
924    let input = event.input()?;
925
926    if event
927        .category()
928        .is_some_and(|category| category.as_str() == "llm")
929    {
930        return llm_input_display_value(input)
931            .map(|display| (display, "text/plain"))
932            .or_else(|| sanitized_llm_input_json(input).map(|json| (json, "application/json")));
933    }
934
935    to_json_string(input).map(|json| (json, "application/json"))
936}
937
938fn openinference_output_value(event: &Event) -> Option<(String, &'static str)> {
939    let output = event.output()?;
940    display_text_from_json(output)
941        .map(|display| (display, "text/plain"))
942        .or_else(|| to_json_string(output).map(|json| (json, "application/json")))
943}
944
945fn llm_input_display_value(input: &Json) -> Option<String> {
946    let content = match input {
947        Json::Object(object) => object.get("content").unwrap_or(input),
948        _ => input,
949    };
950
951    content
952        .get("messages")
953        .and_then(display_text_from_messages)
954        .or_else(|| display_text_from_json(content))
955}
956
957fn sanitized_llm_input_json(input: &Json) -> Option<String> {
958    match input {
959        Json::Object(object) => {
960            let mut sanitized = object.clone();
961            sanitized.remove("headers");
962            to_json_string(&Json::Object(sanitized))
963        }
964        _ => to_json_string(input),
965    }
966}
967
968fn display_text_from_json(value: &Json) -> Option<String> {
969    match value {
970        Json::String(text) => display_text_from_string(text),
971        Json::Object(object) => {
972            for key in ["content", "summary", "message", "text", "prompt"] {
973                if let Some(display) = object.get(key).and_then(display_text_from_json) {
974                    return Some(display);
975                }
976            }
977            object
978                .get("choices")
979                .and_then(display_text_from_chat_choices)
980                .or_else(|| {
981                    object
982                        .get("tool_calls")
983                        .and_then(display_text_from_tool_calls)
984                })
985        }
986        Json::Array(items) => display_text_from_content_blocks(items),
987        _ => None,
988    }
989}
990
991fn display_text_from_messages(value: &Json) -> Option<String> {
992    let messages = value.as_array()?;
993    let text = messages
994        .iter()
995        .filter_map(display_text_from_message)
996        .collect::<Vec<_>>()
997        .join("\n\n")
998        .trim()
999        .to_string();
1000    if text.is_empty() { None } else { Some(text) }
1001}
1002
1003fn display_text_from_message(value: &Json) -> Option<String> {
1004    let role = value
1005        .get("role")
1006        .and_then(Json::as_str)
1007        .unwrap_or("message");
1008    if role == "tool" {
1009        return Some("tool: Tool result omitted".to_string());
1010    }
1011    let display = value
1012        .get("content")
1013        .and_then(display_text_from_json)
1014        .or_else(|| {
1015            value
1016                .get("tool_calls")
1017                .and_then(display_text_from_tool_calls)
1018        })?;
1019    Some(format!("{role}: {display}"))
1020}
1021
1022fn display_text_from_string(text: &str) -> Option<String> {
1023    let trimmed = text.trim();
1024    if trimmed.is_empty() {
1025        return None;
1026    }
1027    if let Ok(parsed) = serde_json::from_str::<Json>(trimmed)
1028        && let Some(display) = display_text_from_json(&parsed)
1029    {
1030        return Some(display);
1031    }
1032    Some(trimmed.to_string())
1033}
1034
1035fn display_text_from_chat_choices(value: &Json) -> Option<String> {
1036    let choices = value.as_array()?;
1037    for choice in choices {
1038        let Some(message) = choice.get("message") else {
1039            continue;
1040        };
1041        let content = message.get("content").and_then(display_text_from_json);
1042        let tool_calls = message
1043            .get("tool_calls")
1044            .and_then(display_text_from_tool_calls);
1045        match (content, tool_calls) {
1046            (Some(content), Some(tool_calls)) => return Some(format!("{content}\n{tool_calls}")),
1047            (Some(content), None) => return Some(content),
1048            (None, Some(tool_calls)) => return Some(tool_calls),
1049            (None, None) => {}
1050        }
1051    }
1052    None
1053}
1054
1055fn display_text_from_content_blocks(items: &[Json]) -> Option<String> {
1056    let mut entries = items
1057        .iter()
1058        .filter_map(content_block_display_text)
1059        .collect::<Vec<_>>();
1060    let tool_calls = items.iter().filter_map(tool_call_name).collect::<Vec<_>>();
1061    if !tool_calls.is_empty() {
1062        entries.push(format!("Requested tools: {}", tool_calls.join(", ")));
1063    }
1064    let text = entries
1065        .into_iter()
1066        .filter(|item| !item.trim().is_empty())
1067        .collect::<Vec<_>>()
1068        .join("\n")
1069        .trim()
1070        .to_string();
1071    if text.is_empty() { None } else { Some(text) }
1072}
1073
1074fn content_block_display_text(item: &Json) -> Option<String> {
1075    if let Some(text) = item.as_str() {
1076        return Some(text.to_string());
1077    }
1078    if item.get("stripped").and_then(Json::as_bool) == Some(true) {
1079        return None;
1080    }
1081    if let Some("thinking" | "reasoning" | "toolResult" | "tool_result") =
1082        item.get("type").and_then(Json::as_str)
1083    {
1084        return None;
1085    }
1086    item.get("text").and_then(Json::as_str).map(str::to_string)
1087}
1088
1089fn display_text_from_tool_calls(value: &Json) -> Option<String> {
1090    let calls = value.as_array()?;
1091    let names = calls.iter().filter_map(tool_call_name).collect::<Vec<_>>();
1092    if names.is_empty() {
1093        None
1094    } else {
1095        Some(format!("Requested tools: {}", names.join(", ")))
1096    }
1097}
1098
1099fn tool_call_name(value: &Json) -> Option<String> {
1100    value
1101        .get("name")
1102        .and_then(Json::as_str)
1103        .or_else(|| value.get("toolName").and_then(Json::as_str))
1104        .or_else(|| {
1105            value
1106                .get("function")
1107                .and_then(|function| function.get("name"))
1108                .and_then(Json::as_str)
1109        })
1110        .map(str::to_string)
1111}
1112
1113fn to_json_string<T: Serialize>(value: &T) -> Option<String> {
1114    serde_json::to_string(value).ok()
1115}
1116
1117fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
1118    SpanContext::new(
1119        span_context.trace_id(),
1120        span_context.span_id(),
1121        span_context.trace_flags(),
1122        false,
1123        span_context.trace_state().clone(),
1124    )
1125}
1126
1127fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
1128    let seconds = timestamp.timestamp();
1129    let nanos = timestamp.timestamp_subsec_nanos();
1130    if seconds >= 0 {
1131        UNIX_EPOCH + Duration::new(seconds as u64, nanos)
1132    } else if nanos == 0 {
1133        UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
1134    } else {
1135        UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
1136    }
1137}
1138
1139#[cfg(test)]
1140#[path = "../../tests/unit/observability/openinference_tests.rs"]
1141mod tests;