use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use crate::common::message::SharedMessage;
use crate::error::{Error, Result};
use crate::sink::Sink;
/// Sink id of the system DLQ sink.
pub const DLQ_SINK_ID: &str = "sink::system::dlq";
/// Sink id of the system event sink.
pub const EVENT_SINK_ID: &str = "sink::system::event";
/// Sink id of the system audit sink.
pub const AUDIT_SINK_ID: &str = "sink::system::audit";
/// Every id that is accepted as a system sink id.
pub const VALID_SYSTEM_SINK_IDS: &[&str] = &[DLQ_SINK_ID, EVENT_SINK_ID, AUDIT_SINK_ID];
/// Returns `true` when `id` exactly matches one of the entries in
/// [`VALID_SYSTEM_SINK_IDS`].
pub fn is_valid_system_sink_id(id: &str) -> bool {
    VALID_SYSTEM_SINK_IDS.iter().any(|known| *known == id)
}
/// Optional output locations for the system sinks.
///
/// Every field is optional; unset fields are omitted when serializing and
/// default to `None` when deserializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemSinksConfig {
/// Directory that receives the default-named file of any sink that has no
/// explicit path of its own.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dir: Option<PathBuf>,
/// Explicit file path for the sink identified by `DLQ_SINK_ID`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dlq: Option<PathBuf>,
/// Explicit file path for the sink identified by `EVENT_SINK_ID`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub event: Option<PathBuf>,
/// Explicit file path for the sink identified by `AUDIT_SINK_ID`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub audit: Option<PathBuf>,
}
impl SystemSinksConfig {
    /// Overlays `other` on top of `self`.
    ///
    /// Each field that is `Some` in `other` replaces the matching field in
    /// `self`; fields that are `None` in `other` leave `self` untouched.
    pub fn merge(&mut self, other: &Self) {
        if let Some(dir) = &other.dir {
            self.dir = Some(dir.clone());
        }
        if let Some(dlq) = &other.dlq {
            self.dlq = Some(dlq.clone());
        }
        if let Some(event) = &other.event {
            self.event = Some(event.clone());
        }
        if let Some(audit) = &other.audit {
            self.audit = Some(audit.clone());
        }
    }

    /// Returns a copy of the explicitly configured path for sink `id`.
    ///
    /// Yields `None` when no explicit path is set for that sink or when `id`
    /// is not one of the known system sink ids.
    fn path_for(&self, id: &str) -> Option<PathBuf> {
        let configured = match id {
            DLQ_SINK_ID => &self.dlq,
            EVENT_SINK_ID => &self.event,
            AUDIT_SINK_ID => &self.audit,
            _ => return None,
        };
        configured.clone()
    }
}
/// Returns the fallback directory for system sink files.
///
/// Reads the `PIPEFLOW_SYSTEM_SINK_DIR` environment variable and falls back
/// to the relative directory `data` when the variable is not set.
fn base_dir() -> PathBuf {
    // `var_os` rather than `var`: paths are OS strings, and `var` would report
    // a non-Unicode value as an error, silently discarding a valid directory.
    std::env::var_os("PIPEFLOW_SYSTEM_SINK_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("data"))
}
fn default_filename_for(id: &str) -> Result<&'static str> {
match id {
DLQ_SINK_ID => Ok("system_dlq.jsonl"),
EVENT_SINK_ID => Ok("system_event.jsonl"),
AUDIT_SINK_ID => Ok("system_audit.jsonl"),
_ => Err(Error::config(format!("Unknown system sink id '{}'", id))),
}
}
/// Determines the file path for the system sink `id`.
///
/// Precedence, highest first:
/// 1. the explicit per-sink path in `config`,
/// 2. `config.dir` joined with the sink's default file name,
/// 3. `base_dir()` joined with the sink's default file name.
///
/// # Errors
///
/// Returns a configuration error when `id` is not a known system sink id.
fn resolve_path_for(id: &str, config: Option<&SystemSinksConfig>) -> Result<PathBuf> {
    // An explicit per-sink path wins outright.
    if let Some(explicit) = config.and_then(|cfg| cfg.path_for(id)) {
        return Ok(explicit);
    }

    // Both remaining cases need the default file name, so resolve it once.
    let filename = default_filename_for(id)?;
    let resolved = match config.and_then(|cfg| cfg.dir.as_ref()) {
        Some(dir) => dir.join(filename),
        None => base_dir().join(filename),
    };
    Ok(resolved)
}
/// Creates the parent directory of `path`, including any missing ancestors.
///
/// Does nothing when `path` has no parent component. Uses the blocking
/// `std::fs` API.
///
/// # Errors
///
/// Propagates the I/O error raised by directory creation.
fn ensure_parent_dir(path: &Path) -> Result<()> {
    match path.parent() {
        Some(dir) => {
            std::fs::create_dir_all(dir)?;
            Ok(())
        }
        None => Ok(()),
    }
}
/// Sink that appends each processed message as one JSON line to a file.
pub struct SystemSink {
/// Sink id this instance was created with; returned by `Sink::id`.
id: String,
/// Path of the file opened by `new`.
path: PathBuf,
/// Open file handle; the async mutex is held while a message is written.
file: Arc<Mutex<tokio::fs::File>>,
}
impl SystemSink {
    /// Creates the sink for `id`.
    ///
    /// Resolves the target path from `config` (or the defaults), makes sure
    /// its parent directory exists, and opens the file in append mode,
    /// creating it if it does not exist yet.
    ///
    /// # Errors
    ///
    /// Fails when the path cannot be resolved for `id`, or when creating the
    /// directory or opening the file fails.
    pub async fn new(id: &str, config: Option<&SystemSinksConfig>) -> Result<Self> {
        let target = resolve_path_for(id, config)?;
        ensure_parent_dir(&target)?;

        let mut options = OpenOptions::new();
        options.create(true).append(true);
        let handle = options.open(&target).await?;

        let sink = Self {
            id: id.to_string(),
            path: target,
            file: Arc::new(Mutex::new(handle)),
        };
        Ok(sink)
    }

    /// Path of the file this sink appends to.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
#[async_trait]
impl Sink for SystemSink {
    /// Returns the sink id this instance was created with.
    fn id(&self) -> &str {
        &self.id
    }

    /// Appends `msg` to the sink file as a single JSON line.
    ///
    /// # Errors
    ///
    /// Returns a sink error when the message cannot be serialized, or the
    /// converted I/O error when writing or flushing the file fails.
    async fn process(&self, msg: SharedMessage) -> Result<()> {
        let mut line = serde_json::to_string(&*msg).map_err(|e| {
            Error::sink(format!(
                "System sink '{}' failed to serialize message: {}",
                self.id, e
            ))
        })?;
        // Payload and terminator go out in one write so a failure between two
        // separate writes cannot leave an unterminated record in the file.
        line.push('\n');

        let mut file = self.file.lock().await;
        file.write_all(line.as_bytes()).await?;
        // `tokio::fs::File` finishes writes on a background blocking task;
        // flushing waits for that task so an I/O error is reported for this
        // message rather than a later one, and the last record is not lost
        // if the sink is dropped right afterwards.
        file.flush().await?;
        Ok(())
    }
}