pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! System sinks for default system channel persistence.
//!
//! These sinks are built-in and write to default JSONL files, optionally configured
//! via the system config section.

use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;

use crate::common::message::SharedMessage;
use crate::error::{Error, Result};
use crate::sink::Sink;

/// Identifier of the built-in system sink that receives Dead Letter Queue output.
pub const DLQ_SINK_ID: &str = "sink::system::dlq";

/// Identifier of the built-in system sink that receives event output.
pub const EVENT_SINK_ID: &str = "sink::system::event";

/// Identifier of the built-in system sink that receives audit output.
pub const AUDIT_SINK_ID: &str = "sink::system::audit";

/// Every identifier accepted as a built-in system sink (DLQ, event, audit).
pub const VALID_SYSTEM_SINK_IDS: &[&str] = &[DLQ_SINK_ID, EVENT_SINK_ID, AUDIT_SINK_ID];

/// Returns `true` when `id` exactly matches one of [`VALID_SYSTEM_SINK_IDS`].
pub fn is_valid_system_sink_id(id: &str) -> bool {
    VALID_SYSTEM_SINK_IDS.iter().any(|&known| known == id)
}

/// System sink configuration (optional overrides for default file outputs).
///
/// A sink's output file is resolved as follows: its explicit per-sink path
/// (`dlq` / `event` / `audit`) if set; otherwise `dir` joined with the sink's
/// default file name; otherwise the process-wide base directory joined with that
/// default file name. Every field defaults to `None` when deserialized and is
/// omitted from serialized output when `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemSinksConfig {
    /// Base directory for system sink files; used for any sink that has no
    /// explicit path override of its own.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dir: Option<PathBuf>,
    /// Override path for DLQ sink (used as-is; takes precedence over `dir`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dlq: Option<PathBuf>,
    /// Override path for event sink (used as-is; takes precedence over `dir`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event: Option<PathBuf>,
    /// Override path for audit sink (used as-is; takes precedence over `dir`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub audit: Option<PathBuf>,
}

impl SystemSinksConfig {
    /// Overlays `other` onto `self`, field by field.
    ///
    /// Each field that is `Some` in `other` replaces the corresponding field in
    /// `self`. Fields that are `None` in `other` leave `self` unchanged.
    pub fn merge(&mut self, other: &Self) {
        // Copy `source` into `target` only when the override is actually present.
        fn overlay(target: &mut Option<PathBuf>, source: Option<&PathBuf>) {
            if let Some(value) = source {
                *target = Some(value.clone());
            }
        }

        overlay(&mut self.dir, other.dir.as_ref());
        overlay(&mut self.dlq, other.dlq.as_ref());
        overlay(&mut self.event, other.event.as_ref());
        overlay(&mut self.audit, other.audit.as_ref());
    }

    /// Returns the explicit per-sink override path for `id`, if configured.
    ///
    /// Only the `dlq` / `event` / `audit` fields are consulted; `dir` is not.
    /// IDs that are not built-in system sinks yield `None`.
    fn path_for(&self, id: &str) -> Option<PathBuf> {
        let slot = match id {
            DLQ_SINK_ID => &self.dlq,
            EVENT_SINK_ID => &self.event,
            AUDIT_SINK_ID => &self.audit,
            _ => return None,
        };
        slot.clone()
    }
}

/// Base directory for system sink files when the config supplies neither a
/// per-sink path nor a `dir`.
///
/// Reads the `PIPEFLOW_SYSTEM_SINK_DIR` environment variable. If the variable is
/// unset or empty, it falls back to `data`, a path relative to the current
/// working directory.
///
/// The function uses `var_os` rather than `var`. With `var`, a directory whose
/// name is not valid UTF-8 (legal on Unix) would be silently ignored in favour
/// of the default. An empty value is treated as unset; otherwise it would become
/// `PathBuf::from("")` and place the files directly in the working directory.
fn base_dir() -> PathBuf {
    std::env::var_os("PIPEFLOW_SYSTEM_SINK_DIR")
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("data"))
}

fn default_filename_for(id: &str) -> Result<&'static str> {
    match id {
        DLQ_SINK_ID => Ok("system_dlq.jsonl"),
        EVENT_SINK_ID => Ok("system_event.jsonl"),
        AUDIT_SINK_ID => Ok("system_audit.jsonl"),
        _ => Err(Error::config(format!("Unknown system sink id '{}'", id))),
    }
}

