1use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::{Event, ScopeCategory};
24use crate::api::runtime::EventSubscriberFn;
25use crate::api::scope::ScopeType;
26use crate::api::subscriber::{deregister_subscriber, register_subscriber};
27use crate::codec::response::Usage;
28use crate::error::FlowError;
29use crate::json::Json;
30use chrono::{DateTime, Utc};
31use openinference_semantic_conventions::SpanKind as OpenInferenceSpanKind;
32use openinference_semantic_conventions::attributes as oi;
33use opentelemetry::trace::{
34 Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
35};
36use opentelemetry::{Context, KeyValue};
37use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
38use opentelemetry_sdk::Resource;
39use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
40use serde::Serialize;
41use uuid::Uuid;
42
43#[cfg(target_arch = "wasm32")]
44use async_trait::async_trait;
45#[cfg(target_arch = "wasm32")]
46use opentelemetry_http::{
47 Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
48};
49#[cfg(not(target_arch = "wasm32"))]
50use opentelemetry_otlp::WithTonicConfig;
51#[cfg(not(target_arch = "wasm32"))]
52use tokio::runtime::Handle;
53#[cfg(not(target_arch = "wasm32"))]
54use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
55#[cfg(target_arch = "wasm32")]
56use wasm_bindgen::{JsCast, JsValue};
57#[cfg(target_arch = "wasm32")]
58use wasm_bindgen_futures::{JsFuture, spawn_local};
59#[cfg(target_arch = "wasm32")]
60use web_sys::{Request as WebRequest, RequestInit};
61
/// Result type used throughout the OpenInference subscriber.
pub type Result<T> = std::result::Result<T, OpenInferenceError>;

/// Errors produced while configuring, building, or operating the OpenInference subscriber.
#[derive(Debug, thiserror::Error)]
pub enum OpenInferenceError {
    /// gRPC transport was requested on a native target while no Tokio runtime is current.
    #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
    MissingTokioRuntime,
    /// The requested transport cannot be used on this compilation target (gRPC on `wasm32`).
    #[error("the OTLP {transport} transport is not supported on this target")]
    UnsupportedTransport {
        /// Human-readable transport name, e.g. `"gRPC"`.
        transport: &'static str,
    },
    /// A configured header could not be converted into a gRPC metadata key or value.
    #[error("invalid OTLP gRPC header {key:?}: {message}")]
    InvalidGrpcHeader {
        /// The offending header name, as configured.
        key: String,
        /// Why the key or value was rejected.
        message: String,
    },
    /// The OTLP span exporter could not be built.
    #[error("failed to build the OTLP exporter: {0}")]
    ExporterBuild(String),
    /// The tracer provider failed to flush or shut down, or the subscriber state lock was
    /// poisoned.
    #[error("OpenInference tracer provider error: {0}")]
    Provider(String),
    /// An error propagated from the core flow API (e.g. subscriber (de)registration).
    #[error(transparent)]
    Core(#[from] FlowError),
}
95
/// Wire protocol used to ship spans to the OTLP collector.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// OTLP over HTTP using `Protocol::HttpBinary` (the default).
    #[default]
    HttpBinary,
    /// OTLP over gRPC (tonic). Native targets only, and an active Tokio runtime is required.
    Grpc,
}

