Skip to main content

nemo_flow/observability/
atof.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Agent Trajectory Observability Format (ATOF) JSONL exporter support for NeMo
5//! Flow.
6//!
//! The [`AtofExporter`] exposes an event subscriber (which it can also register
//! globally by name) that writes each observed NeMo Flow event, in Agent
//! Trajectory Observability Format (ATOF), as one JSON object per JSONL line.
10
11use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15
16use chrono::Utc;
17
18use crate::api::event::Event;
19use crate::api::runtime::EventSubscriberFn;
20use crate::api::subscriber::{deregister_subscriber, register_subscriber};
21use crate::error::FlowError;
22
23/// Result type for the ATOF JSONL exporter.
24pub type Result<T> = std::result::Result<T, AtofExporterError>;
25
/// Errors produced while configuring or operating the ATOF JSONL exporter.
#[derive(Debug, thiserror::Error)]
pub enum AtofExporterError {
    /// Failed to resolve the current working directory for default config.
    ///
    /// NOTE(review): nothing in this file constructs this variant;
    /// `AtofExporterConfig::default` falls back to `"."` instead of failing.
    #[error("failed to resolve current working directory: {0}")]
    CurrentDirectory(std::io::Error),
    /// Failed to open the output file when constructing an `AtofExporter`.
    #[error("failed to open ATOF output file {path:?}: {source}")]
    OpenFile {
        /// Output path that failed to open.
        path: PathBuf,
        /// Underlying I/O error.
        source: std::io::Error,
    },
    /// Flushing the buffered writer failed in `AtofExporter::force_flush`.
    #[error("failed to flush ATOF output file {path:?}: {source}")]
    Flush {
        /// Output path that failed to flush.
        path: PathBuf,
        /// Underlying I/O error.
        source: std::io::Error,
    },
    /// A subscriber recorded an earlier write or serialization error.
    ///
    /// Reported by `AtofExporter::force_flush` once the flush itself succeeds.
    #[error("previous ATOF export failed for {path:?}: {message}")]
    StoredFailure {
        /// Output path associated with the failure.
        path: PathBuf,
        /// Stored failure message (the `Display` text of the original error).
        message: String,
    },
    /// The mutex guarding the exporter state was poisoned (a thread panicked
    /// while holding it).
    #[error("the ATOF exporter state lock was poisoned")]
    LockPoisoned,
    /// Error from the NeMo Flow runtime, e.g. while registering or
    /// deregistering the subscriber; `Display` and `source` are forwarded
    /// unchanged to the wrapped error.
    #[error(transparent)]
    Runtime(#[from] FlowError),
}
63
/// How [`AtofExporter`] opens its output file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum AtofExporterMode {
    /// Keep existing contents and add new events at the end; the file is
    /// created if it does not exist yet. This is the default.
    #[default]
    Append,
    /// Discard any existing contents when the exporter opens the file.
    Overwrite,
}

impl AtofExporterMode {
    /// Look up a mode by the exact, case-sensitive name that language
    /// bindings pass in (`"append"` or `"overwrite"`).
    ///
    /// Returns `None` for any other string.
    pub fn parse(value: &str) -> Option<Self> {
        // Matching against `as_str` keeps parsing and printing in sync.
        [Self::Append, Self::Overwrite]
            .iter()
            .copied()
            .find(|mode| mode.as_str() == value)
    }

