use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use chrono::Utc;
use crate::api::event::Event;
use crate::api::runtime::EventSubscriberFn;
use crate::api::subscriber::{deregister_subscriber, register_subscriber};
use crate::error::FlowError;
/// Result alias used by this module; the error type is always [`AtofExporterError`].
pub type Result<T> = std::result::Result<T, AtofExporterError>;
/// Errors reported by the ATOF exporter.
#[derive(Debug, thiserror::Error)]
pub enum AtofExporterError {
/// The current working directory could not be resolved.
// NOTE(review): no code visible in this file constructs this variant — confirm it is still needed.
#[error("failed to resolve current working directory: {0}")]
CurrentDirectory(std::io::Error),
/// The output file at `path` could not be opened.
#[error("failed to open ATOF output file {path:?}: {source}")]
OpenFile {
/// Path that was passed to the open call.
path: PathBuf,
/// I/O error returned by the open call.
source: std::io::Error,
},
/// Flushing the buffered writer for `path` failed.
#[error("failed to flush ATOF output file {path:?}: {source}")]
Flush {
/// Path of the output file being flushed.
path: PathBuf,
/// I/O error returned by the flush.
source: std::io::Error,
},
/// An earlier event write failed; the failure was stored as text and is
/// reported when the exporter is flushed.
#[error("previous ATOF export failed for {path:?}: {message}")]
StoredFailure {
/// Path of the output file the failed write targeted.
path: PathBuf,
/// Stringified form of the original write error.
message: String,
},
/// The mutex guarding the exporter state was poisoned.
#[error("the ATOF exporter state lock was poisoned")]
LockPoisoned,
/// A `FlowError` converted through `From`; its message is shown unchanged.
#[error(transparent)]
Runtime(#[from] FlowError),
}
/// How the exporter treats an output file that already exists.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum AtofExporterMode {
    /// Add new records after any existing content. This is the default mode.
    #[default]
    Append,
    /// Discard existing content before writing.
    Overwrite,
}

impl AtofExporterMode {
    /// Converts the textual mode name into a mode.
    ///
    /// Only the exact lowercase names `"append"` and `"overwrite"` are
    /// recognized; every other input yields `None`.
    pub fn parse(value: &str) -> Option<Self> {
        let mode = match value {
            "overwrite" => Self::Overwrite,
            "append" => Self::Append,
            _ => return None,
        };
        Some(mode)
    }

    /// Returns the textual name of this mode, the inverse of [`Self::parse`].
    pub fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            Self::Overwrite => "overwrite",
            Self::Append => "append",
        };
        label
    }
}
/// Settings that determine where and how the exporter writes its output file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AtofExporterConfig {
/// Directory that `filename` is joined onto to form the output path.
pub output_directory: PathBuf,
/// Whether an existing file is appended to or overwritten.
pub mode: AtofExporterMode,
/// Name of the output file inside `output_directory`.
pub filename: String,
}
impl Default for AtofExporterConfig {
    /// Builds the default configuration: the current working directory (or
    /// `"."` when it cannot be determined), append mode, and the filename
    /// produced by `default_filename()`.
    fn default() -> Self {
        // Fall back to a relative path rather than failing: `Default` cannot
        // report an error.
        let output_directory = match std::env::current_dir() {
            Ok(directory) => directory,
            Err(_) => PathBuf::from("."),
        };
        let filename = default_filename();
        Self {
            output_directory,
            mode: AtofExporterMode::Append,
            filename,
        }
    }
}
impl AtofExporterConfig {
    /// Creates a configuration populated with the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the configuration with its output directory replaced.
    pub fn with_output_directory(self, output_directory: impl Into<PathBuf>) -> Self {
        Self {
            output_directory: output_directory.into(),
            ..self
        }
    }

    /// Returns the configuration with its file mode replaced.
    pub fn with_mode(self, mode: AtofExporterMode) -> Self {
        Self { mode, ..self }
    }

    /// Returns the configuration with its filename replaced.
    pub fn with_filename(self, filename: impl Into<String>) -> Self {
        Self {
            filename: filename.into(),
            ..self
        }
    }