/// Configuration for [`OpenInferenceSubscriber::new`]; build it with the `with_*` methods.
#[derive(Debug, Clone)]
pub struct OpenInferenceConfig {
    // Collector endpoint; when `None` the OTLP exporter's own default endpoint is used.
    endpoint: Option<String>,
    // Extra headers sent with every export (HTTP headers, or gRPC metadata for gRPC).
    headers: HashMap<String, String>,
    // Additional resource attributes appended after the service.* attributes.
    resource_attributes: HashMap<String, String>,
    // Value of the `service.name` resource attribute.
    service_name: String,
    // Value of `service.namespace`; omitted from the resource when `None`.
    service_namespace: Option<String>,
    // Value of `service.version`; omitted from the resource when `None`.
    service_version: Option<String>,
    // Name of the tracer obtained from the provider.
    instrumentation_scope: String,
    // Export timeout passed to the OTLP exporter builder.
    timeout: Duration,
    // Wire protocol to use.
    transport: OtlpTransport,
}
119
120impl Default for OpenInferenceConfig {
121 fn default() -> Self {
122 Self {
123 endpoint: None,
124 headers: HashMap::new(),
125 resource_attributes: HashMap::new(),
126 service_name: "nemo-flow".to_string(),
127 service_namespace: None,
128 service_version: None,
129 instrumentation_scope: "nemo-flow-openinference".to_string(),
130 timeout: Duration::from_secs(3),
131 transport: OtlpTransport::HttpBinary,
132 }
133 }
134}
135
136impl OpenInferenceConfig {
137 pub fn new() -> Self {
139 Self::default()
140 }
141
142 pub fn with_transport(mut self, transport: OtlpTransport) -> Self {
144 self.transport = transport;
145 self
146 }
147
148 pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
150 self.service_name = service_name.into();
151 self
152 }
153
154 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
156 self.endpoint = Some(endpoint.into());
157 self
158 }
159
160 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
162 self.headers.insert(key.into(), value.into());
163 self
164 }
165
166 pub fn with_resource_attribute(
168 mut self,
169 key: impl Into<String>,
170 value: impl Into<String>,
171 ) -> Self {
172 self.resource_attributes.insert(key.into(), value.into());
173 self
174 }
175
176 pub fn with_timeout(mut self, timeout: Duration) -> Self {
178 self.timeout = timeout;
179 self
180 }
181
182 pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
184 self.service_namespace = Some(namespace.into());
185 self
186 }
187
188 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
190 self.service_version = Some(version.into());
191 self
192 }
193
194 pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
196 self.instrumentation_scope = scope.into();
197 self
198 }
199}
200
/// Event subscriber that turns flow events into OpenInference-flavoured OpenTelemetry spans.
///
/// Cloning is cheap: clones share the same processor and callback through an `Arc`.
#[derive(Clone)]
pub struct OpenInferenceSubscriber {
    inner: Arc<Inner>,
}

/// Shared state behind [`OpenInferenceSubscriber`].
struct Inner {
    // Span bookkeeping plus the tracer provider, guarded by a mutex because the callback and
    // the flush/shutdown methods both need mutable or exclusive access.
    processor: Arc<Mutex<OpenInferenceEventProcessor>>,
    // Callback handed out by `subscriber()` / `register()`; it feeds events into `processor`.
    subscriber: EventSubscriberFn,
}
211
212impl OpenInferenceSubscriber {
213 pub fn new(config: OpenInferenceConfig) -> Result<Self> {
215 #[cfg(not(target_arch = "wasm32"))]
216 if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
217 {
218 return Err(OpenInferenceError::MissingTokioRuntime);
219 }
220 #[cfg(target_arch = "wasm32")]
221 if config.transport == OtlpTransport::Grpc {
222 return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
223 }
224
225 let provider = build_tracer_provider(&config)?;
226 Ok(Self::from_tracer_provider_with_scope(
227 provider,
228 config.instrumentation_scope,
229 ))
230 }
231
232 pub fn from_tracer_provider(
234 provider: SdkTracerProvider,
235 instrumentation_scope: impl Into<String>,
236 ) -> Self {
237 Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
238 }
239
240 fn from_tracer_provider_with_scope(
241 provider: SdkTracerProvider,
242 instrumentation_scope: String,
243 ) -> Self {
244 let processor = Arc::new(Mutex::new(OpenInferenceEventProcessor::new(
245 provider,
246 instrumentation_scope,
247 )));
248 let processor_for_callback = Arc::clone(&processor);
249 let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
250 let Ok(mut guard) = processor_for_callback.lock() else {
251 return;
254 };
255 guard.process(event);
256 });
257
258 Self {
259 inner: Arc::new(Inner {
260 processor,
261 subscriber,
262 }),
263 }
264 }
265
266 pub fn subscriber(&self) -> EventSubscriberFn {
268 Arc::clone(&self.inner.subscriber)
269 }
270
271 pub fn register(&self, name: &str) -> Result<()> {
273 register_subscriber(name, self.subscriber()).map_err(Into::into)
274 }
275
276 pub fn deregister(&self, name: &str) -> Result<bool> {
278 deregister_subscriber(name).map_err(Into::into)
279 }
280
281 pub fn force_flush(&self) -> Result<()> {
283 let guard = self.inner.processor.lock().map_err(|_| {
284 OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
285 })?;
286 guard.force_flush()
287 }
288
289 pub fn shutdown(&self) -> Result<()> {
293 let guard = self.inner.processor.lock().map_err(|_| {
294 OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
295 })?;
296 guard.shutdown()
297 }
298}
299
/// OTLP/HTTP client for `wasm32` that sends export requests through the JavaScript `fetch` API.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy, Default)]
struct WasmHttpClient;

