nemo_flow/observability/
atof.rs1use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15
16use chrono::Utc;
17
18use crate::api::event::Event;
19use crate::api::runtime::EventSubscriberFn;
20use crate::api::subscriber::{deregister_subscriber, register_subscriber};
21use crate::error::FlowError;
22
23pub type Result<T> = std::result::Result<T, AtofExporterError>;
25
26#[derive(Debug, thiserror::Error)]
28pub enum AtofExporterError {
29 #[error("failed to resolve current working directory: {0}")]
31 CurrentDirectory(std::io::Error),
32 #[error("failed to open ATOF output file {path:?}: {source}")]
34 OpenFile {
35 path: PathBuf,
37 source: std::io::Error,
39 },
40 #[error("failed to flush ATOF output file {path:?}: {source}")]
42 Flush {
43 path: PathBuf,
45 source: std::io::Error,
47 },
48 #[error("previous ATOF export failed for {path:?}: {message}")]
50 StoredFailure {
51 path: PathBuf,
53 message: String,
55 },
56 #[error("the ATOF exporter state lock was poisoned")]
58 LockPoisoned,
59 #[error(transparent)]
61 Runtime(#[from] FlowError),
62}
63
64#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
66pub enum AtofExporterMode {
67 #[default]
69 Append,
70 Overwrite,
72}
73
74impl AtofExporterMode {
75 pub fn parse(value: &str) -> Option<Self> {
77 match value {
78 "append" => Some(Self::Append),
79 "overwrite" => Some(Self::Overwrite),
80 _ => None,
81 }
82 }
83
84 pub fn as_str(self) -> &'static str {
86 match self {
87 Self::Append => "append",
88 Self::Overwrite => "overwrite",
89 }
90 }
91}
92
93#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct AtofExporterConfig {
96 pub output_directory: PathBuf,
98 pub mode: AtofExporterMode,
100 pub filename: String,
102}
103
104impl Default for AtofExporterConfig {
105 fn default() -> Self {
106 Self {
107 output_directory: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
108 mode: AtofExporterMode::Append,
109 filename: default_filename(),
110 }
111 }
112}
113
114impl AtofExporterConfig {
115 pub fn new() -> Self {
117 Self::default()
118 }
119
120 pub fn with_output_directory(mut self, output_directory: impl Into<PathBuf>) -> Self {
122 self.output_directory = output_directory.into();
123 self
124 }
125
126 pub fn with_mode(mut self, mode: AtofExporterMode) -> Self {
128 self.mode = mode;
129 self
130 }
131
132 pub fn with_filename(mut self, filename: impl Into<String>) -> Self {
134 self.filename = filename.into();
135 self
136 }
137
138 pub fn path(&self) -> PathBuf {
140 self.output_directory.join(&self.filename)
141 }
142}
143
144struct AtofExporterState {
145 writer: BufWriter<File>,
146 last_error: Option<String>,
147}
148
149pub struct AtofExporter {
151 path: PathBuf,
152 state: Arc<Mutex<AtofExporterState>>,
153}
154
155impl AtofExporter {
156 pub fn new(config: AtofExporterConfig) -> Result<Self> {
158 let path = config.path();
159 let file = open_file(&path, config.mode)?;
160 Ok(Self {
161 path,
162 state: Arc::new(Mutex::new(AtofExporterState {
163 writer: BufWriter::new(file),
164 last_error: None,
165 })),
166 })
167 }
168
169 pub fn path(&self) -> &Path {
171 self.path.as_path()
172 }
173
174 pub fn subscriber(&self) -> EventSubscriberFn {
176 let state = Arc::clone(&self.state);
177 Arc::new(move |event: &Event| {
178 let Ok(mut state) = state.lock() else {
179 return;
180 };
181 if state.last_error.is_some() {
182 return;
183 }
184 if let Err(error) = write_event(&mut state.writer, event) {
185 state.last_error = Some(error);
186 }
187 })
188 }
189
190 pub fn register(&self, name: &str) -> Result<()> {
192 register_subscriber(name, self.subscriber()).map_err(Into::into)
193 }
194
195 pub fn deregister(&self, name: &str) -> Result<bool> {
197 deregister_subscriber(name).map_err(Into::into)
198 }
199
200 pub fn force_flush(&self) -> Result<()> {
202 let mut state = self
203 .state
204 .lock()
205 .map_err(|_| AtofExporterError::LockPoisoned)?;
206 state
207 .writer
208 .flush()
209 .map_err(|source| AtofExporterError::Flush {
210 path: self.path.clone(),
211 source,
212 })?;
213 if let Some(message) = &state.last_error {
214 return Err(AtofExporterError::StoredFailure {
215 path: self.path.clone(),
216 message: message.clone(),
217 });
218 }
219 Ok(())
220 }
221
222 pub fn shutdown(&self) -> Result<()> {
224 self.force_flush()
225 }
226}
227
228fn default_filename() -> String {
229 format!(
230 "nemo-flow-events-{}.jsonl",
231 Utc::now().format("%Y-%m-%d-%H.%M.%S")
232 )
233}
234
235fn open_file(path: &Path, mode: AtofExporterMode) -> Result<File> {
236 let mut options = OpenOptions::new();
237 options.create(true);
238 match mode {
239 AtofExporterMode::Append => {
240 options.append(true);
241 }
242 AtofExporterMode::Overwrite => {
243 options.write(true).truncate(true);
244 }
245 }
246 options
247 .open(path)
248 .map_err(|source| AtofExporterError::OpenFile {
249 path: path.to_path_buf(),
250 source,
251 })
252}
253
254fn write_event(writer: &mut BufWriter<File>, event: &Event) -> std::result::Result<(), String> {
255 serde_json::to_writer(&mut *writer, event).map_err(|error| error.to_string())?;
256 writer.write_all(b"\n").map_err(|error| error.to_string())?;
257 writer.flush().map_err(|error| error.to_string())
258}
259
260#[cfg(test)]
265#[path = "../../tests/unit/observability/atof_tests.rs"]
266mod tests;