1use std::collections::HashMap;
19use std::future::Future;
20use std::path::PathBuf;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex};
23use std::time::Duration;
24
25use serde::{Deserialize, Serialize};
26use serde_json::{Map, Value as Json};
27use uuid::Uuid;
28
29use crate::api::event::{Event, ScopeCategory};
30use crate::api::runtime::{EventSubscriberFn, current_scope_stack};
31use crate::api::scope::ScopeType;
32use crate::api::subscriber::{scope_deregister_subscriber, scope_register_subscriber};
33use crate::observability::atif::{AtifAgentInfo, AtifExporter};
34use crate::observability::atof::{
35 AtofExporter, AtofExporterConfig as CoreAtofExporterConfig, AtofExporterMode,
36};
37#[cfg(feature = "openinference")]
38use crate::observability::openinference::{
39 OpenInferenceConfig as CoreOpenInferenceConfig, OpenInferenceSubscriber,
40 OtlpTransport as OpenInferenceTransport,
41};
42#[cfg(feature = "otel")]
43use crate::observability::otel::{
44 OpenTelemetryConfig as CoreOpenTelemetryConfig, OpenTelemetrySubscriber,
45};
46use crate::plugin::{
47 ConfigDiagnostic, ConfigPolicy, DiagnosticLevel, Plugin, PluginComponentSpec, PluginError,
48 PluginRegistration, PluginRegistrationContext, Result as PluginResult, UnsupportedBehavior,
49 deregister_plugin, register_plugin,
50};
51
52pub const OBSERVABILITY_PLUGIN_KIND: &str = "observability";
54
55#[derive(Debug, Clone)]
61pub struct ComponentSpec {
62 pub enabled: bool,
64 pub config: ObservabilityConfig,
66}
67
68impl ComponentSpec {
69 pub fn new(config: ObservabilityConfig) -> Self {
74 Self {
75 enabled: true,
76 config,
77 }
78 }
79}
80
81impl From<ComponentSpec> for PluginComponentSpec {
82 fn from(value: ComponentSpec) -> Self {
83 let Json::Object(config) = serde_json::to_value(value.config)
84 .expect("observability config should serialize to object")
85 else {
86 unreachable!("observability config must serialize to object");
87 };
88
89 PluginComponentSpec {
90 kind: OBSERVABILITY_PLUGIN_KIND.to_string(),
91 enabled: value.enabled,
92 config,
93 }
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
103#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
104pub struct ObservabilityConfig {
105 #[serde(default = "default_observability_config_version")]
107 pub version: u32,
108 #[serde(default, skip_serializing_if = "Option::is_none")]
110 pub atof: Option<AtofSectionConfig>,
111 #[serde(default, skip_serializing_if = "Option::is_none")]
113 pub atif: Option<AtifSectionConfig>,
114 #[serde(default, skip_serializing_if = "Option::is_none")]
116 pub opentelemetry: Option<OtlpSectionConfig>,
117 #[serde(default, skip_serializing_if = "Option::is_none")]
119 pub openinference: Option<OtlpSectionConfig>,
120 #[serde(default)]
122 pub policy: ConfigPolicy,
123}
124
125impl Default for ObservabilityConfig {
126 fn default() -> Self {
127 Self {
128 version: default_observability_config_version(),
129 atof: None,
130 atif: None,
131 opentelemetry: None,
132 openinference: None,
133 policy: ConfigPolicy::default(),
134 }
135 }
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
145#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
146pub struct AtofSectionConfig {
147 #[serde(default)]
149 pub enabled: bool,
150 #[serde(default, skip_serializing_if = "Option::is_none")]
152 pub output_directory: Option<PathBuf>,
153 #[serde(default, skip_serializing_if = "Option::is_none")]
155 pub filename: Option<String>,
156 #[serde(default = "default_atof_mode")]
158 #[cfg_attr(feature = "schema", schemars(schema_with = "atof_mode_schema"))]
159 pub mode: String,
160}
161
162impl Default for AtofSectionConfig {
163 fn default() -> Self {
164 Self {
165 enabled: false,
166 output_directory: None,
167 filename: None,
168 mode: default_atof_mode(),
169 }
170 }
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
180#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
181pub struct AtifSectionConfig {
182 #[serde(default)]
184 pub enabled: bool,
185 #[serde(default = "default_agent_name")]
187 pub agent_name: String,
188 #[serde(default = "default_agent_version")]
190 pub agent_version: String,
191 #[serde(default = "default_model_name")]
193 pub model_name: String,
194 #[serde(default, skip_serializing_if = "Option::is_none")]
196 pub tool_definitions: Option<Vec<Json>>,
197 #[serde(default, skip_serializing_if = "Option::is_none")]
199 pub extra: Option<Json>,
200 #[serde(default, skip_serializing_if = "Option::is_none")]
202 pub output_directory: Option<PathBuf>,
203 #[serde(default = "default_atif_filename_template")]
205 pub filename_template: String,
206}
207
208impl Default for AtifSectionConfig {
209 fn default() -> Self {
210 Self {
211 enabled: false,
212 agent_name: default_agent_name(),
213 agent_version: default_agent_version(),
214 model_name: default_model_name(),
215 tool_definitions: None,
216 extra: None,
217 output_directory: None,
218 filename_template: default_atif_filename_template(),
219 }
220 }
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize)]
229#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
230pub struct OtlpSectionConfig {
231 #[serde(default)]
233 pub enabled: bool,
234 #[serde(default = "default_otlp_transport")]
236 #[cfg_attr(feature = "schema", schemars(schema_with = "otlp_transport_schema"))]
237 pub transport: String,
238 #[serde(default, skip_serializing_if = "Option::is_none")]
240 pub endpoint: Option<String>,
241 #[serde(default)]
243 pub headers: HashMap<String, String>,
244 #[serde(default)]
246 pub resource_attributes: HashMap<String, String>,
247 #[serde(default = "default_service_name")]
249 pub service_name: String,
250 #[serde(default, skip_serializing_if = "Option::is_none")]
252 pub service_namespace: Option<String>,
253 #[serde(default, skip_serializing_if = "Option::is_none")]
255 pub service_version: Option<String>,
256 #[serde(default, skip_serializing_if = "Option::is_none")]
258 pub instrumentation_scope: Option<String>,
259 #[serde(default = "default_timeout_millis")]
261 pub timeout_millis: u64,
262}
263
264impl Default for OtlpSectionConfig {
265 fn default() -> Self {
266 Self {
267 enabled: false,
268 transport: default_otlp_transport(),
269 endpoint: None,
270 headers: HashMap::new(),
271 resource_attributes: HashMap::new(),
272 service_name: default_service_name(),
273 service_namespace: None,
274 service_version: None,
275 instrumentation_scope: None,
276 timeout_millis: default_timeout_millis(),
277 }
278 }
279}
280
// Editor metadata for the top-level observability config. Every exporter section is an optional
// nested section; `default:` names the type used for a new section's value (presumably via its
// `Default` impl; confirm against `editor_config!`). `policy` is not marked optional.
// NOTE(review): `version` is not listed, so it is not exposed through this metadata.
crate::editor_config! {
    impl ObservabilityConfig {
        atof => {
            label: "ATOF",
            kind: Section,
            optional: true,
            nested: AtofSectionConfig,
            default: AtofSectionConfig,
        },
        atif => {
            label: "ATIF",
            kind: Section,
            optional: true,
            nested: AtifSectionConfig,
            default: AtifSectionConfig,
        },
        opentelemetry => {
            label: "OpenTelemetry",
            kind: Section,
            optional: true,
            nested: OtlpSectionConfig,
            default: OtlpSectionConfig,
        },
        openinference => {
            label: "OpenInference",
            kind: Section,
            optional: true,
            nested: OtlpSectionConfig,
            default: OtlpSectionConfig,
        },
        policy => {
            label: "policy",
            kind: Section,
            nested: ConfigPolicy,
            default: ConfigPolicy,
        },
    }
}

// Editor metadata for the ATOF section; `mode` values mirror those accepted by
// `AtofExporterMode::parse` in validation ("append" / "overwrite").
crate::editor_config! {
    impl AtofSectionConfig {
        enabled => { label: "enabled", kind: Boolean },
        output_directory => { label: "output_directory", kind: String, optional: true },
        filename => { label: "filename", kind: String, optional: true },
        mode => { label: "mode", kind: Enum, values: ["append", "overwrite"] },
    }
}

// Editor metadata for the ATIF section; `tool_definitions` and `extra` are free-form JSON.
crate::editor_config! {
    impl AtifSectionConfig {
        enabled => { label: "enabled", kind: Boolean },
        agent_name => { label: "agent_name", kind: String },
        agent_version => { label: "agent_version", kind: String },
        model_name => { label: "model_name", kind: String },
        tool_definitions => { label: "tool_definitions", kind: Json, optional: true },
        extra => { label: "extra", kind: Json, optional: true },
        output_directory => { label: "output_directory", kind: String, optional: true },
        filename_template => { label: "filename_template", kind: String },
    }
}

// Editor metadata for the OTLP section shared by OpenTelemetry and OpenInference; the transport
// values match those accepted by `validate_otlp_values`.
crate::editor_config! {
    impl OtlpSectionConfig {
        enabled => { label: "enabled", kind: Boolean },
        transport => { label: "transport", kind: Enum, values: ["http_binary", "grpc"] },
        endpoint => { label: "endpoint", kind: String, optional: true },
        headers => { label: "headers", kind: StringMap },
        resource_attributes => { label: "resource_attributes", kind: StringMap },
        service_name => { label: "service_name", kind: String },
        service_namespace => { label: "service_namespace", kind: String, optional: true },
        service_version => { label: "service_version", kind: String, optional: true },
        instrumentation_scope => { label: "instrumentation_scope", kind: String, optional: true },
        timeout_millis => { label: "timeout_millis", kind: Integer },
    }
}
356
357struct ObservabilityPlugin;
358
359impl Plugin for ObservabilityPlugin {
360 fn plugin_kind(&self) -> &str {
361 OBSERVABILITY_PLUGIN_KIND
362 }
363
364 fn allows_multiple_components(&self) -> bool {
365 false
366 }
367
368 fn validate(&self, plugin_config: &Map<String, Json>) -> Vec<ConfigDiagnostic> {
369 validate_observability_plugin_config(plugin_config)
370 }
371
372 fn register<'a>(
373 &'a self,
374 plugin_config: &Map<String, Json>,
375 ctx: &'a mut PluginRegistrationContext,
376 ) -> Pin<Box<dyn Future<Output = PluginResult<()>> + Send + 'a>> {
377 let plugin_config = plugin_config.clone();
378 Box::pin(async move {
379 let config = parse_observability_config(&plugin_config)?;
380 register_observability(config, ctx)
381 })
382 }
383}
384
385pub fn register_observability_component() -> PluginResult<()> {
391 match register_plugin(Arc::new(ObservabilityPlugin)) {
392 Ok(()) => Ok(()),
393 Err(PluginError::RegistrationFailed(message)) if message.contains("already registered") => {
394 Ok(())
395 }
396 Err(err) => Err(err),
397 }
398}
399
400pub fn deregister_observability_component() -> bool {
406 deregister_plugin(OBSERVABILITY_PLUGIN_KIND)
407}
408
/// Returns the JSON Schema for [`ObservabilityConfig`] as a JSON value.
///
/// # Panics
///
/// Panics if the generated schema cannot be converted to JSON.
#[cfg(feature = "schema")]
pub fn observability_config_schema() -> serde_json::Value {
    let root = schemars::schema_for!(ObservabilityConfig);
    serde_json::to_value(root).expect("observability config schema should serialize")
}

/// Schema for `AtofSectionConfig::mode`: a string limited to the supported modes.
#[cfg(feature = "schema")]
fn atof_mode_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
    const MODES: &[&str] = &["append", "overwrite"];
    string_enum_schema(generator, MODES, Some("append"))
}