    /// The stable lowercase name of this mode, accepted back by [`Self::parse`].
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Overwrite => "overwrite",
            Self::Append => "append",
        }
    }
}
92
93/// Configuration for [`AtofExporter`].
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct AtofExporterConfig {
96    /// Directory that contains the JSONL output file.
97    pub output_directory: PathBuf,
98    /// Append or overwrite behavior used when opening the file.
99    pub mode: AtofExporterMode,
100    /// Output filename.
101    pub filename: String,
102}
103
104impl Default for AtofExporterConfig {
105    fn default() -> Self {
106        Self {
107            output_directory: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
108            mode: AtofExporterMode::Append,
109            filename: default_filename(),
110        }
111    }
112}
113
114impl AtofExporterConfig {
115    /// Create a config with defaults.
116    pub fn new() -> Self {
117        Self::default()
118    }
119
120    /// Override the output directory.
121    pub fn with_output_directory(mut self, output_directory: impl Into<PathBuf>) -> Self {
122        self.output_directory = output_directory.into();
123        self
124    }
125
126    /// Override the output mode.
127    pub fn with_mode(mut self, mode: AtofExporterMode) -> Self {
128        self.mode = mode;
129        self
130    }
131
132    /// Override the output filename.
133    pub fn with_filename(mut self, filename: impl Into<String>) -> Self {
134        self.filename = filename.into();
135        self
136    }
137
138    /// Return the full output path for this config.
139    pub fn path(&self) -> PathBuf {
140        self.output_directory.join(&self.filename)
141    }
142}
143
/// Mutable exporter state, shared behind a mutex between the exporter handle
/// and every subscriber closure it hands out.
struct AtofExporterState {
    /// Buffered writer over the opened output file.
    writer: BufWriter<File>,
    /// First serialization or write error recorded by a subscriber. Once set,
    /// subscribers skip further events and `force_flush` reports it.
    last_error: Option<String>,
}
148
149/// Filesystem-backed Agent Trajectory Observability Format (ATOF) JSONL event exporter.
150pub struct AtofExporter {
151    path: PathBuf,
152    state: Arc<Mutex<AtofExporterState>>,
153}
154
155impl AtofExporter {
156    /// Create a new exporter from config and open its output file.
157    pub fn new(config: AtofExporterConfig) -> Result<Self> {
158        let path = config.path();
159        let file = open_file(&path, config.mode)?;
160        Ok(Self {
161            path,
162            state: Arc::new(Mutex::new(AtofExporterState {
163                writer: BufWriter::new(file),
164                last_error: None,
165            })),
166        })
167    }
168
169    /// Return the output JSONL path.
170    pub fn path(&self) -> &Path {
171        self.path.as_path()
172    }
173
174    /// Return an event subscriber that writes one JSONL record per observed event.
175    pub fn subscriber(&self) -> EventSubscriberFn {
176        let state = Arc::clone(&self.state);
177        Arc::new(move |event: &Event| {
178            let Ok(mut state) = state.lock() else {
179                return;
180            };
181            if state.last_error.is_some() {
182                return;
183            }
184            if let Err(error) = write_event(&mut state.writer, event) {
185                state.last_error = Some(error);
186            }
187        })
188    }
189
190    /// Register this exporter globally under the given subscriber name.
191    pub fn register(&self, name: &str) -> Result<()> {
192        register_subscriber(name, self.subscriber()).map_err(Into::into)
193    }
194
195    /// Deregister a global subscriber by name.
196    pub fn deregister(&self, name: &str) -> Result<bool> {
197        deregister_subscriber(name).map_err(Into::into)
198    }
199
200    /// Flush the underlying file and report any stored write error.
201    pub fn force_flush(&self) -> Result<()> {
202        let mut state = self
203            .state
204            .lock()
205            .map_err(|_| AtofExporterError::LockPoisoned)?;
206        state
207            .writer
208            .flush()
209            .map_err(|source| AtofExporterError::Flush {
210                path: self.path.clone(),
211                source,
212            })?;
213        if let Some(message) = &state.last_error {
214            return Err(AtofExporterError::StoredFailure {
215                path: self.path.clone(),
216                message: message.clone(),
217            });
218        }
219        Ok(())
220    }
221
222    /// Shut down the exporter by flushing any buffered data.
223    pub fn shutdown(&self) -> Result<()> {
224        self.force_flush()
225    }
226}
227
228fn default_filename() -> String {
229    format!(
230        "nemo-flow-events-{}.jsonl",
231        Utc::now().format("%Y-%m-%d-%H.%M.%S")
232    )
233}
234
235fn open_file(path: &Path, mode: AtofExporterMode) -> Result<File> {
236    let mut options = OpenOptions::new();
237    options.create(true);
238    match mode {
239        AtofExporterMode::Append => {
240            options.append(true);
241        }
242        AtofExporterMode::Overwrite => {
243            options.write(true).truncate(true);
244        }
245    }
246    options
247        .open(path)
248        .map_err(|source| AtofExporterError::OpenFile {
249            path: path.to_path_buf(),
250            source,
251        })
252}
253
254fn write_event(writer: &mut BufWriter<File>, event: &Event) -> std::result::Result<(), String> {
255    serde_json::to_writer(&mut *writer, event).map_err(|error| error.to_string())?;
256    writer.write_all(b"\n").map_err(|error| error.to_string())?;
257    writer.flush().map_err(|error| error.to_string())
258}
259
260// ---------------------------------------------------------------------------
261// Tests
262// ---------------------------------------------------------------------------
263
// Unit tests live in a separate file; `#[path]` points this module at it.
#[cfg(test)]
#[path = "../../tests/unit/observability/atof_tests.rs"]
mod tests;