Skip to main content

nemo_flow/observability/
plugin_component.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Built-in observability plugin component.
5//!
6//! This module packages NeMo Flow's first-party observability exporters behind
7//! the shared plugin configuration system. Each exporter section is opt-in:
8//! omitted sections and sections with `enabled = false` validate but do not
9//! register subscribers or construct exporters.
10//!
11//! The plugin intentionally infers subscriber names from the component namespace
12//! so configuration remains portable across bindings. Agent Trajectory
13//! Observability Format (ATOF), OpenTelemetry, and OpenInference each register
14//! one global subscriber when enabled. Agent Trajectory Interchange Format
15//! (ATIF) uses a global dispatcher that detects direct child agent scopes and
16//! creates one scope-local exporter for each top-level agent run.
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::path::PathBuf;
21use std::pin::Pin;
22use std::sync::{Arc, Mutex};
23use std::time::Duration;
24
25use serde::{Deserialize, Serialize};
26use serde_json::{Map, Value as Json};
27use uuid::Uuid;
28
29use crate::api::event::{Event, ScopeCategory};
30use crate::api::runtime::{EventSubscriberFn, current_scope_stack};
31use crate::api::scope::ScopeType;
32use crate::api::subscriber::{scope_deregister_subscriber, scope_register_subscriber};
33use crate::observability::atif::{AtifAgentInfo, AtifExporter};
34use crate::observability::atof::{
35    AtofExporter, AtofExporterConfig as CoreAtofExporterConfig, AtofExporterMode,
36};
37#[cfg(feature = "openinference")]
38use crate::observability::openinference::{
39    OpenInferenceConfig as CoreOpenInferenceConfig, OpenInferenceSubscriber,
40    OtlpTransport as OpenInferenceTransport,
41};
42#[cfg(feature = "otel")]
43use crate::observability::otel::{
44    OpenTelemetryConfig as CoreOpenTelemetryConfig, OpenTelemetrySubscriber,
45};
46use crate::plugin::{
47    ConfigDiagnostic, ConfigPolicy, DiagnosticLevel, Plugin, PluginComponentSpec, PluginError,
48    PluginRegistration, PluginRegistrationContext, Result as PluginResult, UnsupportedBehavior,
49    deregister_plugin, register_plugin,
50};
51
52/// The plugin kind registered by the core crate.
53pub const OBSERVABILITY_PLUGIN_KIND: &str = "observability";
54
55/// Top-level observability component wrapper.
56///
57/// Use this wrapper when constructing a [`PluginComponentSpec`] from Rust
58/// instead of hand-writing the generic plugin component shape. The component
59/// kind is always [`OBSERVABILITY_PLUGIN_KIND`].
60#[derive(Debug, Clone)]
61pub struct ComponentSpec {
62    /// Whether the observability component should be activated.
63    pub enabled: bool,
64    /// Observability config for this top-level component.
65    pub config: ObservabilityConfig,
66}
67
68impl ComponentSpec {
69    /// Creates an enabled observability component spec.
70    ///
71    /// The returned component can be converted into the generic plugin config
72    /// entry with `PluginComponentSpec::from(...)`.
73    pub fn new(config: ObservabilityConfig) -> Self {
74        Self {
75            enabled: true,
76            config,
77        }
78    }
79}
80
81impl From<ComponentSpec> for PluginComponentSpec {
82    fn from(value: ComponentSpec) -> Self {
83        let Json::Object(config) = serde_json::to_value(value.config)
84            .expect("observability config should serialize to object")
85        else {
86            unreachable!("observability config must serialize to object");
87        };
88
89        PluginComponentSpec {
90            kind: OBSERVABILITY_PLUGIN_KIND.to_string(),
91            enabled: value.enabled,
92            config,
93        }
94    }
95}
96
97/// Canonical config document for the observability plugin component.
98///
99/// Every section is optional. A missing section has the same activation
100/// behavior as a section with `enabled = false`: it contributes no runtime
101/// subscribers and performs no export work.
102#[derive(Debug, Clone, Serialize, Deserialize)]
103#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
104pub struct ObservabilityConfig {
105    /// Observability config schema version.
106    #[serde(default = "default_observability_config_version")]
107    pub version: u32,
108    /// Filesystem-backed raw ATOF JSONL export.
109    #[serde(default, skip_serializing_if = "Option::is_none")]
110    pub atof: Option<AtofSectionConfig>,
111    /// Per-top-level-agent ATIF trajectory export.
112    #[serde(default, skip_serializing_if = "Option::is_none")]
113    pub atif: Option<AtifSectionConfig>,
114    /// OpenTelemetry trace export.
115    #[serde(default, skip_serializing_if = "Option::is_none")]
116    pub opentelemetry: Option<OtlpSectionConfig>,
117    /// OpenInference trace export.
118    #[serde(default, skip_serializing_if = "Option::is_none")]
119    pub openinference: Option<OtlpSectionConfig>,
120    /// Observability-local unsupported-config policy.
121    #[serde(default)]
122    pub policy: ConfigPolicy,
123}
124
125impl Default for ObservabilityConfig {
126    fn default() -> Self {
127        Self {
128            version: default_observability_config_version(),
129            atof: None,
130            atif: None,
131            opentelemetry: None,
132            openinference: None,
133            policy: ConfigPolicy::default(),
134        }
135    }
136}
137
138/// Filesystem-backed ATOF JSONL exporter config.
139///
140/// When enabled, this section wraps
141/// [`crate::observability::atof::AtofExporter`] and writes the raw ATOF event
142/// stream as JSONL. The exporter uses the current working directory and a
143/// timestamped filename when no explicit path settings are supplied.
144#[derive(Debug, Clone, Serialize, Deserialize)]
145#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
146pub struct AtofSectionConfig {
147    /// Whether ATOF JSONL export is active.
148    #[serde(default)]
149    pub enabled: bool,
150    /// Directory containing the JSONL output file.
151    #[serde(default, skip_serializing_if = "Option::is_none")]
152    pub output_directory: Option<PathBuf>,
153    /// Output filename. Defaults to the underlying ATOF exporter timestamped filename.
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    pub filename: Option<String>,
156    /// File open mode: `append` or `overwrite`.
157    #[serde(default = "default_atof_mode")]
158    #[cfg_attr(feature = "schema", schemars(schema_with = "atof_mode_schema"))]
159    pub mode: String,
160}
161
162impl Default for AtofSectionConfig {
163    fn default() -> Self {
164        Self {
165            enabled: false,
166            output_directory: None,
167            filename: None,
168            mode: default_atof_mode(),
169        }
170    }
171}
172
173/// Per-agent ATIF trajectory exporter config.
174///
175/// When enabled, this section creates a dispatcher that opens a separate
176/// [`crate::observability::atif::AtifExporter`] for each top-level agent scope. The `{session_id}`
177/// placeholder in [`AtifSectionConfig::filename_template`] is required so
178/// concurrent sibling agents cannot overwrite each other's trajectory files.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
181pub struct AtifSectionConfig {
182    /// Whether ATIF export is active.
183    #[serde(default)]
184    pub enabled: bool,
185    /// Human-readable agent name.
186    #[serde(default = "default_agent_name")]
187    pub agent_name: String,
188    /// Agent version string.
189    #[serde(default = "default_agent_version")]
190    pub agent_version: String,
191    /// Default model name.
192    #[serde(default = "default_model_name")]
193    pub model_name: String,
194    /// Tool definitions available to the agent.
195    #[serde(default, skip_serializing_if = "Option::is_none")]
196    pub tool_definitions: Option<Vec<Json>>,
197    /// Extra ATIF agent metadata.
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub extra: Option<Json>,
200    /// Directory containing trajectory JSON files.
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    pub output_directory: Option<PathBuf>,
203    /// Filename template. `{session_id}` is replaced with the top-level agent scope UUID.
204    #[serde(default = "default_atif_filename_template")]
205    pub filename_template: String,
206}
207
208impl Default for AtifSectionConfig {
209    fn default() -> Self {
210        Self {
211            enabled: false,
212            agent_name: default_agent_name(),
213            agent_version: default_agent_version(),
214            model_name: default_model_name(),
215            tool_definitions: None,
216            extra: None,
217            output_directory: None,
218            filename_template: default_atif_filename_template(),
219        }
220    }
221}
222
223/// Shared OTLP exporter config for OpenTelemetry and OpenInference.
224///
225/// The `opentelemetry` and `openinference` sections share the same shape but
226/// construct different subscriber implementations. Both sections are disabled
227/// by default and use `http_binary` transport unless configured otherwise.
228#[derive(Debug, Clone, Serialize, Deserialize)]
229#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
230pub struct OtlpSectionConfig {
231    /// Whether the subscriber is active.
232    #[serde(default)]
233    pub enabled: bool,
234    /// OTLP transport: `http_binary` or `grpc`.
235    #[serde(default = "default_otlp_transport")]
236    #[cfg_attr(feature = "schema", schemars(schema_with = "otlp_transport_schema"))]
237    pub transport: String,
238    /// OTLP endpoint.
239    #[serde(default, skip_serializing_if = "Option::is_none")]
240    pub endpoint: Option<String>,
241    /// Extra exporter headers or metadata.
242    #[serde(default)]
243    pub headers: HashMap<String, String>,
244    /// Extra resource attributes.
245    #[serde(default)]
246    pub resource_attributes: HashMap<String, String>,
247    /// `service.name` resource attribute.
248    #[serde(default = "default_service_name")]
249    pub service_name: String,
250    /// Optional `service.namespace` resource attribute.
251    #[serde(default, skip_serializing_if = "Option::is_none")]
252    pub service_namespace: Option<String>,
253    /// Optional `service.version` resource attribute.
254    #[serde(default, skip_serializing_if = "Option::is_none")]
255    pub service_version: Option<String>,
256    /// Instrumentation scope name.
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub instrumentation_scope: Option<String>,
259    /// Export timeout in milliseconds.
260    #[serde(default = "default_timeout_millis")]
261    pub timeout_millis: u64,
262}
263
264impl Default for OtlpSectionConfig {
265    fn default() -> Self {
266        Self {
267            enabled: false,
268            transport: default_otlp_transport(),
269            endpoint: None,
270            headers: HashMap::new(),
271            resource_attributes: HashMap::new(),
272            service_name: default_service_name(),
273            service_namespace: None,
274            service_version: None,
275            instrumentation_scope: None,
276            timeout_millis: default_timeout_millis(),
277        }
278    }
279}
280
// Editor metadata for `ObservabilityConfig`. Each exporter section is marked
// optional and names its own section type as both `nested` and `default`;
// `policy` is a non-optional nested section. `version` has no editor entry.
crate::editor_config! {
    impl ObservabilityConfig {
        atof => {
            label: "ATOF",
            kind: Section,
            optional: true,
            nested: AtofSectionConfig,
            default: AtofSectionConfig,
        },
        atif => {
            label: "ATIF",
            kind: Section,
            optional: true,
            nested: AtifSectionConfig,
            default: AtifSectionConfig,
        },
        opentelemetry => {
            label: "OpenTelemetry",
            kind: Section,
            optional: true,
            nested: OtlpSectionConfig,
            default: OtlpSectionConfig,
        },
        openinference => {
            label: "OpenInference",
            kind: Section,
            optional: true,
            nested: OtlpSectionConfig,
            default: OtlpSectionConfig,
        },
        policy => {
            label: "policy",
            kind: Section,
            nested: ConfigPolicy,
            default: ConfigPolicy,
        },
    }
}