/// Schema for `OtlpSectionConfig::transport`: a string limited to the supported transports.
#[cfg(feature = "schema")]
fn otlp_transport_schema(
    generator: &mut schemars::r#gen::SchemaGenerator,
) -> schemars::schema::Schema {
    const TRANSPORTS: &[&str] = &["http_binary", "grpc"];
    string_enum_schema(generator, TRANSPORTS, Some("http_binary"))
}

/// Builds a string schema whose `enum` is `values`, optionally recording `default` as metadata.
#[cfg(feature = "schema")]
fn string_enum_schema(
    generator: &mut schemars::r#gen::SchemaGenerator,
    values: &[&str],
    default: Option<&str>,
) -> schemars::schema::Schema {
    let base = <String as schemars::JsonSchema>::json_schema(generator);
    let mut schema: schemars::schema::SchemaObject = base.into();
    schema.enum_values = Some(values.iter().copied().map(Json::from).collect());
    if let Some(default_value) = default {
        schema.metadata().default = Some(Json::from(default_value));
    }
    schema.into()
}
447
448fn register_observability(
449 config: ObservabilityConfig,
450 ctx: &mut PluginRegistrationContext,
451) -> PluginResult<()> {
452 if let Some(atof) = config.atof.filter(|section| section.enabled) {
453 register_atof_exporter(atof, ctx)?;
454 }
455 if let Some(atif) = config.atif.filter(|section| section.enabled) {
456 register_atif_dispatcher(atif, ctx)?;
457 }
458 if let Some(otel) = config.opentelemetry.filter(|section| section.enabled) {
459 register_opentelemetry(otel, ctx)?;
460 }
461 if let Some(openinference) = config.openinference.filter(|section| section.enabled) {
462 register_openinference(openinference, ctx)?;
463 }
464 Ok(())
465}
466
467fn register_atof_exporter(
468 section: AtofSectionConfig,
469 ctx: &mut PluginRegistrationContext,
470) -> PluginResult<()> {
471 let mode = AtofExporterMode::parse(§ion.mode).ok_or_else(|| {
472 PluginError::InvalidConfig("ATOF mode must be 'append' or 'overwrite'".to_string())
473 })?;
474 let mut config = CoreAtofExporterConfig::new().with_mode(mode);
475 if let Some(output_directory) = section.output_directory {
476 config = config.with_output_directory(output_directory);
477 }
478 if let Some(filename) = section.filename {
479 config = config.with_filename(filename);
480 }
481
482 let exporter = Arc::new(AtofExporter::new(config).map_err(observability_registration_error)?);
483 ctx.register_subscriber("atof", exporter.subscriber())?;
484 ctx.add_registration(PluginRegistration::new(
485 "observability",
486 ctx.qualify_name("atof.shutdown"),
487 Box::new(move || {
488 exporter
489 .shutdown()
490 .map_err(observability_registration_error)
491 }),
492 ));
493 Ok(())
494}
495
496fn register_atif_dispatcher(
497 section: AtifSectionConfig,
498 ctx: &mut PluginRegistrationContext,
499) -> PluginResult<()> {
500 if !section.filename_template.contains("{session_id}") {
501 return Err(PluginError::InvalidConfig(
502 "ATIF filename_template must contain '{session_id}'".to_string(),
503 ));
504 }
505
506 let manager = Arc::new(Mutex::new(AtifDispatcher::new(section)));
507 let dispatcher = atif_dispatcher_subscriber(Arc::clone(&manager), ctx.qualify_name("atif-"));
508 ctx.register_subscriber("atif", dispatcher)?;
509 ctx.add_registration(PluginRegistration::new(
510 "observability",
511 ctx.qualify_name("atif.shutdown"),
512 Box::new(move || {
513 let work = {
514 let mut guard = manager.lock().map_err(|err| {
515 PluginError::Internal(format!("ATIF dispatcher lock poisoned: {err}"))
516 })?;
517 guard
518 .flush_open_agents()
519 .map_err(observability_registration_error)?
520 };
521 for (scope_uuid, name) in work.scope_subscribers {
522 let _ = scope_deregister_subscriber(&scope_uuid, &name);
523 }
524 for write in work.writes {
525 let agent_uuid = write.agent_uuid;
526 let result = write_atif_file(&write);
527 let mut guard = manager.lock().map_err(|err| {
528 PluginError::Internal(format!("ATIF dispatcher lock poisoned: {err}"))
529 })?;
530 guard
531 .finish_agent_write(agent_uuid, result)
532 .map_err(observability_registration_error)?;
533 }
534 let guard = manager.lock().map_err(|err| {
535 PluginError::Internal(format!("ATIF dispatcher lock poisoned: {err}"))
536 })?;
537 guard
538 .last_error_result()
539 .map_err(observability_registration_error)
540 }),
541 ));
542 Ok(())
543}
544
545#[cfg(feature = "otel")]
546fn register_opentelemetry(
547 section: OtlpSectionConfig,
548 ctx: &mut PluginRegistrationContext,
549) -> PluginResult<()> {
550 let subscriber = Arc::new(
551 OpenTelemetrySubscriber::new(build_otel_config(section)?)
552 .map_err(observability_registration_error)?,
553 );
554 ctx.register_subscriber("opentelemetry", subscriber.subscriber())?;
555 ctx.add_registration(PluginRegistration::new(
556 "observability",
557 ctx.qualify_name("opentelemetry.shutdown"),
558 Box::new(move || {
559 subscriber
560 .shutdown()
561 .map_err(observability_registration_error)
562 }),
563 ));
564 Ok(())
565}
566
567#[cfg(not(feature = "otel"))]
568fn register_opentelemetry(
569 _section: OtlpSectionConfig,
570 _ctx: &mut PluginRegistrationContext,
571) -> PluginResult<()> {
572 Err(PluginError::InvalidConfig(
573 "OpenTelemetry support is not enabled in this build".to_string(),
574 ))
575}
576
577#[cfg(feature = "openinference")]
578fn register_openinference(
579 section: OtlpSectionConfig,
580 ctx: &mut PluginRegistrationContext,
581) -> PluginResult<()> {
582 let subscriber = Arc::new(
583 OpenInferenceSubscriber::new(build_openinference_config(section)?)
584 .map_err(observability_registration_error)?,
585 );
586 ctx.register_subscriber("openinference", subscriber.subscriber())?;
587 ctx.add_registration(PluginRegistration::new(
588 "observability",
589 ctx.qualify_name("openinference.shutdown"),
590 Box::new(move || {
591 subscriber
592 .shutdown()
593 .map_err(observability_registration_error)
594 }),
595 ));
596 Ok(())
597}
598
599#[cfg(not(feature = "openinference"))]
600fn register_openinference(
601 _section: OtlpSectionConfig,
602 _ctx: &mut PluginRegistrationContext,
603) -> PluginResult<()> {
604 Err(PluginError::InvalidConfig(
605 "OpenInference support is not enabled in this build".to_string(),
606 ))
607}
608
/// Shared state behind the ATIF subscribers: one managed exporter per top-level agent scope.
struct AtifDispatcher {
    /// ATIF section settings (agent metadata, output directory, filename template).
    config: AtifSectionConfig,
    /// Exporters keyed by the agent's start-event UUID. Entries are removed after a successful
    /// scope-end write.
    agents: HashMap<Uuid, ManagedAtifExporter>,
    /// Names of the per-agent scope subscribers registered with the runtime, keyed by agent UUID.
    scope_subscribers: HashMap<Uuid, String>,
    /// Most recent failure message. Once set, incoming events are ignored.
    last_error: Option<String>,
}

