1use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23use crate::api::event::{Event, ScopeCategory};
24use crate::api::runtime::EventSubscriberFn;
25use crate::api::scope::ScopeType;
26use crate::api::subscriber::{deregister_subscriber, register_subscriber};
27use crate::error::FlowError;
28use chrono::{DateTime, Utc};
29use openinference_semantic_conventions::SpanKind as OpenInferenceSpanKind;
30use openinference_semantic_conventions::attributes as oi;
31use opentelemetry::trace::{
32 Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _,
33};
34use opentelemetry::{Context, KeyValue};
35use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
36use opentelemetry_sdk::Resource;
37use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
38use serde::Serialize;
39use uuid::Uuid;
40
41#[cfg(target_arch = "wasm32")]
42use async_trait::async_trait;
43#[cfg(target_arch = "wasm32")]
44use opentelemetry_http::{
45 Bytes, HttpClient, HttpError, Request as HttpRequest, Response as HttpResponse,
46};
47#[cfg(not(target_arch = "wasm32"))]
48use opentelemetry_otlp::WithTonicConfig;
49#[cfg(not(target_arch = "wasm32"))]
50use tokio::runtime::Handle;
51#[cfg(not(target_arch = "wasm32"))]
52use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
53#[cfg(target_arch = "wasm32")]
54use wasm_bindgen::{JsCast, JsValue};
55#[cfg(target_arch = "wasm32")]
56use wasm_bindgen_futures::{JsFuture, spawn_local};
57#[cfg(target_arch = "wasm32")]
58use web_sys::{Request as WebRequest, RequestInit};
59
/// Result alias used throughout this module; the error type is always [`OpenInferenceError`].
pub type Result<T> = std::result::Result<T, OpenInferenceError>;
62
/// Errors reported while configuring the OTLP exporter or driving the tracer provider.
#[derive(Debug, thiserror::Error)]
pub enum OpenInferenceError {
    /// The gRPC transport was requested but no Tokio runtime handle could be obtained.
    #[error("the OTLP gRPC exporter requires an active Tokio runtime")]
    MissingTokioRuntime,
    /// The requested transport cannot be used on the current compilation target
    /// (gRPC on `wasm32`).
    #[error("the OTLP {transport} transport is not supported on this target")]
    UnsupportedTransport {
        /// Human-readable name of the rejected transport.
        transport: &'static str,
    },
    /// A configured header could not be converted into gRPC metadata.
    #[error("invalid OTLP gRPC header {key:?}: {message}")]
    InvalidGrpcHeader {
        /// The header name as supplied in the configuration.
        key: String,
        /// The conversion error rendered as text.
        message: String,
    },
    /// The OTLP span exporter builder failed; the payload is the rendered builder error.
    #[error("failed to build the OTLP exporter: {0}")]
    ExporterBuild(String),
    /// The tracer provider failed to flush or shut down, or the subscriber state lock
    /// was poisoned.
    #[error("OpenInference tracer provider error: {0}")]
    Provider(String),
    /// An error from the core crate, forwarded unchanged.
    #[error(transparent)]
    Core(#[from] FlowError),
}
93
/// Wire transport used by the OTLP span exporter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// OTLP over HTTP using the binary protocol (`Protocol::HttpBinary`). This is the default.
    #[default]
    HttpBinary,
    /// OTLP over gRPC. Rejected with `UnsupportedTransport` when compiled for `wasm32`.
    Grpc,
}
103
/// Builder-style configuration for the OpenInference subscriber and its OTLP exporter.
#[derive(Debug, Clone)]
pub struct OpenInferenceConfig {
    /// Exporter endpoint; when `None` the exporter builder is left at its own default.
    endpoint: Option<String>,
    /// Extra headers sent with each export (HTTP headers, or gRPC metadata for `Grpc`).
    headers: HashMap<String, String>,
    /// Additional resource attributes appended after the `service.*` attributes.
    resource_attributes: HashMap<String, String>,
    /// Value of the `service.name` resource attribute.
    service_name: String,
    /// Value of the `service.namespace` resource attribute, if any.
    service_namespace: Option<String>,
    /// Value of the `service.version` resource attribute, if any.
    service_version: Option<String>,
    /// Name passed to the tracer provider when the tracer is created.
    instrumentation_scope: String,
    /// Timeout handed to the exporter builder.
    timeout: Duration,
    /// Which OTLP transport the exporter uses.
    transport: OtlpTransport,
}
117
118impl Default for OpenInferenceConfig {
119 fn default() -> Self {
120 Self {
121 endpoint: None,
122 headers: HashMap::new(),
123 resource_attributes: HashMap::new(),
124 service_name: "nemo-flow".to_string(),
125 service_namespace: None,
126 service_version: None,
127 instrumentation_scope: "nemo-flow-openinference".to_string(),
128 timeout: Duration::from_secs(3),
129 transport: OtlpTransport::HttpBinary,
130 }
131 }
132}
133
134impl OpenInferenceConfig {
135 pub fn new() -> Self {
137 Self::default()
138 }
139
140 pub fn with_transport(mut self, transport: OtlpTransport) -> Self {
142 self.transport = transport;
143 self
144 }
145
146 pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
148 self.service_name = service_name.into();
149 self
150 }
151
152 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
154 self.endpoint = Some(endpoint.into());
155 self
156 }
157
158 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
160 self.headers.insert(key.into(), value.into());
161 self
162 }
163
164 pub fn with_resource_attribute(
166 mut self,
167 key: impl Into<String>,
168 value: impl Into<String>,
169 ) -> Self {
170 self.resource_attributes.insert(key.into(), value.into());
171 self
172 }
173
174 pub fn with_timeout(mut self, timeout: Duration) -> Self {
176 self.timeout = timeout;
177 self
178 }
179
180 pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
182 self.service_namespace = Some(namespace.into());
183 self
184 }
185
186 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
188 self.service_version = Some(version.into());
189 self
190 }
191
192 pub fn with_instrumentation_scope(mut self, scope: impl Into<String>) -> Self {
194 self.instrumentation_scope = scope.into();
195 self
196 }
197}
198
/// Cheaply cloneable handle to the OpenInference event subscriber.
///
/// All clones share the same reference-counted inner state.
#[derive(Clone)]
pub struct OpenInferenceSubscriber {
    /// Shared state: the event processor and the callback that feeds it.
    inner: Arc<Inner>,
}
204
/// State shared by every clone of [`OpenInferenceSubscriber`].
struct Inner {
    /// Event processor guarded by a mutex; also locked by `force_flush` and `shutdown`.
    processor: Arc<Mutex<OpenInferenceEventProcessor>>,
    /// Callback that locks `processor` and forwards each event to it.
    subscriber: EventSubscriberFn,
}
209
210impl OpenInferenceSubscriber {
211 pub fn new(config: OpenInferenceConfig) -> Result<Self> {
213 #[cfg(not(target_arch = "wasm32"))]
214 if config.transport == OtlpTransport::Grpc && tokio::runtime::Handle::try_current().is_err()
215 {
216 return Err(OpenInferenceError::MissingTokioRuntime);
217 }
218 #[cfg(target_arch = "wasm32")]
219 if config.transport == OtlpTransport::Grpc {
220 return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
221 }
222
223 let provider = build_tracer_provider(&config)?;
224 Ok(Self::from_tracer_provider_with_scope(
225 provider,
226 config.instrumentation_scope,
227 ))
228 }
229
230 pub fn from_tracer_provider(
232 provider: SdkTracerProvider,
233 instrumentation_scope: impl Into<String>,
234 ) -> Self {
235 Self::from_tracer_provider_with_scope(provider, instrumentation_scope.into())
236 }
237
238 fn from_tracer_provider_with_scope(
239 provider: SdkTracerProvider,
240 instrumentation_scope: String,
241 ) -> Self {
242 let processor = Arc::new(Mutex::new(OpenInferenceEventProcessor::new(
243 provider,
244 instrumentation_scope,
245 )));
246 let processor_for_callback = Arc::clone(&processor);
247 let subscriber: EventSubscriberFn = Arc::new(move |event: &Event| {
248 let Ok(mut guard) = processor_for_callback.lock() else {
249 return;
252 };
253 guard.process(event);
254 });
255
256 Self {
257 inner: Arc::new(Inner {
258 processor,
259 subscriber,
260 }),
261 }
262 }
263
264 pub fn subscriber(&self) -> EventSubscriberFn {
266 Arc::clone(&self.inner.subscriber)
267 }
268
269 pub fn register(&self, name: &str) -> Result<()> {
271 register_subscriber(name, self.subscriber()).map_err(Into::into)
272 }
273
274 pub fn deregister(&self, name: &str) -> Result<bool> {
276 deregister_subscriber(name).map_err(Into::into)
277 }
278
279 pub fn force_flush(&self) -> Result<()> {
281 let guard = self.inner.processor.lock().map_err(|_| {
282 OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
283 })?;
284 guard.force_flush()
285 }
286
287 pub fn shutdown(&self) -> Result<()> {
291 let guard = self.inner.processor.lock().map_err(|_| {
292 OpenInferenceError::Provider("the subscriber state lock was poisoned".to_string())
293 })?;
294 guard.shutdown()
295 }
296}
297
/// Stateless HTTP client used for OTLP/HTTP export when compiled for `wasm32`.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy, Default)]
struct WasmHttpClient;
301
/// Sends OTLP/HTTP export requests through the JavaScript `fetch` function.
#[cfg(target_arch = "wasm32")]
#[async_trait]
impl HttpClient for WasmHttpClient {
    /// Translates `request` into a web `Request`, starts the fetch, and returns immediately.
    ///
    /// The fetch is not awaited here: it is handed to `spawn_local`, and a synthetic
    /// `202` response with an empty body is returned regardless of what the server later
    /// answers. A failed fetch is only reported through `console.warn`.
    ///
    /// # Errors
    ///
    /// Returns an error when the web request cannot be constructed, a header value is not
    /// representable as a string, `fetch` cannot be resolved or invoked, or the synthetic
    /// response cannot be built.
    async fn send_bytes(
        &self,
        request: HttpRequest<Bytes>,
    ) -> std::result::Result<HttpResponse<Bytes>, HttpError> {
        let (parts, body) = request.into_parts();

        // Build the web request in its own scope: method, optional body, then headers.
        let request = {
            let request_url = parts.uri.to_string();
            let init = RequestInit::new();
            init.set_method(parts.method.as_str());
            // A body is attached only when there are bytes to send.
            if !body.is_empty() {
                let body_bytes = js_sys::Uint8Array::from(body.as_ref());
                init.set_body_opt_u8_array(Some(&body_bytes));
            }

            let request =
                WebRequest::new_with_str_and_init(&request_url, &init).map_err(js_error)?;
            let request_headers = request.headers();
            for (name, value) in &parts.headers {
                let value = value
                    .to_str()
                    .map_err(|e| http_error(format!("invalid OTLP HTTP header {name}: {e}")))?;
                request_headers
                    .set(name.as_str(), value)
                    .map_err(js_error)?;
            }
            request
        };

        // Prefer `window.fetch`; without a `window`, look `fetch` up on the global object.
        // NOTE(review): presumably this covers workers and non-browser hosts; confirm.
        let fetch_promise = if let Some(window) = web_sys::window() {
            window.fetch_with_request(&request)
        } else {
            let global = js_sys::global();
            let fetch = js_sys::Reflect::get(&global, &JsValue::from_str("fetch"))
                .map_err(js_error)?
                .dyn_into::<js_sys::Function>()
                .map_err(js_error)?;
            fetch.call1(&global, &request).map_err(js_error)?.into()
        };
        // Fire-and-forget: the promise is driven on the local executor and only its failure
        // is observed. NOTE(review): likely done so no JS future is awaited inside this
        // method; confirm the intent.
        spawn_local(async move {
            if let Err(error) = JsFuture::from(fetch_promise).await {
                web_sys::console::warn_1(&JsValue::from_str(&format!(
                    "OpenInference OTLP/HTTP export failed: {error:?}"
                )));
            }
        });

        // Report "accepted" to the exporter without waiting for the real response.
        HttpResponse::builder()
            .status(202)
            .body(Bytes::new())
            .map_err(|e| http_error(e.to_string()))
    }
}
360
/// Converts a JavaScript value into an [`HttpError`].
///
/// A string value is used verbatim; any other value is rendered through its `Debug`
/// output with a `JavaScript error: ` prefix.
#[cfg(target_arch = "wasm32")]
fn js_error(value: JsValue) -> HttpError {
    let message = match value.as_string() {
        Some(text) => text,
        None => format!("JavaScript error: {value:?}"),
    };
    http_error(message)
}
369
/// Wraps `message` in a boxed [`std::io::Error`] so it can be returned as an [`HttpError`].
#[cfg(target_arch = "wasm32")]
fn http_error(message: impl Into<String>) -> HttpError {
    let text: String = message.into();
    let error = std::io::Error::other(text);
    Box::new(error)
}
374
375fn build_tracer_provider(config: &OpenInferenceConfig) -> Result<SdkTracerProvider> {
376 let exporter = match config.transport {
377 OtlpTransport::HttpBinary => {
378 let mut builder = SpanExporter::builder()
379 .with_http()
380 .with_protocol(Protocol::HttpBinary)
381 .with_timeout(config.timeout);
382 if let Some(endpoint) = &config.endpoint {
383 builder = builder.with_endpoint(endpoint.clone());
384 }
385 if !config.headers.is_empty() {
386 builder = builder.with_headers(config.headers.clone());
387 }
388 #[cfg(target_arch = "wasm32")]
389 {
390 builder = builder.with_http_client(WasmHttpClient);
391 }
392 builder
393 .build()
394 .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
395 }
396 #[cfg(not(target_arch = "wasm32"))]
397 OtlpTransport::Grpc => {
398 let mut builder = SpanExporter::builder()
399 .with_tonic()
400 .with_protocol(Protocol::Grpc)
401 .with_timeout(config.timeout);
402 if let Some(endpoint) = &config.endpoint {
403 builder = builder.with_endpoint(endpoint.clone());
404 }
405 if !config.headers.is_empty() {
406 builder = builder.with_metadata(build_grpc_metadata(&config.headers)?);
407 }
408 builder
409 .build()
410 .map_err(|e| OpenInferenceError::ExporterBuild(e.to_string()))?
411 }
412 #[cfg(target_arch = "wasm32")]
413 OtlpTransport::Grpc => {
414 return Err(OpenInferenceError::UnsupportedTransport { transport: "gRPC" });
415 }
416 };
417
418 let mut resource_attributes = vec![KeyValue::new("service.name", config.service_name.clone())];
419 if let Some(service_namespace) = &config.service_namespace {
420 resource_attributes.push(KeyValue::new(
421 "service.namespace",
422 service_namespace.clone(),
423 ));
424 }
425 if let Some(service_version) = &config.service_version {
426 resource_attributes.push(KeyValue::new("service.version", service_version.clone()));
427 }
428 for (key, value) in &config.resource_attributes {
429 resource_attributes.push(KeyValue::new(key.clone(), value.clone()));
430 }
431
432 let builder = SdkTracerProvider::builder()
437 .with_resource(
438 Resource::builder_empty()
439 .with_attributes(resource_attributes)
440 .build(),
441 )
442 .with_max_attributes_per_span(u32::MAX)
443 .with_max_attributes_per_event(u32::MAX);
444
445 #[cfg(not(target_arch = "wasm32"))]
446 {
447 if Handle::try_current().is_ok() {
448 Ok(builder.with_batch_exporter(exporter).build())
449 } else {
450 Ok(builder.with_simple_exporter(exporter).build())
451 }
452 }
453 #[cfg(target_arch = "wasm32")]
454 {
455 Ok(builder.with_simple_exporter(exporter).build())
456 }
457}
458
459#[cfg(not(target_arch = "wasm32"))]
460fn build_grpc_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
461 let mut metadata = MetadataMap::new();
462 for (key, value) in headers {
463 let metadata_key = MetadataKey::from_bytes(key.as_bytes()).map_err(|e| {
464 OpenInferenceError::InvalidGrpcHeader {
465 key: key.clone(),
466 message: e.to_string(),
467 }
468 })?;
469 let metadata_value = MetadataValue::try_from(value.as_str()).map_err(|e| {
470 OpenInferenceError::InvalidGrpcHeader {
471 key: key.clone(),
472 message: e.to_string(),
473 }
474 })?;
475 metadata.insert(metadata_key, metadata_value);
476 }
477 Ok(metadata)
478}
479
/// A span that has been started by a start event and not yet ended.
struct ActiveSpan {
    /// The live SDK span; it receives mark events and end attributes.
    span: Span,
    /// Copy of the span's context produced by `local_parent_span_context`, used as the
    /// parent context for spans created beneath this one.
    span_context: SpanContext,
}
484
/// Translates incoming events into OpenTelemetry spans and span events.
struct OpenInferenceEventProcessor {
    /// Spans that have started but not ended, keyed by the UUID of their start event.
    active_spans: HashMap<Uuid, ActiveSpan>,
    /// Provider used for `force_flush` and `shutdown`.
    provider: SdkTracerProvider,
    /// Tracer obtained from `provider`; every span is created through it.
    tracer: SdkTracer,
}
490
491impl OpenInferenceEventProcessor {
492 fn new(provider: SdkTracerProvider, instrumentation_scope: String) -> Self {
493 let tracer = provider.tracer(instrumentation_scope);
494 Self {
495 active_spans: HashMap::new(),
496 provider,
497 tracer,
498 }
499 }
500
501 fn process(&mut self, event: &Event) {
502 match event.scope_category() {
503 Some(ScopeCategory::Start) => self.process_start(event),
504 Some(ScopeCategory::End) => self.process_end(event),
505 None => self.process_mark(event),
506 }
507 }
508
509 fn force_flush(&self) -> Result<()> {
510 self.provider
511 .force_flush()
512 .map_err(|e| OpenInferenceError::Provider(e.to_string()))
513 }
514
515 fn shutdown(&self) -> Result<()> {
516 self.provider
517 .shutdown()
518 .map_err(|e| OpenInferenceError::Provider(e.to_string()))
519 }
520
521 fn process_start(&mut self, event: &Event) {
522 let mut span = self
523 .tracer
524 .span_builder(span_name(event))
525 .with_kind(span_kind(event))
526 .with_start_time(to_system_time(*event.timestamp()))
527 .start_with_context(&self.tracer, &self.parent_context(event));
528 span.set_attributes(start_attributes(event));
529 let span_context = local_parent_span_context(span.span_context());
530 self.active_spans
531 .insert(event.uuid(), ActiveSpan { span, span_context });
532 }
533
534 fn process_end(&mut self, event: &Event) {
535 let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
536 return;
537 };
538 active_span.span.set_attributes(end_attributes(event));
539 active_span
540 .span
541 .end_with_timestamp(to_system_time(*event.timestamp()));
542 }
543
544 fn process_mark(&mut self, event: &Event) {
545 let mark_name = event.name().to_string();
546 let timestamp = to_system_time(*event.timestamp());
547 let attributes = mark_attributes(event);
548
549 if let Some(parent_span) = self.find_parent_span_mut(event) {
550 parent_span
551 .span
552 .add_event_with_timestamp(mark_name, timestamp, attributes);
553 return;
554 }
555
556 let mut span = self
557 .tracer
558 .span_builder(format!("mark:{mark_name}"))
559 .with_kind(SpanKind::Internal)
560 .with_start_time(timestamp)
561 .start_with_context(&self.tracer, &self.parent_context(event));
562 let mut span_attributes = attributes;
563 span_attributes.push(KeyValue::new(
564 oi::OPENINFERENCE_SPAN_KIND,
565 OpenInferenceSpanKind::Chain,
566 ));
567 span_attributes.push(KeyValue::new("nemo_flow.mark.orphan", true));
568 span.set_attributes(span_attributes);
569 span.end_with_timestamp(timestamp);
570 }
571
572 fn parent_context(&self, event: &Event) -> Context {
573 self.find_parent_span(event)
574 .map(|active_span| {
575 Context::new().with_remote_span_context(active_span.span_context.clone())
576 })
577 .unwrap_or_default()
578 }
579
580 fn parent_span_uuid(&self, event: &Event) -> Option<Uuid> {
581 event
582 .parent_uuid()
583 .filter(|uuid| self.active_spans.contains_key(uuid))
584 }
585
586 fn find_parent_span(&self, event: &Event) -> Option<&ActiveSpan> {
587 self.parent_span_uuid(event)
588 .and_then(|uuid| self.active_spans.get(&uuid))
589 }
590
591 fn find_parent_span_mut(&mut self, event: &Event) -> Option<&mut ActiveSpan> {
592 self.parent_span_uuid(event)
593 .and_then(|uuid| self.active_spans.get_mut(&uuid))
594 }
595}
596
597fn span_kind(event: &Event) -> SpanKind {
598 match semantic_scope_type(event) {
599 Some(ScopeType::Llm) => SpanKind::Client,
600 Some(
601 ScopeType::Tool | ScopeType::Retriever | ScopeType::Embedder | ScopeType::Reranker,
602 ) => SpanKind::Client,
603 _ => SpanKind::Internal,
604 }
605}
606
/// Returns the span name for `event`, which is the event's own name as an owned string.
fn span_name(event: &Event) -> String {
    event.name().to_string()
}
610
/// Returns the scope type used for semantic mapping; currently exactly `event.scope_type()`.
fn semantic_scope_type(event: &Event) -> Option<ScopeType> {
    event.scope_type()
}
614
615fn scope_type_name(scope_type: Option<ScopeType>) -> &'static str {
616 match scope_type {
617 Some(ScopeType::Agent) => "agent",
618 Some(ScopeType::Function) => "function",
619 Some(ScopeType::Tool) => "tool",
620 Some(ScopeType::Llm) => "llm",
621 Some(ScopeType::Retriever) => "retriever",
622 Some(ScopeType::Embedder) => "embedder",
623 Some(ScopeType::Reranker) => "reranker",
624 Some(ScopeType::Guardrail) => "guardrail",
625 Some(ScopeType::Evaluator) => "evaluator",
626 Some(ScopeType::Custom) => "custom",
627 Some(ScopeType::Unknown) | None => "unknown",
628 }
629}
630
631fn start_attributes(event: &Event) -> Vec<KeyValue> {
632 let mut attributes = common_attributes(event);
633 let handle_attributes = event.attributes();
634 push_serialized(
635 &mut attributes,
636 "nemo_flow.handle_attributes_json",
637 handle_attributes,
638 );
639 push_serialized(&mut attributes, "nemo_flow.start.data_json", event.data());
640 push_serialized(
641 &mut attributes,
642 "nemo_flow.start.metadata_json",
643 event.metadata(),
644 );
645 push_serialized(&mut attributes, "nemo_flow.start.input_json", event.input());
646 if event
647 .category()
648 .is_some_and(|category| category.as_str() == "tool")
649 {
650 attributes.push(KeyValue::new(oi::tool::NAME, event.name().to_string()));
651 attributes.push(KeyValue::new(
652 oi::tool_call::function::NAME,
653 event.name().to_string(),
654 ));
655 }
656
657 if let Some(input) = openinference_input_value(event) {
658 attributes.push(KeyValue::new(oi::input::VALUE, input.clone()));
659 attributes.push(KeyValue::new(oi::input::MIME_TYPE, "application/json"));
660
661 if event
662 .category()
663 .is_some_and(|category| category.as_str() == "tool")
664 {
665 attributes.push(KeyValue::new(oi::tool::PARAMETERS, input.clone()));
666 attributes.push(KeyValue::new(oi::tool_call::function::ARGUMENTS, input));
667 }
668 }
669 attributes
670}
671
672fn end_attributes(event: &Event) -> Vec<KeyValue> {
673 let mut attributes = Vec::new();
674 push_serialized(&mut attributes, "nemo_flow.end.data_json", event.data());
675 push_serialized(
676 &mut attributes,
677 "nemo_flow.end.metadata_json",
678 event.metadata(),
679 );
680 push_serialized(&mut attributes, "nemo_flow.end.output_json", event.output());
681 if let Some(output) = event.output().and_then(to_json_string) {
682 attributes.push(KeyValue::new(oi::output::VALUE, output));
683 attributes.push(KeyValue::new(oi::output::MIME_TYPE, "application/json"));
684 }
685 if event
686 .category()
687 .is_some_and(|category| category.as_str() == "llm")
688 && let Some(usage) = event.annotated_response().and_then(|r| r.usage.as_ref())
689 {
690 if let Some(v) = usage.prompt_tokens {
691 attributes.push(KeyValue::new(oi::llm::token_count::PROMPT, v as i64));
692 }
693 if let Some(v) = usage.completion_tokens {
694 attributes.push(KeyValue::new(oi::llm::token_count::COMPLETION, v as i64));
695 }
696 if let Some(v) = usage.total_tokens {
697 attributes.push(KeyValue::new(oi::llm::token_count::TOTAL, v as i64));
698 }
699 if let Some(v) = usage.cache_read_tokens {
700 attributes.push(KeyValue::new(
701 oi::llm::token_count::prompt_details::CACHE_READ,
702 v as i64,
703 ));
704 }
705 if let Some(v) = usage.cache_write_tokens {
706 attributes.push(KeyValue::new(
707 oi::llm::token_count::prompt_details::CACHE_WRITE,
708 v as i64,
709 ));
710 }
711 }
712 attributes
713}
714
715fn mark_attributes(event: &Event) -> Vec<KeyValue> {
716 let handle_attributes = event.attributes();
717 let mut attributes = vec![
718 KeyValue::new("nemo_flow.mark.uuid", event.uuid().to_string()),
719 KeyValue::new(
720 "nemo_flow.mark.parent_uuid",
721 event
722 .parent_uuid()
723 .map(|uuid| uuid.to_string())
724 .unwrap_or_default(),
725 ),
726 ];
727 push_serialized(
728 &mut attributes,
729 "nemo_flow.mark.attributes_json",
730 handle_attributes,
731 );
732 push_serialized(&mut attributes, "nemo_flow.mark.data_json", event.data());
733 push_serialized(
734 &mut attributes,
735 "nemo_flow.mark.metadata_json",
736 event.metadata(),
737 );
738 attributes
739}
740
741fn common_attributes(event: &Event) -> Vec<KeyValue> {
742 let mut attributes = vec![
743 KeyValue::new(
744 oi::OPENINFERENCE_SPAN_KIND,
745 openinference_span_kind(semantic_scope_type(event)),
746 ),
747 KeyValue::new("nemo_flow.uuid", event.uuid().to_string()),
748 KeyValue::new(
749 "nemo_flow.parent_uuid",
750 event
751 .parent_uuid()
752 .map(|uuid| uuid.to_string())
753 .unwrap_or_default(),
754 ),
755 KeyValue::new(
756 "nemo_flow.scope_type",
757 scope_type_name(semantic_scope_type(event)),
758 ),
759 ];
760
761 if let Some(model_name) = event.model_name() {
762 attributes.push(KeyValue::new(
763 "nemo_flow.model_name",
764 model_name.to_string(),
765 ));
766 attributes.push(KeyValue::new(oi::llm::MODEL_NAME, model_name.to_string()));
767 }
768 if let Some(tool_call_id) = event.tool_call_id() {
769 attributes.push(KeyValue::new(
770 "nemo_flow.tool_call_id",
771 tool_call_id.to_string(),
772 ));
773 attributes.push(KeyValue::new(oi::tool_call::ID, tool_call_id.to_string()));
774 }
775 if let Some(metadata) = event.metadata().and_then(to_json_string) {
776 attributes.push(KeyValue::new(oi::METADATA, metadata));
777 }
778
779 attributes
780}
781
782fn openinference_span_kind(scope_type: Option<ScopeType>) -> OpenInferenceSpanKind {
783 match scope_type {
784 Some(ScopeType::Agent) => OpenInferenceSpanKind::Agent,
785 Some(ScopeType::Tool) => OpenInferenceSpanKind::Tool,
786 Some(ScopeType::Llm) => OpenInferenceSpanKind::Llm,
787 Some(ScopeType::Retriever) => OpenInferenceSpanKind::Retriever,
788 Some(ScopeType::Embedder) => OpenInferenceSpanKind::Embedding,
789 Some(ScopeType::Reranker) => OpenInferenceSpanKind::Reranker,
790 Some(ScopeType::Guardrail) => OpenInferenceSpanKind::Guardrail,
791 Some(ScopeType::Evaluator) => OpenInferenceSpanKind::Evaluator,
792 Some(ScopeType::Function | ScopeType::Custom | ScopeType::Unknown) | None => {
793 OpenInferenceSpanKind::Chain
794 }
795 }
796}
797
798fn push_serialized<T: Serialize + ?Sized>(
799 attributes: &mut Vec<KeyValue>,
800 key: &'static str,
801 value: Option<&T>,
802) {
803 if let Some(value) = value
804 && let Ok(json) = serde_json::to_string(value)
805 {
806 attributes.push(KeyValue::new(key, json));
807 }
808}
809
810fn openinference_input_value(event: &Event) -> Option<String> {
811 let input = event.input()?;
812
813 if event
814 .category()
815 .is_some_and(|category| category.as_str() == "llm")
816 {
817 return match input {
818 serde_json::Value::Object(object) => {
819 if let Some(content) = object.get("content") {
820 return to_json_string(content);
821 }
822
823 let mut sanitized = object.clone();
824 sanitized.remove("headers");
825 to_json_string(&serde_json::Value::Object(sanitized))
826 }
827 _ => to_json_string(input),
828 };
829 }
830
831 to_json_string(input)
832}
833
/// Serializes `value` to a JSON string, returning `None` when serialization fails.
fn to_json_string<T: Serialize>(value: &T) -> Option<String> {
    serde_json::to_string(value).ok()
}
837
838fn local_parent_span_context(span_context: &SpanContext) -> SpanContext {
839 SpanContext::new(
840 span_context.trace_id(),
841 span_context.span_id(),
842 span_context.trace_flags(),
843 false,
844 span_context.trace_state().clone(),
845 )
846}
847
848fn to_system_time(timestamp: DateTime<Utc>) -> SystemTime {
849 let seconds = timestamp.timestamp();
850 let nanos = timestamp.timestamp_subsec_nanos();
851 if seconds >= 0 {
852 UNIX_EPOCH + Duration::new(seconds as u64, nanos)
853 } else if nanos == 0 {
854 UNIX_EPOCH - Duration::new(seconds.unsigned_abs(), 0)
855 } else {
856 UNIX_EPOCH - Duration::new(seconds.unsigned_abs() - 1, 1_000_000_000 - nanos)
857 }
858}
859
// Unit tests live in a separate file located through `#[path]` and are compiled only in
// test builds.
#[cfg(test)]
#[path = "../../tests/unit/observability/openinference_tests.rs"]
mod tests;