    /// Returns the full output path: `filename` joined onto `output_directory`.
    pub fn path(&self) -> PathBuf {
        let mut full_path = self.output_directory.clone();
        full_path.push(&self.filename);
        full_path
    }
}
/// Mutable exporter state, kept behind a mutex and shared with subscriber callbacks.
struct AtofExporterState {
/// Buffered writer over the open output file.
writer: BufWriter<File>,
/// Text of the first write failure, if any. Once set, subscriber callbacks
/// stop writing and `force_flush` reports it.
last_error: Option<String>,
}
/// Writes events to a single output file, one JSON document per line.
pub struct AtofExporter {
/// Path of the output file, computed from the configuration at construction.
path: PathBuf,
/// Writer and stored-error state; cloned into every subscriber callback.
state: Arc<Mutex<AtofExporterState>>,
}
impl AtofExporter {
    /// Opens the output file described by `config` and prepares the exporter.
    ///
    /// # Errors
    ///
    /// Returns the error from `open_file` when the file cannot be opened.
    pub fn new(config: AtofExporterConfig) -> Result<Self> {
        let path = config.path();
        let writer = BufWriter::new(open_file(&path, config.mode)?);
        let state = AtofExporterState {
            writer,
            last_error: None,
        };
        Ok(Self {
            path,
            state: Arc::new(Mutex::new(state)),
        })
    }

    /// Returns the path of the output file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Builds a callback that writes each event it receives to the output file.
    ///
    /// The callback never reports failures directly. It does nothing when the
    /// state lock is poisoned or when an earlier write already failed; the
    /// first write failure is stored and surfaced by [`Self::force_flush`].
    pub fn subscriber(&self) -> EventSubscriberFn {
        let shared = Arc::clone(&self.state);
        Arc::new(move |event: &Event| {
            if let Ok(mut guard) = shared.lock() {
                // After the first failure the exporter stays silent so the
                // stored error is not overwritten.
                if guard.last_error.is_none() {
                    guard.last_error = write_event(&mut guard.writer, event).err();
                }
            }
        })
    }

    /// Registers a fresh subscriber callback under `name`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `register_subscriber`.
    pub fn register(&self, name: &str) -> Result<()> {
        let callback = self.subscriber();
        let outcome = register_subscriber(name, callback);
        outcome.map_err(Into::into)
    }

    /// Deregisters the subscriber called `name`, forwarding the boolean
    /// returned by `deregister_subscriber`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `deregister_subscriber`.
    pub fn deregister(&self, name: &str) -> Result<bool> {
        let outcome = deregister_subscriber(name);
        outcome.map_err(Into::into)
    }

    /// Flushes buffered output and reports any stored write failure.
    ///
    /// # Errors
    ///
    /// * [`AtofExporterError::LockPoisoned`] when the state lock is poisoned.
    /// * [`AtofExporterError::Flush`] when flushing the writer fails.
    /// * [`AtofExporterError::StoredFailure`] when an earlier event write failed.
    pub fn force_flush(&self) -> Result<()> {
        let Ok(mut guard) = self.state.lock() else {
            return Err(AtofExporterError::LockPoisoned);
        };
        // The flush is attempted before the stored failure is inspected, so a
        // flush error takes precedence over an older write error.
        if let Err(source) = guard.writer.flush() {
            return Err(AtofExporterError::Flush {
                path: self.path.clone(),
                source,
            });
        }
        match guard.last_error.as_ref() {
            Some(message) => Err(AtofExporterError::StoredFailure {
                path: self.path.clone(),
                message: message.clone(),
            }),
            None => Ok(()),
        }
    }

    /// Finishes exporting; currently this only performs [`Self::force_flush`].
    pub fn shutdown(&self) -> Result<()> {
        self.force_flush()
    }
}
/// Builds the default output filename from the current UTC time, formatted
/// as `%Y-%m-%d-%H.%M.%S`.
fn default_filename() -> String {
    let stamp = Utc::now().format("%Y-%m-%d-%H.%M.%S");
    format!("nemo-flow-events-{}.jsonl", stamp)
}
/// Opens `path` for writing, creating the file when it does not exist.
///
/// In append mode existing content is kept and writes go to the end of the
/// file; in overwrite mode the file is truncated on open.
///
/// # Errors
///
/// Returns [`AtofExporterError::OpenFile`] carrying the path and the
/// underlying I/O error when the open call fails.
fn open_file(path: &Path, mode: AtofExporterMode) -> Result<File> {
    let mut options = OpenOptions::new();
    let configured = match mode {
        AtofExporterMode::Append => options.create(true).append(true),
        AtofExporterMode::Overwrite => options.create(true).write(true).truncate(true),
    };
    match configured.open(path) {
        Ok(file) => Ok(file),
        Err(source) => Err(AtofExporterError::OpenFile {
            path: path.to_path_buf(),
            source,
        }),
    }
}
fn write_event(writer: &mut BufWriter<File>, event: &Event) -> std::result::Result<(), String> {
serde_json::to_writer(&mut *writer, event).map_err(|error| error.to_string())?;
writer.write_all(b"\n").map_err(|error| error.to_string())?;
writer.flush().map_err(|error| error.to_string())
}
// Unit tests live in a separate file, located through the `#[path]` attribute
// and compiled only for test builds.
#[cfg(test)]
#[path = "../../tests/unit/observability/atof_tests.rs"]
mod tests;