#[cfg(target_arch = "wasm32")]
#[async_trait]
impl HttpClient for WasmHttpClient {
    /// Issues the request with `fetch` and returns immediately.
    ///
    /// The fetch promise is detached with `spawn_local`. A synthetic `202` response with an
    /// empty body is returned without waiting, so export failures are only reported through
    /// `console.warn`.
    async fn send_bytes(
        &self,
        request: HttpRequest<Bytes>,
    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
        let (parts, body) = request.into_parts();
        let url = parts.uri.to_string();

        let init = RequestInit::new();
        init.set_method(parts.method.as_str());
        if !body.is_empty() {
            let payload = js_sys::Uint8Array::from(body.as_ref());
            init.set_body_opt_u8_array(Some(&payload));
        }

        let web_request = WebRequest::new_with_str_and_init(&url, &init).map_err(js_error)?;
        let web_headers = web_request.headers();
        for (name, value) in &parts.headers {
            let text = value
                .to_str()
                .map_err(|e| http_error(format!("invalid OTLP HTTP header {name}: {e}")))?;
            web_headers.set(name.as_str(), text).map_err(js_error)?;
        }

        let fetch_promise = match web_sys::window() {
            Some(window) => window.fetch_with_request(&web_request),
            None => {
                // Without a `window` (presumably a worker or non-browser JS runtime), look up
                // `fetch` on the global object instead.
                let global = js_sys::global();
                let fetch = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
                    .map_err(js_error)?
                    .dyn_into::<js_sys::Function>()
                    .map_err(js_error)?;
                fetch.call1(&global, &web_request).map_err(js_error)?.into()
            }
        };

        spawn_local(async move {
            if let Err(error) = JsFuture::from(fetch_promise).await {
                web_sys::console::warn_1(&JsValue::from_str(&format!(
                    "OpenInference OTLP/HTTP export failed: {error:?}"
                )));
            }
        });

        HttpResponse::builder()
            .status(202)
            .body(Bytes::new())
            .map_err(|e| http_error(e.to_string()))
    }
}
362
/// Converts a JavaScript exception value into an [`HttpError`].
///
/// String values are used verbatim; anything else is rendered with `Debug`.
#[cfg(target_arch = "wasm32")]
fn js_error(value: JsValue) -> HttpError {
    let message = match value.as_string() {
        Some(text) => text,
        None => format!("JavaScript error: {value:?}"),
    };
    http_error(message)
}

/// Wraps `message` in a boxed `std::io::Error` of kind `Other`, which is the error type the
/// OpenTelemetry HTTP client trait expects.
#[cfg(target_arch = "wasm32")]
fn http_error(message: impl Into<String>) -> HttpError {
    let error = std::io::Error::other(message.into());
    Box::new(error)
}
376
377fn build_tracer_provider(config: &OpenInferenceConfig) -> Result<SdkTracerProvider> {
378 let exporter = match config.transport {
379 OtlpTransport::HttpBinary => {
380 #[cfg(not(target_arch = "wasm32"))]
381 install_rustls_crypto_provider();
382 let mut builder = SpanExporter::builder()
383 .with_http()
384 .with_protocol(Protocol::HttpBinary)
385 .with_timeout(config.timeout);
386 if let Some(endpoint) = &config.endpoint {
387 builder = builder.with_endpoint(endpoint.clone());
388 }
389 if !config.headers.is_empty() {
390 builder = builder.with_headers(config.headers.clone());
391 }
392 #[cfg(target_arch = "wasm32")]
393 {
394 builder = builder.with_http_client(WasmHttpClient);
395 }
396 builder
397 .build()
398 .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
399 }
400 #[cfg(not(target_arch = "wasm32"))]
401 OtlpTransport::Grpc => {
402 let mut builder = SpanExporter::builder()
403 .with_tonic()
404 .with_protocol(Protocol::Grpc)
405 .with_timeout(config.timeout);
406 if let Some(endpoint) = &config.endpoint {
407 builder = builder.with_endpoint(endpoint.clone());
408 }
409 if !config.headers.is_empty() {
410 builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
411 }
412 builder
413 .build()
414 .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
415 }
416 #[cfg(target_arch = "wasm32")]
417 OtlpTransport::Grpc => {
418 return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
419 }
420 };
421
422 let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
423 if let Some(service_namespace) = &config.service_namespace {
424 resource_attributes.push(KeyValue::new(
425 "service.namespace",
426 service_namespace.clone(),
427 ));
428 }
429 if let Some(service_version) = &config.service_version {
430 resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
431 }
432 for (key, value) in &config.resource_attributes {
433 resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
434 }
435
436 let builder = SdkTracerProvider::builder()
441 .with_resource(
442 Resource::builder_empty()
443 .with_attributes(resource_attributes)
444 .build(),
445 )
446 .with_max_attributes_per_span(u32::MAX)
447 .with_max_attributes_per_event(u32::MAX);
448
449 #[cfg(not(target_arch = "wasm32"))]
450 {
451 if Handle::try_current().is_ok() {
452 Ok(builder.with_batch_exporter(exporter).build())
453 } else {
454 Ok(builder.with_simple_exporter(exporter).build())
455 }
456 }
457 #[cfg(target_arch = "wasm32")]
458 {
459 Ok(builder.with_simple_exporter(exporter).build())
460 }
461}
462
463#[cfg(not(target_arch = "wasm32"))]
464fn install_rustls_crypto_provider() {
465 let _ = rustls::crypto::ring::default_provider().install_default();
466}
467
468#[cfg(not(target_arch = "wasm32"))]
469fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
470 let mut metadata = MetadataMap::new();
471 for (key, value) in headers {
472 let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
473 OpenInferenceError::InvalidGrpcHeader {
474 key: key.clone(),
475 message: e.to_string(),
476 }
477 })?;
478 let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
479 OpenInferenceError::InvalidGrpcHeader {
480 key: key.clone(),
481 message: e.to_string(),
482 }
483 })?;
484 metadata.insert(metadata_key, metadata_value);
485 }
486 Ok(metadata)
487}
488
/// A span that has been started for a scope-start event and not yet ended.
struct ActiveSpan {
    // The live SDK span; ended when the matching end event arrives.
    span: Span,
    // Copy of the span's context with `is_remote = false`, used to parent child spans/marks.
    span_context: SpanContext,
}