/// Exporter state for a single top-level agent.
struct ManagedAtifExporter {
    /// Trajectory exporter fed with every event of the agent scope.
    exporter: AtifExporter,
    /// Destination file for this agent's trajectory.
    path: PathBuf,
    /// Raw events seen so far; embedded under `extra.observed_events` in the written file and
    /// cleared after a successful write.
    observed_events: Vec<Event>,
    /// Set once a payload has been prepared; reset to `false` if the write fails.
    written: bool,
}

/// A serialized trajectory ready to be written to disk outside the dispatcher lock.
struct PendingAtifWrite {
    /// Agent the payload belongs to.
    agent_uuid: Uuid,
    /// Destination file.
    path: PathBuf,
    /// Pretty-printed JSON bytes.
    payload: Vec<u8>,
}

/// Work collected under the lock at shutdown and executed after releasing it.
struct AtifFlushWork {
    /// Trajectory files to write.
    writes: Vec<PendingAtifWrite>,
    /// `(scope_uuid, subscriber_name)` pairs to deregister.
    scope_subscribers: Vec<(Uuid, String)>,
}
633
634impl AtifDispatcher {
635 fn new(config: AtifSectionConfig) -> Self {
636 Self {
637 config,
638 agents: HashMap::new(),
639 scope_subscribers: HashMap::new(),
640 last_error: None,
641 }
642 }
643
644 fn observe_global(&mut self, event: &Event, subscriber_prefix: &str, state: Arc<Mutex<Self>>) {
645 if self.last_error.is_some() || !is_top_level_agent_start(event) {
646 return;
647 }
648
649 let session_id = event.uuid().to_string();
654 let exporter = AtifExporter::new(session_id.clone(), self.agent_info());
655 (exporter.subscriber())(event);
656 let path = self.output_path(&session_id);
657 self.agents.insert(
658 event.uuid(),
659 ManagedAtifExporter {
660 exporter,
661 path,
662 observed_events: vec![event.clone()],
663 written: false,
664 },
665 );
666
667 let agent_uuid = event.uuid();
668 let name = format!("{subscriber_prefix}{agent_uuid}");
669 let callback = atif_scope_subscriber(state, agent_uuid);
670 if let Err(err) = scope_register_subscriber(&agent_uuid, &name, callback) {
673 self.last_error = Some(format!("failed to register ATIF scope subscriber: {err}"));
674 } else {
675 self.scope_subscribers.insert(agent_uuid, name);
676 }
677 }
678
679 fn observe_scope(&mut self, event: &Event, agent_uuid: Uuid) -> Option<PendingAtifWrite> {
680 if self.last_error.is_some() {
681 return None;
682 }
683 let should_finalize =
684 event.uuid() == agent_uuid && event.scope_category() == Some(ScopeCategory::End);
685 let agent = self.agents.get_mut(&agent_uuid)?;
686 (agent.exporter.subscriber())(event);
687 agent.observed_events.push(event.clone());
688 if !should_finalize || agent.written {
689 return None;
690 }
691 match prepare_atif_file(agent_uuid, agent) {
692 Ok(write) => Some(write),
693 Err(err) => {
694 self.last_error = Some(err.to_string());
695 None
696 }
697 }
698 }
699
700 fn complete_scope_write(
701 &mut self,
702 agent_uuid: Uuid,
703 result: std::io::Result<()>,
704 ) -> Option<(Uuid, String)> {
705 if self.finish_agent_write(agent_uuid, result).is_err() {
706 return None;
707 }
708 self.agents.remove(&agent_uuid);
709 self.scope_subscribers
710 .remove(&agent_uuid)
711 .map(|name| (agent_uuid, name))
712 }
713
714 fn flush_open_agents(&mut self) -> std::io::Result<AtifFlushWork> {
715 let scope_subscribers = std::mem::take(&mut self.scope_subscribers)
719 .into_iter()
720 .collect();
721 let agent_uuids = self
722 .agents
723 .iter()
724 .filter_map(|(agent_uuid, agent)| (!agent.written).then_some(*agent_uuid))
725 .collect::<Vec<_>>();
726 let mut writes = Vec::with_capacity(agent_uuids.len());
727 for agent_uuid in agent_uuids {
728 if let Some(agent) = self.agents.get_mut(&agent_uuid) {
729 writes.push(prepare_atif_file(agent_uuid, agent)?);
730 }
731 }
732 Ok(AtifFlushWork {
733 writes,
734 scope_subscribers,
735 })
736 }
737
738 fn finish_agent_write(
739 &mut self,
740 agent_uuid: Uuid,
741 result: std::io::Result<()>,
742 ) -> std::io::Result<()> {
743 match result {
744 Ok(()) => {
745 if let Some(agent) = self.agents.get_mut(&agent_uuid) {
746 agent.observed_events.clear();
747 }
748 Ok(())
749 }
750 Err(err) => {
751 if let Some(agent) = self.agents.get_mut(&agent_uuid) {
752 agent.written = false;
753 }
754 self.last_error = Some(err.to_string());
755 Err(err)
756 }
757 }
758 }
759
760 fn last_error_result(&self) -> std::io::Result<()> {
761 if let Some(message) = &self.last_error {
762 return Err(std::io::Error::other(message.clone()));
763 }
764 Ok(())
765 }
766
767 fn agent_info(&self) -> AtifAgentInfo {
768 AtifAgentInfo {
769 name: self.config.agent_name.clone(),
770 version: self.config.agent_version.clone(),
771 model_name: Some(self.config.model_name.clone()),
772 tool_definitions: self.config.tool_definitions.clone(),
773 extra: self.config.extra.clone(),
774 }
775 }
776
777 fn output_path(&self, session_id: &str) -> PathBuf {
778 let directory = self
779 .config
780 .output_directory
781 .clone()
782 .unwrap_or_else(default_output_directory);
783 let filename = self
784 .config
785 .filename_template
786 .replace("{session_id}", session_id);
787 directory.join(filename)
788 }
789}
790
791fn atif_dispatcher_subscriber(
792 manager: Arc<Mutex<AtifDispatcher>>,
793 subscriber_prefix: String,
794) -> EventSubscriberFn {
795 Arc::new(move |event: &Event| {
796 let Ok(mut guard) = manager.lock() else {
797 return;
798 };
799 guard.observe_global(event, &subscriber_prefix, Arc::clone(&manager));
800 })
801}
802
803fn atif_scope_subscriber(
804 manager: Arc<Mutex<AtifDispatcher>>,
805 agent_uuid: Uuid,
806) -> EventSubscriberFn {
807 Arc::new(move |event: &Event| {
808 let pending_write = {
809 let Ok(mut guard) = manager.lock() else {
810 return;
811 };
812 guard.observe_scope(event, agent_uuid)
813 };
814 let Some(write) = pending_write else {
815 return;
816 };
817 let result = write_atif_file(&write);
818 let scope_subscriber = {
819 let Ok(mut guard) = manager.lock() else {
820 return;
821 };
822 guard.complete_scope_write(write.agent_uuid, result)
823 };
824 if let Some((scope_uuid, name)) = scope_subscriber {
825 let _ = scope_deregister_subscriber(&scope_uuid, &name);
826 }
827 })
828}
829
830fn prepare_atif_file(
831 agent_uuid: Uuid,
832 agent: &mut ManagedAtifExporter,
833) -> std::io::Result<PendingAtifWrite> {
834 let trajectory = agent.exporter.export();
835 let mut value = serde_json::to_value(trajectory)?;
836 if let Some(object) = value.as_object_mut() {
837 object.insert(
838 "extra".to_string(),
839 serde_json::json!({
840 "observed_events": agent.observed_events,
841 }),
842 );
843 }
844 let payload = serde_json::to_vec_pretty(&value)?;
845 agent.written = true;
846 Ok(PendingAtifWrite {
847 agent_uuid,
848 path: agent.path.clone(),
849 payload,
850 })
851}
852
853fn write_atif_file(write: &PendingAtifWrite) -> std::io::Result<()> {
854 if let Some(parent) = write.path.parent() {
855 std::fs::create_dir_all(parent)?;
856 }
857 std::fs::write(&write.path, &write.payload)?;
858 Ok(())
859}
860
861fn is_top_level_agent_start(event: &Event) -> bool {
862 if event.scope_category() != Some(ScopeCategory::Start)
863 || event.scope_type() != Some(ScopeType::Agent)
864 {
865 return false;
866 }
867 let Some(parent_uuid) = event.parent_uuid() else {
868 return false;
869 };
870 current_scope_stack()
871 .read()
872 .map(|stack| stack.root_uuid() == parent_uuid)
873 .unwrap_or(false)
874}
875
876#[cfg(feature = "otel")]
877fn build_otel_config(section: OtlpSectionConfig) -> PluginResult<CoreOpenTelemetryConfig> {
878 let mut config = match section.transport.as_str() {
879 "http_binary" => CoreOpenTelemetryConfig::http_binary(section.service_name),
880 "grpc" => CoreOpenTelemetryConfig::grpc(section.service_name),
881 other => {
882 return Err(PluginError::InvalidConfig(format!(
883 "OpenTelemetry transport must be 'http_binary' or 'grpc', got {other:?}"
884 )));
885 }
886 }
887 .with_timeout(Duration::from_millis(section.timeout_millis));
888
889 if let Some(endpoint) = section.endpoint {
890 config = config.with_endpoint(endpoint);
891 }
892 if let Some(namespace) = section.service_namespace {
893 config = config.with_service_namespace(namespace);
894 }
895 if let Some(version) = section.service_version {
896 config = config.with_service_version(version);
897 }
898 if let Some(scope) = section.instrumentation_scope {
899 config = config.with_instrumentation_scope(scope);
900 }
901 for (key, value) in section.headers {
902 config = config.with_header(key, value);
903 }
904 for (key, value) in section.resource_attributes {
905 config = config.with_resource_attribute(key, value);
906 }
907 Ok(config)
908}
909
910#[cfg(feature = "openinference")]
911fn build_openinference_config(section: OtlpSectionConfig) -> PluginResult<CoreOpenInferenceConfig> {
912 let transport = match section.transport.as_str() {
913 "http_binary" => OpenInferenceTransport::HttpBinary,
914 "grpc" => OpenInferenceTransport::Grpc,
915 other => {
916 return Err(PluginError::InvalidConfig(format!(
917 "OpenInference transport must be 'http_binary' or 'grpc', got {other:?}"
918 )));
919 }
920 };
921 let mut config = CoreOpenInferenceConfig::new()
922 .with_transport(transport)
923 .with_service_name(section.service_name)
924 .with_timeout(Duration::from_millis(section.timeout_millis));
925
926 if let Some(endpoint) = section.endpoint {
927 config = config.with_endpoint(endpoint);
928 }
929 if let Some(namespace) = section.service_namespace {
930 config = config.with_service_namespace(namespace);
931 }
932 if let Some(version) = section.service_version {
933 config = config.with_service_version(version);
934 }
935 if let Some(scope) = section.instrumentation_scope {
936 config = config.with_instrumentation_scope(scope);
937 }
938 for (key, value) in section.headers {
939 config = config.with_header(key, value);
940 }
941 for (key, value) in section.resource_attributes {
942 config = config.with_resource_attribute(key, value);
943 }
944 Ok(config)
945}
946
947fn parse_observability_config(
948 plugin_config: &Map<String, Json>,
949) -> PluginResult<ObservabilityConfig> {
950 serde_json::from_value(Json::Object(plugin_config.clone())).map_err(|err| {
951 PluginError::InvalidConfig(format!("invalid observability plugin config: {err}"))
952 })
953}
954
955fn validate_observability_plugin_config(
956 plugin_config: &Map<String, Json>,
957) -> Vec<ConfigDiagnostic> {
958 let config = match parse_observability_config(plugin_config) {
959 Ok(config) => config,
960 Err(err) => {
961 return vec![ConfigDiagnostic {
962 level: DiagnosticLevel::Error,
963 code: "observability.invalid_plugin_config".to_string(),
964 component: Some(OBSERVABILITY_PLUGIN_KIND.to_string()),
965 field: None,
966 message: err.to_string(),
967 }];
968 }
969 };
970
971 let mut diagnostics = vec![];
972 validate_unknown_fields(
973 &mut diagnostics,
974 &config.policy,
975 Some(OBSERVABILITY_PLUGIN_KIND.to_string()),
976 plugin_config,
977 &[
978 "version",
979 "atof",
980 "atif",
981 "opentelemetry",
982 "openinference",
983 "policy",
984 ],
985 );
986
987 validate_version(&mut diagnostics, &config.policy, config.version);
988 validate_policy_fields(&mut diagnostics, &config.policy, plugin_config);
989 validate_section_fields(
990 &mut diagnostics,
991 &config.policy,
992 plugin_config,
993 "atof",
994 &["enabled", "output_directory", "filename", "mode"],
995 );
996 validate_section_fields(
997 &mut diagnostics,
998 &config.policy,
999 plugin_config,
1000 "atif",
1001 &[
1002 "enabled",
1003 "agent_name",
1004 "agent_version",
1005 "model_name",
1006 "tool_definitions",
1007 "extra",
1008 "output_directory",
1009 "filename_template",
1010 ],
1011 );
1012 validate_section_fields(
1013 &mut diagnostics,
1014 &config.policy,
1015 plugin_config,
1016 "opentelemetry",
1017 &[
1018 "enabled",
1019 "transport",
1020 "endpoint",
1021 "headers",
1022 "resource_attributes",
1023 "service_name",
1024 "service_namespace",
1025 "service_version",
1026 "instrumentation_scope",
1027 "timeout_millis",
1028 ],
1029 );
1030 validate_section_fields(
1031 &mut diagnostics,
1032 &config.policy,
1033 plugin_config,
1034 "openinference",
1035 &[
1036 "enabled",
1037 "transport",
1038 "endpoint",
1039 "headers",
1040 "resource_attributes",
1041 "service_name",
1042 "service_namespace",
1043 "service_version",
1044 "instrumentation_scope",
1045 "timeout_millis",
1046 ],
1047 );
1048
1049 if let Some(section) = &config.atof {
1050 validate_atof_values(&mut diagnostics, &config.policy, section);
1051 #[cfg(target_arch = "wasm32")]
1052 if section.enabled {
1053 push_policy_diag(
1054 &mut diagnostics,
1055 config.policy.unsupported_value,
1056 "observability.unsupported_value",
1057 Some("atof".to_string()),
1058 Some("enabled".to_string()),
1059 "ATOF file export is not supported on WebAssembly".to_string(),
1060 );
1061 }
1062 }
1063 if let Some(section) = &config.atif {
1064 validate_atif_values(&mut diagnostics, &config.policy, section);
1065 #[cfg(target_arch = "wasm32")]
1066 if section.enabled {
1067 push_policy_diag(
1068 &mut diagnostics,
1069 config.policy.unsupported_value,
1070 "observability.unsupported_value",
1071 Some("atif".to_string()),
1072 Some("enabled".to_string()),
1073 "ATIF file export is not supported on WebAssembly".to_string(),
1074 );
1075 }
1076 }
1077 if let Some(section) = &config.opentelemetry {
1078 validate_otlp_values(&mut diagnostics, &config.policy, "opentelemetry", section);
1079 #[cfg(not(feature = "otel"))]
1080 if section.enabled {
1081 push_policy_diag(
1082 &mut diagnostics,
1083 config.policy.unsupported_value,
1084 "observability.feature_disabled",
1085 Some("opentelemetry".to_string()),
1086 Some("enabled".to_string()),
1087 "OpenTelemetry support is not enabled in this build".to_string(),
1088 );
1089 }
1090 }
1091 if let Some(section) = &config.openinference {
1092 validate_otlp_values(&mut diagnostics, &config.policy, "openinference", section);
1093 #[cfg(not(feature = "openinference"))]
1094 if section.enabled {
1095 push_policy_diag(
1096 &mut diagnostics,
1097 config.policy.unsupported_value,
1098 "observability.feature_disabled",
1099 Some("openinference".to_string()),
1100 Some("enabled".to_string()),
1101 "OpenInference support is not enabled in this build".to_string(),
1102 );
1103 }
1104 }
1105
1106 diagnostics
1107}
1108
1109fn validate_version(diagnostics: &mut Vec<ConfigDiagnostic>, policy: &ConfigPolicy, version: u32) {
1110 if version != 1 {
1111 push_policy_diag(
1112 diagnostics,
1113 policy.unsupported_value,
1114 "observability.unsupported_config_version",
1115 Some(OBSERVABILITY_PLUGIN_KIND.to_string()),
1116 Some("version".to_string()),
1117 format!("observability config version {version} is unsupported"),
1118 );
1119 }
1120}
1121
1122fn validate_policy_fields(
1123 diagnostics: &mut Vec<ConfigDiagnostic>,
1124 policy: &ConfigPolicy,
1125 plugin_config: &Map<String, Json>,
1126) {
1127 if let Some(policy_json) = plugin_config.get("policy").and_then(Json::as_object) {
1128 validate_unknown_fields(
1129 diagnostics,
1130 policy,
1131 Some("policy".to_string()),
1132 policy_json,
1133 &["unknown_component", "unknown_field", "unsupported_value"],
1134 );
1135 }
1136}
1137
1138fn validate_section_fields(
1139 diagnostics: &mut Vec<ConfigDiagnostic>,
1140 policy: &ConfigPolicy,
1141 plugin_config: &Map<String, Json>,
1142 section: &str,
1143 known_fields: &[&str],
1144) {
1145 if let Some(section_json) = plugin_config.get(section).and_then(Json::as_object) {
1146 validate_unknown_fields(
1147 diagnostics,
1148 policy,
1149 Some(section.to_string()),
1150 section_json,
1151 known_fields,
1152 );
1153 }
1154}
1155
1156fn validate_atof_values(
1157 diagnostics: &mut Vec<ConfigDiagnostic>,
1158 policy: &ConfigPolicy,
1159 section: &AtofSectionConfig,
1160) {
1161 if AtofExporterMode::parse(§ion.mode).is_none() {
1162 push_policy_diag(
1163 diagnostics,
1164 policy.unsupported_value,
1165 "observability.unsupported_value",
1166 Some("atof".to_string()),
1167 Some("mode".to_string()),
1168 "ATOF mode must be 'append' or 'overwrite'".to_string(),
1169 );
1170 }
1171}
1172
1173fn validate_atif_values(
1174 diagnostics: &mut Vec<ConfigDiagnostic>,
1175 policy: &ConfigPolicy,
1176 section: &AtifSectionConfig,
1177) {
1178 if !section.filename_template.contains("{session_id}") {
1179 push_policy_diag(
1180 diagnostics,
1181 policy.unsupported_value,
1182 "observability.unsupported_value",
1183 Some("atif".to_string()),
1184 Some("filename_template".to_string()),
1185 "ATIF filename_template must contain '{session_id}'".to_string(),
1186 );
1187 }
1188}
1189
1190fn validate_otlp_values(
1191 diagnostics: &mut Vec<ConfigDiagnostic>,
1192 policy: &ConfigPolicy,
1193 section_name: &str,
1194 section: &OtlpSectionConfig,
1195) {
1196 if !matches!(section.transport.as_str(), "http_binary" | "grpc") {
1197 push_policy_diag(
1198 diagnostics,
1199 policy.unsupported_value,
1200 "observability.unsupported_value",
1201 Some(section_name.to_string()),
1202 Some("transport".to_string()),
1203 format!("{section_name} transport must be 'http_binary' or 'grpc'"),
1204 );
1205 }
1206}
1207
1208fn validate_unknown_fields(
1209 diagnostics: &mut Vec<ConfigDiagnostic>,
1210 policy: &ConfigPolicy,
1211 component: Option<String>,
1212 config: &Map<String, Json>,
1213 known_fields: &[&str],
1214) {
1215 for field in config.keys() {
1216 if !known_fields.contains(&field.as_str()) {
1217 push_policy_diag(
1218 diagnostics,
1219 policy.unknown_field,
1220 "observability.unknown_field",
1221 component.clone(),
1222 Some(field.clone()),
1223 format!(
1224 "field '{}' is not recognized for '{}'",
1225 field,
1226 component.as_deref().unwrap_or("unknown")
1227 ),
1228 );
1229 }
1230 }
1231}
1232
1233fn push_policy_diag(
1234 diagnostics: &mut Vec<ConfigDiagnostic>,
1235 behavior: UnsupportedBehavior,
1236 code: &str,
1237 component: Option<String>,
1238 field: Option<String>,
1239 message: String,
1240) {
1241 let level = match behavior {
1242 UnsupportedBehavior::Ignore => return,
1243 UnsupportedBehavior::Warn => DiagnosticLevel::Warning,
1244 UnsupportedBehavior::Error => DiagnosticLevel::Error,
1245 };
1246 diagnostics.push(ConfigDiagnostic {
1247 level,
1248 code: code.to_string(),
1249 component,
1250 field,
1251 message,
1252 });
1253}
1254
1255fn observability_registration_error(error: impl std::fmt::Display) -> PluginError {
1256 PluginError::RegistrationFailed(error.to_string())
1257}
1258
/// Serde default for `ObservabilityConfig::version`: the only supported schema version.
fn default_observability_config_version() -> u32 {
    const CURRENT_VERSION: u32 = 1;
    CURRENT_VERSION
}