// Editor metadata for `AtofSectionConfig`. The `mode` choices are the same two
// values advertised by `atof_mode_schema` and named in the invalid-mode error
// raised by `register_atof_exporter`.
crate::editor_config! {
    impl AtofSectionConfig {
        enabled => { label: "enabled", kind: Boolean },
        output_directory => { label: "output_directory", kind: String, optional: true },
        filename => { label: "filename", kind: String, optional: true },
        mode => { label: "mode", kind: Enum, values: ["append", "overwrite"] },
    }
}

// Editor metadata for `AtifSectionConfig`. `tool_definitions` and `extra` are
// edited as raw JSON values.
crate::editor_config! {
    impl AtifSectionConfig {
        enabled => { label: "enabled", kind: Boolean },
        agent_name => { label: "agent_name", kind: String },
        agent_version => { label: "agent_version", kind: String },
        model_name => { label: "model_name", kind: String },
        tool_definitions => { label: "tool_definitions", kind: Json, optional: true },
        extra => { label: "extra", kind: Json, optional: true },
        output_directory => { label: "output_directory", kind: String, optional: true },
        filename_template => { label: "filename_template", kind: String },
    }
}

// Editor metadata for `OtlpSectionConfig`, which the `opentelemetry` and
// `openinference` sections share. The `transport` choices are the same two
// values advertised by `otlp_transport_schema`.
crate::editor_config! {
    impl OtlpSectionConfig {
        enabled => { label: "enabled", kind: Boolean },
        transport => { label: "transport", kind: Enum, values: ["http_binary", "grpc"] },
        endpoint => { label: "endpoint", kind: String, optional: true },
        headers => { label: "headers", kind: StringMap },
        resource_attributes => { label: "resource_attributes", kind: StringMap },
        service_name => { label: "service_name", kind: String },
        service_namespace => { label: "service_namespace", kind: String, optional: true },
        service_version => { label: "service_version", kind: String, optional: true },
        instrumentation_scope => { label: "instrumentation_scope", kind: String, optional: true },
        timeout_millis => { label: "timeout_millis", kind: Integer },
    }
}
356
357struct ObservabilityPlugin;
358
359impl Plugin for ObservabilityPlugin {
360    fn plugin_kind(&self) -> &str {
361        OBSERVABILITY_PLUGIN_KIND
362    }
363
364    fn allows_multiple_components(&self) -> bool {
365        false
366    }
367
368    fn validate(&self, plugin_config: &Map<String, Json>) -> Vec<ConfigDiagnostic> {
369        validate_observability_plugin_config(plugin_config)
370    }
371
372    fn register<'a>(
373        &'a self,
374        plugin_config: &Map<String, Json>,
375        ctx: &'a mut PluginRegistrationContext,
376    ) -> Pin<Box<dyn Future<Output = PluginResult<()>> + Send + 'a>> {
377        let plugin_config = plugin_config.clone();
378        Box::pin(async move {
379            let config = parse_observability_config(&plugin_config)?;
380            register_observability(config, ctx)
381        })
382    }
383}
384
385/// Registers the observability component kind in the core plugin registry.
386///
387/// Calling this function more than once is safe. The core plugin APIs call it
388/// automatically before listing, looking up, validating, or initializing plugin
389/// components, so applications normally do not need to invoke it directly.
390pub fn register_observability_component() -> PluginResult<()> {
391    match register_plugin(Arc::new(ObservabilityPlugin)) {
392        Ok(()) => Ok(()),
393        Err(PluginError::RegistrationFailed(message)) if message.contains("already registered") => {
394            Ok(())
395        }
396        Err(err) => Err(err),
397    }
398}
399
400/// Deregisters the observability component kind from the core plugin registry.
401///
402/// This helper exists primarily for tests and specialized embedding scenarios.
403/// It removes the plugin kind from future registry lookups but does not clear an
404/// already active plugin configuration.
405pub fn deregister_observability_component() -> bool {
406    deregister_plugin(OBSERVABILITY_PLUGIN_KIND)
407}
408
/// Returns the JSON Schema for the observability component configuration.
///
/// # Panics
///
/// Panics if the generated root schema cannot be converted to JSON, which
/// would indicate a bug in schema derivation rather than bad input.
#[cfg(feature = "schema")]
pub fn observability_config_schema() -> serde_json::Value {
    let root = schemars::schema_for!(ObservabilityConfig);
    serde_json::to_value(root).expect("observability config schema should serialize")
}