/// Translates flow events into spans, tracking open scopes by event UUID.
struct OpenInferenceEventProcessor {
    // Spans opened by start events, keyed by the start event's UUID.
    active_spans: HashMap<Uuid, ActiveSpan>,
    // Kept so the subscriber can flush and shut down the exporter pipeline.
    provider: SdkTracerProvider,
    // Tracer obtained from `provider` under the configured instrumentation scope.
    tracer: SdkTracer,
}
499
500impl OpenInferenceEventProcessor {
501 fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
502 let tracer = provider.tracer(instrumentation_scope);
503 Self {
504 active_spans: HashMap::new(),
505 provider,
506 tracer,
507 }
508 }
509
510 fn process(&mut self, event: &Event) {
511 match event.scope_category() {
512 Some(ScopeCategory::Start) => self.process_start(event),
513 Some(ScopeCategory::End) => self.process_end(event),
514 None => self.process_mark(event),
515 }
516 }
517
518 fn force_flush(&self) -> Result<()> {
519 self.provider
520 .force_flush()
521 .map_err(|e| OpenInferenceError::Provider(e.to_string()))
522 }
523
524 fn shutdown(&self) -> Result<()> {
525 self.provider
526 .shutdown()
527 .map_err(|e| OpenInferenceError::Provider(e.to_string()))
528 }
529
530 fn process_start(&mut self, event: &Event) {
531 let mut span = self
532 .tracer
533 .span_builder(span_name(event))
534 .with_kind(span_kind(event))
535 .with_start_time(to_system_time(*event.timestamp()))
536 .start_with_context(&self.tracer, &self.parent_context(event));
537 span.set_attributes(start_attributes(event));
538 let span_context = local_parent_span_context(span.span_context());
539 self.active_spans
540 .insert(event.uuid(), ActiveSpan { span, span_context });
541 }
542
543 fn process_end(&mut self, event: &Event) {
544 let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
545 return;
546 };
547 active_span.span.set_attributes(end_attributes(event));
548 active_span
549 .span
550 .end_with_timestamp(to_system_time(*event.timestamp()));
551 }
552
553 fn process_mark(&mut self, event: &Event) {
554 let mark_name = event.name().to_string();
555 let timestamp = to_system_time(*event.timestamp());
556 let attributes = mark_attributes(event);
557
558 if let Some(parent_span) = self.find_parent_span_mut(event) {
559 parent_span
560 .span
561 .add_event_with_timestamp(mark_name, timestamp, attributes);
562 return;
563 }
564
565 let mut span = self
566 .tracer
567 .span_builder(format!("mark:{mark_name}"))
568 .with_kind(SpanKind::Internal)
569 .with_start_time(timestamp)
570 .start_with_context(&self.tracer, &self.parent_context(event));
571 let mut span_attributes = attributes;
572 span_attributes.push(KeyValue::new(
573 oi::OPENINFERENCE_SPAN_KIND,
574 OpenInferenceSpanKind::Chain,
575 ));
576 span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
577 span.set_attributes(span_attributes);
578 span.end_with_timestamp(timestamp);
579 }
580
581 fn parent_context(&self, event: &Event) -> Context {
582 self.find_parent_span(event)
583 .map(|active_span| {
584 Context::new().with_remote_span_context(active_span.span_context.clone())
585 })
586 .unwrap_or_default()
587 }
588
589 fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
590 event
591 .parent_uuid()
592 .filter(|uuid| self.active_spans.contains_key(uuid))
593 }
594
595 fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
596 self.parent_span_uuid(event)
597 .and_then(|uuid| self.active_spans.get(&uuid))
598 }
599
600 fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
601 self.parent_span_uuid(event)
602 .and_then(|uuid| self.active_spans.get_mut(&uuid))
603 }
604}
605
606fn span_kind(event: &Event) -> SpanKind {
607 match semantic_scope_type(event) {
608 Some(ScopeType::Llm) => SpanKind::Client,
609 Some(
610 ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
611 ) => SpanKind::Client,
612 _ => SpanKind::Internal,
613 }
614}
615
616fn span_name(event: &Event) -> String {
617 event.name().to_string()
618}
619
620fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
621 event.scope_type()
622}
623
624fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
625 match scope_type {
626 Some(ScopeType::Agent) => "agent",
627 Some(ScopeType::Function) => "function",
628 Some(ScopeType::Tool) => "tool",
629 Some(ScopeType::Llm) => "llm",
630 Some(ScopeType::Retriever) => "retriever",
631 Some(ScopeType::Embedder) => "embedder",
632 Some(ScopeType::Reranker) => "reranker",
633 Some(ScopeType::Guardrail) => "guardrail",
634 Some(ScopeType::Evaluator) => "evaluator",
635 Some(ScopeType::Custom) => "custom",
636 Some(ScopeType::Unknown) | None => "unknown",
637 }
638}
639
640fn start_attributes(event: &Event) -> Vec<KeyValue> {
641 let mut attributes = common_attributes(event);
642 let handle_attributes = event.attributes();
643 if handle_attributes.is_some_and(|attributes| !attributes.is_empty()) {
644 push_serialized(
645 &mut attributes,
646 "nemo_flow.handle_attributes_json",
647 handle_attributes,
648 );
649 }
650 if event
651 .category()
652 .is_none_or(|category| category.as_str() != "llm")
653 {
654 push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
655 }
656 if event
657 .category()
658 .is_some_and(|category| category.as_str() == "tool")
659 {
660 attributes.push(KeyValue::new(oi::tool::NAME, event.name().to_string()));
661 attributes.push(KeyValue::new(
662 oi::tool_call::function::NAME,
663 event.name().to_string(),
664 ));
665 }
666
667 if let Some((input, mime_type)) = openinference_input_value(event) {
668 attributes.push(KeyValue::new(oi::input::VALUE, input.clone()));
669 attributes.push(KeyValue::new(oi::input::MIME_TYPE, mime_type));
670
671 if event
672 .category()
673 .is_some_and(|category| category.as_str() == "tool")
674 {
675 attributes.push(KeyValue::new(oi::tool::PARAMETERS, input.clone()));
676 attributes.push(KeyValue::new(oi::tool_call::function::ARGUMENTS, input));
677 }
678 }
679 attributes
680}
681
682fn end_attributes(event: &Event) -> Vec<KeyValue> {
683 let mut attributes = Vec::new();
684 push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
685 if let Some((output, mime_type)) = openinference_output_value(event) {
686 attributes.push(KeyValue::new(oi::output::VALUE, output));
687 attributes.push(KeyValue::new(oi::output::MIME_TYPE, mime_type));
688 }
689 let fallback_usage = if event
690 .category()
691 .is_some_and(|category| category.as_str() == "llm")
692 {
693 usage_from_manual_llm_output(event.output())
694 } else {
695 None
696 };
697 let usage = event
698 .annotated_response()
699 .and_then(|response| response.usage.as_ref())
700 .or(fallback_usage.as_ref());
701 if event
702 .category()
703 .is_some_and(|category| category.as_str() == "llm")
704 && let Some(usage) = usage
705 {
706 if let Some(v) = usage.prompt_tokens {
707 attributes.push(KeyValue::new(oi::llm::token_count::PROMPT, v as i64));
708 }
709 if let Some(v) = usage.completion_tokens {
710 attributes.push(KeyValue::new(oi::llm::token_count::COMPLETION, v as i64));
711 }
712 if let Some(v) = usage.total_tokens {
713 attributes.push(KeyValue::new(oi::llm::token_count::TOTAL, v as i64));
714 }
715 if let Some(v) = usage.cache_read_tokens {
716 attributes.push(KeyValue::new(
717 oi::llm::token_count::prompt_details::CACHE_READ,
718 v as i64,
719 ));
720 }
721 if let Some(v) = usage.cache_write_tokens {
722 attributes.push(KeyValue::new(
723 oi::llm::token_count::prompt_details::CACHE_WRITE,
724 v as i64,
725 ));
726 }
727 }
728 attributes
729}
730
731fn usage_from_manual_llm_output(output: Option<&Json>) -> Option<Usage> {
732 let object = output?.as_object()?;
733 let usage = object.get("usage").and_then(Json::as_object);
734 let token_usage = object.get("token_usage").and_then(Json::as_object);
735 if usage.is_none() && token_usage.is_none() {
736 return None;
737 }
738
739 let prompt_tokens = first_u64_from_manual_usage(
740 usage,
741 token_usage,
742 &["prompt_tokens", "input_tokens", "inputTokens", "input"],
743 );
744 let completion_tokens = first_u64_from_manual_usage(
745 usage,
746 token_usage,
747 &[
748 "completion_tokens",
749 "output_tokens",
750 "completionTokens",
751 "outputTokens",
752 "output",
753 ],
754 );
755 let reported_total_tokens = first_u64_from_manual_usage(
756 usage,
757 token_usage,
758 &["total_tokens", "totalTokens", "total"],
759 );
760 let cache_read_tokens = first_u64_from_manual_usage(
761 usage,
762 token_usage,
763 &[
764 "cache_read_tokens",
765 "cached_tokens",
766 "cache_read_input_tokens",
767 "cacheReadTokens",
768 "cachedTokens",
769 "cacheReadInputTokens",
770 "cacheRead",
771 ],
772 );
773 let cache_write_tokens = first_u64_from_manual_usage(
774 usage,
775 token_usage,
776 &[
777 "cache_write_tokens",
778 "cache_creation_input_tokens",
779 "cacheWriteTokens",
780 "cacheCreationInputTokens",
781 "cacheWrite",
782 ],
783 );
784
785 if prompt_tokens.is_none()
786 && completion_tokens.is_none()
787 && reported_total_tokens.is_none()
788 && cache_read_tokens.is_none()
789 && cache_write_tokens.is_none()
790 {
791 return None;
792 }
793 let total_tokens =
794 normalize_total_tokens(reported_total_tokens, prompt_tokens, completion_tokens);
795
796 Some(Usage {
797 prompt_tokens,
798 completion_tokens,
799 total_tokens,
800 cache_read_tokens,
801 cache_write_tokens,
802 })
803}
804
/// Keeps a reported total only if it is consistent with its parts.
///
/// The total is kept when it is at least prompt + completion (missing parts count as zero,
/// with saturating addition), or when both parts sum to zero. Otherwise, or when no total was
/// reported, the result is `None`.
fn normalize_total_tokens(
    total_tokens: Option<u64>,
    prompt_tokens: Option<u64>,
    completion_tokens: Option<u64>,
) -> Option<u64> {
    let reported = total_tokens?;
    let floor = prompt_tokens
        .into_iter()
        .chain(completion_tokens)
        .fold(0_u64, u64::saturating_add);
    let consistent = floor == 0 || reported >= floor;
    consistent.then_some(reported)
}
820
821fn first_u64_from_manual_usage(
822 usage: Option<&serde_json::Map<String, Json>>,
823 token_usage: Option<&serde_json::Map<String, Json>>,
824 keys: &[&str],
825) -> Option<u64> {
826 usage
827 .and_then(|value| first_u64(value, keys))
828 .or_else(|| token_usage.and_then(|value| first_u64(value, keys)))
829}
830
831fn first_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64> {
832 keys.iter()
833 .find_map(|key| usage.get(*key).and_then(Json::as_u64))
834}
835
836fn mark_attributes(event: &Event) -> Vec<KeyValue> {
837 let handle_attributes = event.attributes();
838 let mut attributes = vec![
839 KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
840 KeyValue::new(
841 "nemo_flow.mark.parent_uuid",
842 event
843 .parent_uuid()
844 .map(|uuid| uuid.to_string())
845 .unwrap_or_default(),
846 ),
847 ];
848 push_serialized(
849 &mut attributes,
850 "nemo_flow.mark.attributes_json",
851 handle_attributes,
852 );
853 push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
854 push_serialized(
855 &mut attributes,
856 "nemo_flow.mark.metadata_json",
857 event.metadata(),
858 );
859 attributes
860}
861
862fn common_attributes(event: &Event) -> Vec<KeyValue> {
863 let mut attributes = vec![
864 KeyValue::new(
865 oi::OPENINFERENCE_SPAN_KIND,
866 openinference_span_kind(semantic_scope_type(event)),
867 ),
868 KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
869 KeyValue::new(
870 "nemo_flow.parent_uuid",
871 event
872 .parent_uuid()
873 .map(|uuid| uuid.to_string())
874 .unwrap_or_default(),
875 ),
876 KeyValue::new(
877 "nemo_flow.scope_type",
878 scope_type_name(semantic_scope_type(event)),
879 ),
880 ];
881
882 if let Some(model_name) = event.model_name() {
883 attributes.push(KeyValue::new(oi::llm::MODEL_NAME, model_name.to_string()));
884 }
885 if let Some(tool_call_id) = event.tool_call_id() {
886 attributes.push(KeyValue::new(oi::tool_call::ID, tool_call_id.to_string()));
887 }
888 if let Some(metadata) = event.metadata().and_then(to_json_string) {
889 attributes.push(KeyValue::new(oi::METADATA, metadata));
890 }
891
892 attributes
893}
894
895fn openinference_span_kind(scope_type: Option<ScopeType>) -> OpenInferenceSpanKind {
896 match scope_type {
897 Some(ScopeType::Agent) => OpenInferenceSpanKind::Agent,
898 Some(ScopeType::Tool) => OpenInferenceSpanKind::Tool,
899 Some(ScopeType::Llm) => OpenInferenceSpanKind::Llm,
900 Some(ScopeType::Retriever) => OpenInferenceSpanKind::Retriever,
901 Some(ScopeType::Embedder) => OpenInferenceSpanKind::Embedding,
902 Some(ScopeType::Reranker) => OpenInferenceSpanKind::Reranker,
903 Some(ScopeType::Guardrail) => OpenInferenceSpanKind::Guardrail,
904 Some(ScopeType::Evaluator) => OpenInferenceSpanKind::Evaluator,
905 Some(ScopeType::Function | ScopeType::Custom | ScopeType::Unknown) | None => {
906 OpenInferenceSpanKind::Chain
907 }
908 }
909}
910
911fn push_serialized<T: Serialize + ?Sized>(
912 attributes: &mut Vec<KeyValue>,
913 key: &'static str,
914 value: Option<&T>,
915) {
916 if let Some(value) = value
917 && let Ok(json) = serde_json::to_string(value)
918 {
919 attributes.push(KeyValue::new(key, json));
920 }
921}
922
923fn openinference_input_value(event: &Event) -> Option<(String, &'static str)> {
924 let input = event.input()?;
925
926 if event
927 .category()
928 .is_some_and(|category| category.as_str() == "llm")
929 {
930 return llm_input_display_value(input)
931 .map(|display| (display, "text/plain"))
932 .or_else(|| sanitized_llm_input_json(input).map(|json| (json, "application/json")));
933 }
934
935 to_json_string(input).map(|json| (json, "application/json"))
936}
937
938fn openinference_output_value(event: &Event) -> Option<(String, &'static str)> {
939 let output = event.output()?;
940 display_text_from_json(output)
941 .map(|display| (display, "text/plain"))
942 .or_else(|| to_json_string(output).map(|json| (json, "application/json")))
943}
944
945fn llm_input_display_value(input: &Json) -> Option<String> {
946 let content = match input {
947 Json::Object(object) => object.get("content").unwrap_or(input),
948 _ => input,
949 };
950
951 content
952 .get("messages")
953 .and_then(display_text_from_messages)
954 .or_else(|| display_text_from_json(content))
955}
956
957fn sanitized_llm_input_json(input: &Json) -> Option<String> {
958 match input {
959 Json::Object(object) => {
960 let mut sanitized = object.clone();
961 sanitized.remove("headers");
962 to_json_string(&Json::Object(sanitized))
963 }
964 _ => to_json_string(input),
965 }
966}
967
968fn display_text_from_json(value: &Json) -> Option<String> {
969 match value {
970 Json::String(text) => display_text_from_string(text),
971 Json::Object(object) => {
972 for key in ["content", "summary", "message", "text", "prompt"] {
973 if let Some(display) = object.get(key).and_then(display_text_from_json) {
974 return Some(display);
975 }
976 }
977 object
978 .get("choices")
979 .and_then(display_text_from_chat_choices)
980 .or_else(|| {
981 object
982 .get("tool_calls")
983 .and_then(display_text_from_tool_calls)
984 })
985 }
986 Json::Array(items) => display_text_from_content_blocks(items),
987 _ => None,
988 }
989}
990
991fn display_text_from_messages(value: &Json) -> Option<String> {
992 let messages = value.as_array()?;
993 let text = messages
994 .iter()
995 .filter_map(display_text_from_message)
996 .collect::<Vec<_>>()
997 .join("\n\n")
998 .trim()
999 .to_string();
1000 if text.is_empty() { None } else { Some(text) }
1001}
1002
1003fn display_text_from_message(value: &Json) -> Option<String> {
1004 let role = value
1005 .get("role")
1006 .and_then(Json::as_str)
1007 .unwrap_or("message");
1008 if role == "tool" {
1009 return Some("tool: Tool result omitted".to_string());
1010 }
1011 let display = value
1012 .get("content")
1013 .and_then(display_text_from_json)
1014 .or_else(|| {
1015 value
1016 .get("tool_calls")
1017 .and_then(display_text_from_tool_calls)
1018 })?;
1019 Some(format!("{role}: {display}"))
1020}
1021
1022fn display_text_from_string(text: &str) -> Option<String> {
1023 let trimmed = text.trim();
1024 if trimmed.is_empty() {
1025 return None;
1026 }
1027 if let Ok(parsed) = serde_json::from_str::<Json>(trimmed)
1028 && let Some(display) = display_text_from_json(&parsed)
1029 {
1030 return Some(display);
1031 }
1032 Some(trimmed.to_string())
1033}
1034
1035fn display_text_from_chat_choices(value: &Json) -> Option<String> {
1036 let choices = value.as_array()?;
1037 for choice in choices {
1038 let Some(message) = choice.get("message") else {
1039 continue;
1040 };
1041 let content = message.get("content").and_then(display_text_from_json);
1042 let tool_calls = message
1043 .get("tool_calls")
1044 .and_then(display_text_from_tool_calls);
1045 match (content, tool_calls) {
1046 (Some(content), Some(tool_calls)) => return Some(format!("{content}\n{tool_calls}")),
1047 (Some(content), None) => return Some(content),
1048 (None, Some(tool_calls)) => return Some(tool_calls),
1049 (None, None) => {}
1050 }
1051 }
1052 None
1053}
1054
1055fn display_text_from_content_blocks(items: &[Json]) -> Option<String> {
1056 let mut entries = items
1057 .iter()
1058 .filter_map(content_block_display_text)
1059 .collect::<Vec<_>>();
1060 let tool_calls = items.iter().filter_map(tool_call_name).collect::<Vec<_>>();
1061 if !tool_calls.is_empty() {
1062 entries.push(format!("Requested tools: {}", tool_calls.join(", ")));
1063 }
1064 let text = entries
1065 .into_iter()
1066 .filter(|item| !item.trim().is_empty())
1067 .collect::<Vec<_>>()
1068 .join("\n")
1069 .trim()
1070 .to_string();
1071 if text.is_empty() { None } else { Some(text) }
1072}
1073
1074fn content_block_display_text(item: &Json) -> Option<String> {
1075 if let Some(text) = item.as_str() {
1076 return Some(text.to_string());
1077 }
1078 if item.get("stripped").and_then(Json::as_bool) == Some(true) {
1079 return None;
1080 }
1081 if let Some("thinking" | "reasoning" | "toolResult" | "tool_result") =
1082 item.get("type").and_then(Json::as_str)
1083 {
1084 return None;
1085 }
1086 item.get("text").and_then(Json::as_str).map(str::to_string)
1087}
1088
1089fn display_text_from_tool_calls(value: &Json) -> Option<String> {
1090 let calls = value.as_array()?;
1091 let names = calls.iter().filter_map(tool_call_name).collect::<Vec<_>>();
1092 if names.is_empty() {
1093 None
1094 } else {
1095 Some(format!("Requested tools: {}", names.join(", ")))
1096 }
1097}
1098
1099fn tool_call_name(value: &Json) -> Option<String> {
1100 value
1101 .get("name")
1102 .and_then(Json::as_str)
1103 .or_else(|| value.get("toolName").and_then(Json::as_str))
1104 .or_else(|| {
1105 value
1106 .get("function")
1107 .and_then(|function| function.get("name"))
1108 .and_then(Json::as_str)
1109 })
1110 .map(str::to_string)
1111}
1112
1113fn to_json_string<T: Serialize>(value: &T) -> Option<String> {
1114 serde_json::to_string(value).ok()
1115}
1116
1117fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
1118 SpanContext::new(
1119 span_context.trace_id(),
1120 span_context.span_id(),
1121 span_context.trace_flags(),
1122 false,
1123 span_context.trace_state().clone(),
1124 )
1125}
1126
1127fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
1128 let seconds = timestamp.timestamp();
1129 let nanos = timestamp.timestamp_subsec_nanos();
1130 if seconds >= 0 {
1131 UNIX_EPOCH + Duration::new(seconds as u64, nanos)
1132 } else if nanos == 0 {
1133 UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
1134 } else {
1135 UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
1136 }
1137}
1138
// Unit tests live under `tests/unit/observability/` but are compiled as a child module of
// this file, which lets them reach this module's private helpers.
#[cfg(test)]
#[path = "../../tests/unit/observability/openinference_tests.rs"]
mod tests;