/// Resolves the output file path for system sink `id`.
///
/// Precedence:
/// 1. the explicit per-sink path from `config`, returned unchanged;
/// 2. `config.dir` joined with the sink's default file name;
/// 3. the base directory joined with the sink's default file name.
///
/// # Errors
/// Returns a config error when `id` is not a built-in system sink.
fn resolve_path_for(id: &str, config: Option<&SystemSinksConfig>) -> Result<PathBuf> {
    if let Some(explicit) = config.and_then(|cfg| cfg.path_for(id)) {
        return Ok(explicit);
    }

    let filename = default_filename_for(id)?;
    let resolved = match config.and_then(|cfg| cfg.dir.as_deref()) {
        Some(dir) => dir.join(filename),
        None => base_dir().join(filename),
    };
    Ok(resolved)
}

/// Creates every missing directory above `path` so the file itself can be opened.
///
/// A path with no parent (for example a filesystem root) needs nothing created
/// and succeeds immediately. This uses the blocking `std::fs` API.
fn ensure_parent_dir(path: &Path) -> Result<()> {
    match path.parent() {
        Some(parent) => {
            std::fs::create_dir_all(parent)?;
            Ok(())
        }
        None => Ok(()),
    }
}

/// System sink that writes messages to a default JSONL file.
///
/// Every message passed to `process` is serialized to JSON and appended to
/// `path` as a single line. Construct instances with [`SystemSink::new`].
pub struct SystemSink {
    /// The system sink ID this instance was created for; returned by `Sink::id`.
    id: String,
    /// Resolved output file path; exposed through `SystemSink::path`.
    path: PathBuf,
    /// Append-mode handle to `path`. The async mutex ensures that only one
    /// `process` call writes at a time.
    // NOTE(review): the `Arc` appears unnecessary — nothing visible here clones
    // this field or the sink. Confirm no other code relies on it before removing.
    file: Arc<Mutex<tokio::fs::File>>,
}

impl SystemSink {
    /// Opens the JSONL output file for system sink `id`, creating it if needed.
    ///
    /// The path is resolved from `config` as follows: the per-sink override
    /// first, then `config.dir`, and finally the default base directory. Missing
    /// parent directories are created. The file is opened in append mode, so
    /// existing content is preserved.
    ///
    /// # Errors
    /// Fails when `id` is not a built-in system sink, or when creating the
    /// parent directory or opening the file fails.
    pub async fn new(id: &str, config: Option<&SystemSinksConfig>) -> Result<Self> {
        let path = resolve_path_for(id, config)?;
        ensure_parent_dir(&path)?;

        let mut options = OpenOptions::new();
        options.create(true).append(true);
        let handle = options.open(&path).await?;

        Ok(Self {
            id: id.to_owned(),
            path,
            file: Arc::new(Mutex::new(handle)),
        })
    }

    /// The resolved output path this sink appends to.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}

#[async_trait]
impl Sink for SystemSink {
    /// The system sink ID this instance was created with.
    fn id(&self) -> &str {
        &self.id
    }

    /// Appends `msg` to the sink's JSONL file as one newline-terminated record.
    ///
    /// The JSON and its trailing `\n` are assembled into a single buffer and
    /// written with one `write_all`. This avoids issuing the newline as a
    /// separate write that could fail after the record itself was written.
    ///
    /// # Errors
    /// Returns a sink error if `msg` cannot be serialized. Any I/O error from
    /// writing or flushing the file is propagated.
    async fn process(&self, msg: SharedMessage) -> Result<()> {
        let mut line = serde_json::to_vec(&*msg).map_err(|e| {
            Error::sink(format!(
                "System sink '{}' failed to serialize message: {}",
                self.id, e
            ))
        })?;
        line.push(b'\n');

        let mut file = self.file.lock().await;
        file.write_all(&line).await?;
        // tokio::fs::File finishes writes on a background thread, and write_all
        // can return before that completes. Flushing waits for the in-flight
        // write, for two reasons:
        // - an I/O error is reported for *this* message, not a later one;
        // - the record has reached the OS before we return or the sink is dropped.
        // This is not an fsync; durability across power loss is not guaranteed.
        file.flush().await?;
        Ok(())
    }
}