/// Schema for [`AtofSectionConfig::mode`]: a string restricted to the
/// supported file open modes, defaulting to `append`.
#[cfg(feature = "schema")]
fn atof_mode_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
    const MODES: [&str; 2] = ["append", "overwrite"];
    string_enum_schema(generator, &MODES, Some("append"))
}

/// Schema for [`OtlpSectionConfig::transport`]: a string restricted to the
/// supported OTLP transports, defaulting to `http_binary`.
#[cfg(feature = "schema")]
fn otlp_transport_schema(
    generator: &mut schemars::r#gen::SchemaGenerator,
) -> schemars::schema::Schema {
    const TRANSPORTS: [&str; 2] = ["http_binary", "grpc"];
    string_enum_schema(generator, &TRANSPORTS, Some("http_binary"))
}

/// Builds a string schema whose value must be one of `values`.
///
/// A provided `default` is recorded in the schema metadata. When `default` is
/// `None`, the metadata is left untouched rather than created empty.
#[cfg(feature = "schema")]
fn string_enum_schema(
    generator: &mut schemars::r#gen::SchemaGenerator,
    values: &[&str],
    default: Option<&str>,
) -> schemars::schema::Schema {
    let base = <String as schemars::JsonSchema>::json_schema(generator);
    let mut object: schemars::schema::SchemaObject = base.into();
    let allowed: Vec<Json> = values.iter().copied().map(Json::from).collect();
    object.enum_values = Some(allowed);
    if let Some(default_value) = default {
        object.metadata().default = Some(Json::from(default_value));
    }
    object.into()
}
447
448fn register_observability(
449    config: ObservabilityConfig,
450    ctx: &mut PluginRegistrationContext,
451) -> PluginResult<()> {
452    if let Some(atof) = config.atof.filter(|section| section.enabled) {
453        register_atof_exporter(atof, ctx)?;
454    }
455    if let Some(atif) = config.atif.filter(|section| section.enabled) {
456        register_atif_dispatcher(atif, ctx)?;
457    }
458    if let Some(otel) = config.opentelemetry.filter(|section| section.enabled) {
459        register_opentelemetry(otel, ctx)?;
460    }
461    if let Some(openinference) = config.openinference.filter(|section| section.enabled) {
462        register_openinference(openinference, ctx)?;
463    }
464    Ok(())
465}
466
467fn register_atof_exporter(
468    section: AtofSectionConfig,
469    ctx: &mut PluginRegistrationContext,
470) -> PluginResult<()> {
471    let mode = AtofExporterMode::parse(&section.mode).ok_or_else(|| {
472        PluginError::InvalidConfig("ATOF mode must be 'append' or 'overwrite'".to_string())
473    })?;
474    let mut config = CoreAtofExporterConfig::new().with_mode(mode);
475    if let Some(output_directory) = section.output_directory {
476        config = config.with_output_directory(output_directory);
477    }
478    if let Some(filename) = section.filename {
479        config = config.with_filename(filename);
480    }
481
482    let exporter = Arc::new(AtofExporter::new(config).map_err(observability_registration_error)?);
483    ctx.register_subscriber("atof", exporter.subscriber())?;
484    ctx.add_registration(PluginRegistration::new(
485        "observability",
486        ctx.qualify_name("atof.shutdown"),
487        Box::new(move || {
488            exporter
489                .shutdown()
490                .map_err(observability_registration_error)
491        }),
492    ));
493    Ok(())
494}
495
496fn register_atif_dispatcher(
497    section: AtifSectionConfig,
498    ctx: &mut PluginRegistrationContext,
499) -> PluginResult<()> {
500    if !section.filename_template.contains("{session_id}") {
501        return Err(PluginError::InvalidConfig(
502            "ATIF filename_template must contain '{session_id}'".to_string(),
503        ));
504    }
505
506    let manager = Arc::new(Mutex::new(AtifDispatcher::new(section)));
507    let dispatcher = atif_dispatcher_subscriber(Arc::clone(&manager), ctx.qualify_name("atif-"));
508    ctx.register_subscriber("atif", dispatcher)?;
509    ctx.add_registration(PluginRegistration::new(
510        "observability",
511        ctx.qualify_name("atif.shutdown"),
512        Box::new(move || {
513            let work = {
514                let mut guard = manager.lock().map_err(|err| {
515                    PluginError::Internal(format!("ATIF dispatcher lock poisoned: {err}"))
516                })?;
517                guard
518                    .flush_open_agents()
519                    .map_err(observability_registration_error)?
520            };
521            for (scope_uuid, name) in work.scope_subscribers {
522                let _ = scope_deregister_subscriber(&scope_uuid, &name);
523            }
524            for write in work.writes {
525                let agent_uuid = write.agent_uuid;
526                let result = write_atif_file(&write);
527                let mut guard = manager.lock().map_err(|err| {
528                    PluginError::Internal(format!("ATIF dispatcher lock poisoned: {err}"))
529                })?;
530                guard
531                    .finish_agent_write(agent_uuid, result)
532                    .map_err(observability_registration_error)?;
533            }
534            let guard = manager.lock().map_err(|err| {
535                PluginError::Internal(format!("ATIF dispatcher lock poisoned: {err}"))
536            })?;
537            guard
538                .last_error_result()
539                .map_err(observability_registration_error)
540        }),
541    ));
542    Ok(())
543}
544
545#[cfg(feature = "otel")]
546fn register_opentelemetry(
547    section: OtlpSectionConfig,
548    ctx: &mut PluginRegistrationContext,
549) -> PluginResult<()> {
550    let subscriber = Arc::new(
551        OpenTelemetrySubscriber::new(build_otel_config(section)?)
552            .map_err(observability_registration_error)?,
553    );
554    ctx.register_subscriber("opentelemetry", subscriber.subscriber())?;
555    ctx.add_registration(PluginRegistration::new(
556        "observability",
557        ctx.qualify_name("opentelemetry.shutdown"),
558        Box::new(move || {
559            subscriber
560                .shutdown()
561                .map_err(observability_registration_error)
562        }),
563    ));
564    Ok(())
565}
566
567#[cfg(not(feature = "otel"))]
568fn register_opentelemetry(
569    _section: OtlpSectionConfig,
570    _ctx: &mut PluginRegistrationContext,
571) -> PluginResult<()> {
572    Err(PluginError::InvalidConfig(
573        "OpenTelemetry support is not enabled in this build".to_string(),
574    ))
575}
576
577#[cfg(feature = "openinference")]
578fn register_openinference(
579    section: OtlpSectionConfig,
580    ctx: &mut PluginRegistrationContext,
581) -> PluginResult<()> {
582    let subscriber = Arc::new(
583        OpenInferenceSubscriber::new(build_openinference_config(section)?)
584            .map_err(observability_registration_error)?,
585    );
586    ctx.register_subscriber("openinference", subscriber.subscriber())?;
587    ctx.add_registration(PluginRegistration::new(
588        "observability",
589        ctx.qualify_name("openinference.shutdown"),
590        Box::new(move || {
591            subscriber
592                .shutdown()
593                .map_err(observability_registration_error)
594        }),
595    ));
596    Ok(())
597}
598
599#[cfg(not(feature = "openinference"))]
600fn register_openinference(
601    _section: OtlpSectionConfig,
602    _ctx: &mut PluginRegistrationContext,
603) -> PluginResult<()> {
604    Err(PluginError::InvalidConfig(
605        "OpenInference support is not enabled in this build".to_string(),
606    ))
607}
608
609struct AtifDispatcher {
610    config: AtifSectionConfig,
611    agents: HashMap<Uuid, ManagedAtifExporter>,
612    scope_subscribers: HashMap<Uuid, String>,
613    last_error: Option<String>,
614}
615
616struct ManagedAtifExporter {
617    exporter: AtifExporter,
618    path: PathBuf,
619    observed_events: Vec<Event>,
620    written: bool,
621}
622
623struct PendingAtifWrite {
624    agent_uuid: Uuid,
625    path: PathBuf,
626    payload: Vec<u8>,
627}
628
629struct AtifFlushWork {
630    writes: Vec<PendingAtifWrite>,
631    scope_subscribers: Vec<(Uuid, String)>,
632}
633
634impl AtifDispatcher {
635    fn new(config: AtifSectionConfig) -> Self {
636        Self {
637            config,
638            agents: HashMap::new(),
639            scope_subscribers: HashMap::new(),
640            last_error: None,
641        }
642    }
643
644    fn observe_global(&mut self, event: &Event, subscriber_prefix: &str, state: Arc<Mutex<Self>>) {
645        if self.last_error.is_some() || !is_top_level_agent_start(event) {
646            return;
647        }
648
649        // The top-level agent scope UUID is the ATIF session ID. The global
650        // dispatcher records the start event itself because the scope-local
651        // subscriber is attached after that start event has already been
652        // emitted.
653        let session_id = event.uuid().to_string();
654        let exporter = AtifExporter::new(session_id.clone(), self.agent_info());
655        (exporter.subscriber())(event);
656        let path = self.output_path(&session_id);
657        self.agents.insert(
658            event.uuid(),
659            ManagedAtifExporter {
660                exporter,
661                path,
662                observed_events: vec![event.clone()],
663                written: false,
664            },
665        );
666
667        let agent_uuid = event.uuid();
668        let name = format!("{subscriber_prefix}{agent_uuid}");
669        let callback = atif_scope_subscriber(state, agent_uuid);
670        // Attach the per-agent subscriber to the agent scope rather than the
671        // global registry so sibling top-level agents never share events.
672        if let Err(err) = scope_register_subscriber(&agent_uuid, &name, callback) {
673            self.last_error = Some(format!("failed to register ATIF scope subscriber: {err}"));
674        } else {
675            self.scope_subscribers.insert(agent_uuid, name);
676        }
677    }
678
679    fn observe_scope(&mut self, event: &Event, agent_uuid: Uuid) -> Option<PendingAtifWrite> {
680        if self.last_error.is_some() {
681            return None;
682        }
683        let should_finalize =
684            event.uuid() == agent_uuid && event.scope_category() == Some(ScopeCategory::End);
685        let agent = self.agents.get_mut(&agent_uuid)?;
686        (agent.exporter.subscriber())(event);
687        agent.observed_events.push(event.clone());
688        if !should_finalize || agent.written {
689            return None;
690        }
691        match prepare_atif_file(agent_uuid, agent) {
692            Ok(write) => Some(write),
693            Err(err) => {
694                self.last_error = Some(err.to_string());
695                None
696            }
697        }
698    }
699
700    fn complete_scope_write(
701        &mut self,
702        agent_uuid: Uuid,
703        result: std::io::Result<()>,
704    ) -> Option<(Uuid, String)> {
705        if self.finish_agent_write(agent_uuid, result).is_err() {
706            return None;
707        }
708        self.agents.remove(&agent_uuid);
709        self.scope_subscribers
710            .remove(&agent_uuid)
711            .map(|name| (agent_uuid, name))
712    }
713
714    fn flush_open_agents(&mut self) -> std::io::Result<AtifFlushWork> {
715        // Plugin teardown may run before an agent scope closes. Remove dynamic
716        // scope-local subscribers first so the later scope end event cannot
717        // trigger a second write after the dispatcher has flushed.
718        let scope_subscribers = std::mem::take(&mut self.scope_subscribers)
719            .into_iter()
720            .collect();
721        let agent_uuids = self
722            .agents
723            .iter()
724            .filter_map(|(agent_uuid, agent)| (!agent.written).then_some(*agent_uuid))
725            .collect::<Vec<_>>();
726        let mut writes = Vec::with_capacity(agent_uuids.len());
727        for agent_uuid in agent_uuids {
728            if let Some(agent) = self.agents.get_mut(&agent_uuid) {
729                writes.push(prepare_atif_file(agent_uuid, agent)?);
730            }
731        }
732        Ok(AtifFlushWork {
733            writes,
734            scope_subscribers,
735        })
736    }
737
738    fn finish_agent_write(
739        &mut self,
740        agent_uuid: Uuid,
741        result: std::io::Result<()>,
742    ) -> std::io::Result<()> {
743        match result {
744            Ok(()) => {
745                if let Some(agent) = self.agents.get_mut(&agent_uuid) {
746                    agent.observed_events.clear();
747                }
748                Ok(())
749            }
750            Err(err) => {
751                if let Some(agent) = self.agents.get_mut(&agent_uuid) {
752                    agent.written = false;
753                }
754                self.last_error = Some(err.to_string());
755                Err(err)
756            }
757        }
758    }
759
760    fn last_error_result(&self) -> std::io::Result<()> {
761        if let Some(message) = &self.last_error {
762            return Err(std::io::Error::other(message.clone()));
763        }
764        Ok(())
765    }
766
767    fn agent_info(&self) -> AtifAgentInfo {
768        AtifAgentInfo {
769            name: self.config.agent_name.clone(),
770            version: self.config.agent_version.clone(),
771            model_name: Some(self.config.model_name.clone()),
772            tool_definitions: self.config.tool_definitions.clone(),
773            extra: self.config.extra.clone(),
774        }
775    }
776
777    fn output_path(&self, session_id: &str) -> PathBuf {
778        let directory = self
779            .config
780            .output_directory
781            .clone()
782            .unwrap_or_else(default_output_directory);
783        let filename = self
784            .config
785            .filename_template
786            .replace("{session_id}", session_id);
787        directory.join(filename)
788    }
789}
790
791fn atif_dispatcher_subscriber(
792    manager: Arc<Mutex<AtifDispatcher>>,
793    subscriber_prefix: String,
794) -> EventSubscriberFn {
795    Arc::new(move |event: &Event| {
796        let Ok(mut guard) = manager.lock() else {
797            return;
798        };
799        guard.observe_global(event, &subscriber_prefix, Arc::clone(&manager));
800    })
801}
802
803fn atif_scope_subscriber(
804    manager: Arc<Mutex<AtifDispatcher>>,
805    agent_uuid: Uuid,
806) -> EventSubscriberFn {
807    Arc::new(move |event: &Event| {
808        let pending_write = {
809            let Ok(mut guard) = manager.lock() else {
810                return;
811            };
812            guard.observe_scope(event, agent_uuid)
813        };
814        let Some(write) = pending_write else {
815            return;
816        };
817        let result = write_atif_file(&write);
818        let scope_subscriber = {
819            let Ok(mut guard) = manager.lock() else {
820                return;
821            };
822            guard.complete_scope_write(write.agent_uuid, result)
823        };
824        if let Some((scope_uuid, name)) = scope_subscriber {
825            let _ = scope_deregister_subscriber(&scope_uuid, &name);
826        }
827    })
828}
829
830fn prepare_atif_file(
831    agent_uuid: Uuid,
832    agent: &mut ManagedAtifExporter,
833) -> std::io::Result<PendingAtifWrite> {
834    let trajectory = agent.exporter.export();
835    let mut value = serde_json::to_value(trajectory)?;
836    if let Some(object) = value.as_object_mut() {
837        object.insert(
838            "extra".to_string(),
839            serde_json::json!({
840                "observed_events": agent.observed_events,
841            }),
842        );
843    }
844    let payload = serde_json::to_vec_pretty(&value)?;
845    agent.written = true;
846    Ok(PendingAtifWrite {
847        agent_uuid,
848        path: agent.path.clone(),
849        payload,
850    })
851}
852
853fn write_atif_file(write: &PendingAtifWrite) -> std::io::Result<()> {
854    if let Some(parent) = write.path.parent() {
855        std::fs::create_dir_all(parent)?;
856    }
857    std::fs::write(&write.path, &write.payload)?;
858    Ok(())
859}
860
861fn is_top_level_agent_start(event: &Event) -> bool {
862    if event.scope_category() != Some(ScopeCategory::Start)
863        || event.scope_type() != Some(ScopeType::Agent)
864    {
865        return false;
866    }
867    let Some(parent_uuid) = event.parent_uuid() else {
868        return false;
869    };
870    current_scope_stack()
871        .read()
872        .map(|stack| stack.root_uuid() == parent_uuid)
873        .unwrap_or(false)
874}
875
/// Build the core OpenTelemetry exporter configuration from an
/// `opentelemetry` plugin section.
///
/// `transport` selects the constructor (`"http_binary"` or `"grpc"`), and the
/// timeout is applied next. After that, each optional field that is present is
/// applied, followed by every header and every resource attribute.
///
/// # Errors
///
/// Returns [`PluginError::InvalidConfig`] for any other transport name.
#[cfg(feature = "otel")]
fn build_otel_config(section: OtlpSectionConfig) -> PluginResult<CoreOpenTelemetryConfig> {
    let OtlpSectionConfig {
        transport,
        endpoint,
        headers,
        resource_attributes,
        service_name,
        service_namespace,
        service_version,
        instrumentation_scope,
        timeout_millis,
        ..
    } = section;

    let base = match transport.as_str() {
        "http_binary" => CoreOpenTelemetryConfig::http_binary(service_name),
        "grpc" => CoreOpenTelemetryConfig::grpc(service_name),
        other => {
            return Err(PluginError::InvalidConfig(format!(
                "OpenTelemetry transport must be 'http_binary' or 'grpc', got {other:?}"
            )));
        }
    };

    let mut config = base.with_timeout(Duration::from_millis(timeout_millis));
    if let Some(endpoint) = endpoint {
        config = config.with_endpoint(endpoint);
    }
    if let Some(namespace) = service_namespace {
        config = config.with_service_namespace(namespace);
    }
    if let Some(version) = service_version {
        config = config.with_service_version(version);
    }
    if let Some(scope) = instrumentation_scope {
        config = config.with_instrumentation_scope(scope);
    }
    let config = headers
        .into_iter()
        .fold(config, |acc, (key, value)| acc.with_header(key, value));
    let config = resource_attributes
        .into_iter()
        .fold(config, |acc, (key, value)| acc.with_resource_attribute(key, value));
    Ok(config)
}
909
/// Build the core OpenInference exporter configuration from an
/// `openinference` plugin section.
///
/// The transport name is mapped to [`OpenInferenceTransport`] first. The base
/// config then gets the transport, service name, and timeout. After that, each
/// optional field that is present is applied, followed by every header and
/// every resource attribute.
///
/// # Errors
///
/// Returns [`PluginError::InvalidConfig`] when `transport` is neither
/// `"http_binary"` nor `"grpc"`.
#[cfg(feature = "openinference")]
fn build_openinference_config(section: OtlpSectionConfig) -> PluginResult<CoreOpenInferenceConfig> {
    let OtlpSectionConfig {
        transport: transport_name,
        endpoint,
        headers,
        resource_attributes,
        service_name,
        service_namespace,
        service_version,
        instrumentation_scope,
        timeout_millis,
        ..
    } = section;

    let transport = match transport_name.as_str() {
        "http_binary" => OpenInferenceTransport::HttpBinary,
        "grpc" => OpenInferenceTransport::Grpc,
        other => {
            return Err(PluginError::InvalidConfig(format!(
                "OpenInference transport must be 'http_binary' or 'grpc', got {other:?}"
            )));
        }
    };

    let mut config = CoreOpenInferenceConfig::new()
        .with_transport(transport)
        .with_service_name(service_name)
        .with_timeout(Duration::from_millis(timeout_millis));
    if let Some(endpoint) = endpoint {
        config = config.with_endpoint(endpoint);
    }
    if let Some(namespace) = service_namespace {
        config = config.with_service_namespace(namespace);
    }
    if let Some(version) = service_version {
        config = config.with_service_version(version);
    }
    if let Some(scope) = instrumentation_scope {
        config = config.with_instrumentation_scope(scope);
    }
    let config = headers
        .into_iter()
        .fold(config, |acc, (key, value)| acc.with_header(key, value));
    let config = resource_attributes
        .into_iter()
        .fold(config, |acc, (key, value)| acc.with_resource_attribute(key, value));
    Ok(config)
}
946
947fn parse_observability_config(
948    plugin_config: &Map<String, Json>,
949) -> PluginResult<ObservabilityConfig> {
950    serde_json::from_value(Json::Object(plugin_config.clone())).map_err(|err| {
951        PluginError::InvalidConfig(format!("invalid observability plugin config: {err}"))
952    })
953}
954
955fn validate_observability_plugin_config(
956    plugin_config: &Map<String, Json>,
957) -> Vec<ConfigDiagnostic> {
958    let config = match parse_observability_config(plugin_config) {
959        Ok(config) => config,
960        Err(err) => {
961            return vec![ConfigDiagnostic {
962                level: DiagnosticLevel::Error,
963                code: "observability.invalid_plugin_config".to_string(),
964                component: Some(OBSERVABILITY_PLUGIN_KIND.to_string()),
965                field: None,
966                message: err.to_string(),
967            }];
968        }
969    };
970
971    let mut diagnostics = vec![];
972    validate_unknown_fields(
973        &mut diagnostics,
974        &config.policy,
975        Some(OBSERVABILITY_PLUGIN_KIND.to_string()),
976        plugin_config,
977        &[
978            "version",
979            "atof",
980            "atif",
981            "opentelemetry",
982            "openinference",
983            "policy",
984        ],
985    );
986
987    validate_version(&mut diagnostics, &config.policy, config.version);
988    validate_policy_fields(&mut diagnostics, &config.policy, plugin_config);
989    validate_section_fields(
990        &mut diagnostics,
991        &config.policy,
992        plugin_config,
993        "atof",
994        &["enabled", "output_directory", "filename", "mode"],
995    );
996    validate_section_fields(
997        &mut diagnostics,
998        &config.policy,
999        plugin_config,
1000        "atif",
1001        &[
1002            "enabled",
1003            "agent_name",
1004            "agent_version",
1005            "model_name",
1006            "tool_definitions",
1007            "extra",
1008            "output_directory",
1009            "filename_template",
1010        ],
1011    );
1012    validate_section_fields(
1013        &mut diagnostics,
1014        &config.policy,
1015        plugin_config,
1016        "opentelemetry",
1017        &[
1018            "enabled",
1019            "transport",
1020            "endpoint",
1021            "headers",
1022            "resource_attributes",
1023            "service_name",
1024            "service_namespace",
1025            "service_version",
1026            "instrumentation_scope",
1027            "timeout_millis",
1028        ],
1029    );
1030    validate_section_fields(
1031        &mut diagnostics,
1032        &config.policy,
1033        plugin_config,
1034        "openinference",
1035        &[
1036            "enabled",
1037            "transport",
1038            "endpoint",
1039            "headers",
1040            "resource_attributes",
1041            "service_name",
1042            "service_namespace",
1043            "service_version",
1044            "instrumentation_scope",
1045            "timeout_millis",
1046        ],
1047    );
1048
1049    if let Some(section) = &config.atof {
1050        validate_atof_values(&mut diagnostics, &config.policy, section);
1051        #[cfg(target_arch = "wasm32")]
1052        if section.enabled {
1053            push_policy_diag(
1054                &mut diagnostics,
1055                config.policy.unsupported_value,
1056                "observability.unsupported_value",
1057                Some("atof".to_string()),
1058                Some("enabled".to_string()),
1059                "ATOF file export is not supported on WebAssembly".to_string(),
1060            );
1061        }
1062    }
1063    if let Some(section) = &config.atif {
1064        validate_atif_values(&mut diagnostics, &config.policy, section);
1065        #[cfg(target_arch = "wasm32")]
1066        if section.enabled {
1067            push_policy_diag(
1068                &mut diagnostics,
1069                config.policy.unsupported_value,
1070                "observability.unsupported_value",
1071                Some("atif".to_string()),
1072                Some("enabled".to_string()),
1073                "ATIF file export is not supported on WebAssembly".to_string(),
1074            );
1075        }
1076    }
1077    if let Some(section) = &config.opentelemetry {
1078        validate_otlp_values(&mut diagnostics, &config.policy, "opentelemetry", section);
1079        #[cfg(not(feature = "otel"))]
1080        if section.enabled {
1081            push_policy_diag(
1082                &mut diagnostics,
1083                config.policy.unsupported_value,
1084                "observability.feature_disabled",
1085                Some("opentelemetry".to_string()),
1086                Some("enabled".to_string()),
1087                "OpenTelemetry support is not enabled in this build".to_string(),
1088            );
1089        }
1090    }
1091    if let Some(section) = &config.openinference {
1092        validate_otlp_values(&mut diagnostics, &config.policy, "openinference", section);
1093        #[cfg(not(feature = "openinference"))]
1094        if section.enabled {
1095            push_policy_diag(
1096                &mut diagnostics,
1097                config.policy.unsupported_value,
1098                "observability.feature_disabled",
1099                Some("openinference".to_string()),
1100                Some("enabled".to_string()),
1101                "OpenInference support is not enabled in this build".to_string(),
1102            );
1103        }
1104    }
1105
1106    diagnostics
1107}
1108
1109fn validate_version(diagnostics: &mut Vec<ConfigDiagnostic>, policy: &ConfigPolicy, version: u32) {
1110    if version != 1 {
1111        push_policy_diag(
1112            diagnostics,
1113            policy.unsupported_value,
1114            "observability.unsupported_config_version",
1115            Some(OBSERVABILITY_PLUGIN_KIND.to_string()),
1116            Some("version".to_string()),
1117            format!("observability config version {version} is unsupported"),
1118        );
1119    }
1120}
1121
1122fn validate_policy_fields(
1123    diagnostics: &mut Vec<ConfigDiagnostic>,
1124    policy: &ConfigPolicy,
1125    plugin_config: &Map<String, Json>,
1126) {
1127    if let Some(policy_json) = plugin_config.get("policy").and_then(Json::as_object) {
1128        validate_unknown_fields(
1129            diagnostics,
1130            policy,
1131            Some("policy".to_string()),
1132            policy_json,
1133            &["unknown_component", "unknown_field", "unsupported_value"],
1134        );
1135    }
1136}
1137
1138fn validate_section_fields(
1139    diagnostics: &mut Vec<ConfigDiagnostic>,
1140    policy: &ConfigPolicy,
1141    plugin_config: &Map<String, Json>,
1142    section: &str,
1143    known_fields: &[&str],
1144) {
1145    if let Some(section_json) = plugin_config.get(section).and_then(Json::as_object) {
1146        validate_unknown_fields(
1147            diagnostics,
1148            policy,
1149            Some(section.to_string()),
1150            section_json,
1151            known_fields,
1152        );
1153    }
1154}
1155
1156fn validate_atof_values(
1157    diagnostics: &mut Vec<ConfigDiagnostic>,
1158    policy: &ConfigPolicy,
1159    section: &AtofSectionConfig,
1160) {
1161    if AtofExporterMode::parse(&section.mode).is_none() {
1162        push_policy_diag(
1163            diagnostics,
1164            policy.unsupported_value,
1165            "observability.unsupported_value",
1166            Some("atof".to_string()),
1167            Some("mode".to_string()),
1168            "ATOF mode must be 'append' or 'overwrite'".to_string(),
1169        );
1170    }
1171}
1172
1173fn validate_atif_values(
1174    diagnostics: &mut Vec<ConfigDiagnostic>,
1175    policy: &ConfigPolicy,
1176    section: &AtifSectionConfig,
1177) {
1178    if !section.filename_template.contains("{session_id}") {
1179        push_policy_diag(
1180            diagnostics,
1181            policy.unsupported_value,
1182            "observability.unsupported_value",
1183            Some("atif".to_string()),
1184            Some("filename_template".to_string()),
1185            "ATIF filename_template must contain '{session_id}'".to_string(),
1186        );
1187    }
1188}
1189
1190fn validate_otlp_values(
1191    diagnostics: &mut Vec<ConfigDiagnostic>,
1192    policy: &ConfigPolicy,
1193    section_name: &str,
1194    section: &OtlpSectionConfig,
1195) {
1196    if !matches!(section.transport.as_str(), "http_binary" | "grpc") {
1197        push_policy_diag(
1198            diagnostics,
1199            policy.unsupported_value,
1200            "observability.unsupported_value",
1201            Some(section_name.to_string()),
1202            Some("transport".to_string()),
1203            format!("{section_name} transport must be 'http_binary' or 'grpc'"),
1204        );
1205    }
1206}
1207
1208fn validate_unknown_fields(
1209    diagnostics: &mut Vec<ConfigDiagnostic>,
1210    policy: &ConfigPolicy,
1211    component: Option<String>,
1212    config: &Map<String, Json>,
1213    known_fields: &[&str],
1214) {
1215    for field in config.keys() {
1216        if !known_fields.contains(&field.as_str()) {
1217            push_policy_diag(
1218                diagnostics,
1219                policy.unknown_field,
1220                "observability.unknown_field",
1221                component.clone(),
1222                Some(field.clone()),
1223                format!(
1224                    "field '{}' is not recognized for '{}'",
1225                    field,
1226                    component.as_deref().unwrap_or("unknown")
1227                ),
1228            );
1229        }
1230    }
1231}
1232
1233fn push_policy_diag(
1234    diagnostics: &mut Vec<ConfigDiagnostic>,
1235    behavior: UnsupportedBehavior,
1236    code: &str,
1237    component: Option<String>,
1238    field: Option<String>,
1239    message: String,
1240) {
1241    let level = match behavior {
1242        UnsupportedBehavior::Ignore => return,
1243        UnsupportedBehavior::Warn => DiagnosticLevel::Warning,
1244        UnsupportedBehavior::Error => DiagnosticLevel::Error,
1245    };
1246    diagnostics.push(ConfigDiagnostic {
1247        level,
1248        code: code.to_string(),
1249        component,
1250        field,
1251        message,
1252    });
1253}
1254
1255fn observability_registration_error(error: impl std::fmt::Display) -> PluginError {
1256    PluginError::RegistrationFailed(error.to_string())
1257}
1258
/// Default observability config version.
///
/// This is `1`, the only version `validate_version` accepts.
fn default_observability_config_version() -> u32 {
    const CURRENT_VERSION: u32 = 1;
    CURRENT_VERSION
}
1262
/// Default ATOF write mode: `"append"`.
fn default_atof_mode() -> String {
    String::from("append")
}
1266
/// Default ATIF agent name: `"NeMo Flow"`.
fn default_agent_name() -> String {
    "NeMo Flow".to_owned()
}
1270
1271fn default_agent_version() -> String {
1272    env!("CARGO_PKG_VERSION").to_string()
1273}
1274
/// Default ATIF model name placeholder: `"unknown"`.
fn default_model_name() -> String {
    String::from("unknown")
}
1278
/// Default ATIF filename template.
///
/// It contains the `{session_id}` placeholder that `validate_atif_values`
/// requires.
fn default_atif_filename_template() -> String {
    String::from("nemo-flow-atif-{session_id}.json")
}
1282
/// Default OTLP transport: `"http_binary"`.
fn default_otlp_transport() -> String {
    "http_binary".to_owned()
}
1286
/// Default OTLP service name: `"nemo-flow"`.
fn default_service_name() -> String {
    String::from("nemo-flow")
}
1290
/// Default OTLP export timeout in milliseconds (three seconds).
fn default_timeout_millis() -> u64 {
    const THREE_SECONDS_IN_MILLIS: u64 = 3_000;
    THREE_SECONDS_IN_MILLIS
}
1294
/// Default directory for file exports.
///
/// This is the process's current working directory, or `.` when it cannot be
/// determined.
fn default_output_directory() -> PathBuf {
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => PathBuf::from("."),
    }
}
1298
// Unit tests for this module live out of line, in the file named by the
// `#[path]` attribute, and are compiled only for test builds.
#[cfg(test)]
#[path = "../../tests/unit/observability/plugin_component_tests.rs"]
mod tests;