/// Serde default for `AtofSectionConfig::mode`.
fn default_atof_mode() -> String {
    String::from("append")
}

/// Serde default for `AtifSectionConfig::agent_name`.
fn default_agent_name() -> String {
    String::from("NeMo Flow")
}
1270
1271fn default_agent_version() -> String {
1272 env!("CARGO_PKG_VERSION").to_string()
1273}
1274
/// Serde default for `AtifSectionConfig::model_name`.
fn default_model_name() -> String {
    String::from("unknown")
}

/// Serde default for `AtifSectionConfig::filename_template`; contains the required
/// `{session_id}` placeholder.
fn default_atif_filename_template() -> String {
    String::from("nemo-flow-atif-{session_id}.json")
}

/// Serde default for `OtlpSectionConfig::transport`.
fn default_otlp_transport() -> String {
    String::from("http_binary")
}

/// Serde default for `OtlpSectionConfig::service_name`.
fn default_service_name() -> String {
    String::from("nemo-flow")
}

/// Serde default for `OtlpSectionConfig::timeout_millis`: three seconds.
fn default_timeout_millis() -> u64 {
    3 * 1_000
}

/// Fallback ATIF output directory: the current working directory, or `.` if it cannot be read.
fn default_output_directory() -> PathBuf {
    std::env::current_dir().unwrap_or_else(|_| ".".into())
}
1298
// Unit tests are compiled from a separate file under `tests/unit/observability/`; the `#[path]`
// is resolved relative to this source file.
#[cfg(test)]
#[path = "../../tests/unit/observability/plugin_component_tests.rs"]
mod tests;