pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::mpsc;

use crate::common::message::Message;
use crate::common::types::{Audit, Event, Severity};
use crate::config::SystemConfig;
use crate::error::{Error, Result};
use crate::sink::Sink;
use crate::source::Source;
use crate::source::system::{AUDIT_SOURCE_ID, DLQ_SOURCE_ID, EVENT_SOURCE_ID};

use super::PipelineMetrics;

/// Upper bound on `meta.chain_depth` for DLQ routing.
///
/// Every DLQ hop produces a copy with `chain_depth` incremented. Once a message has reached
/// this depth it is no longer dead-lettered, so it cannot be re-routed indefinitely.
const MAX_CHAIN_DEPTH: u8 = 8;

/// Decide whether a failed message should be forwarded to the Dead Letter Queue.
///
/// Returns `false` in two cases:
/// - the message originated from the DLQ system source itself;
/// - its `chain_depth` has already reached [`MAX_CHAIN_DEPTH`].
fn should_route_to_dlq(msg: &Message) -> bool {
    let meta = &msg.meta;
    let originated_in_dlq = meta.source_node == DLQ_SOURCE_ID;
    let depth_exhausted = meta.chain_depth >= MAX_CHAIN_DEPTH;
    !(originated_in_dlq || depth_exhausted)
}

/// Return an owned copy of `msg` whose `meta.chain_depth` is one higher.
///
/// The increment saturates at `u8::MAX` instead of wrapping. The input message is left untouched.
fn bump_chain_depth(msg: &Message) -> Message {
    let mut next = msg.clone();
    let depth = &mut next.meta.chain_depth;
    *depth = depth.saturating_add(1);
    next
}

/// Sender handles used to feed the built-in system sources (DLQ, events, audit).
#[derive(Debug, Clone)]
pub struct SystemChannels {
    /// Dead Letter Queue sender: carries the original message that failed processing.
    pub dlq: mpsc::Sender<Message>,
    /// System event sender: carries structured error/event records.
    pub event: mpsc::Sender<Event>,
    /// Audit sender: carries operation audit records.
    pub audit: mpsc::Sender<Audit>,
}

/// Route a processing error to system channels (Event + DLQ).
///
/// Always emits an [`Event`] named `"{node_type}_error"`. Its JSON payload holds the
/// failing `node_id`, the `node_type`, the `error` text and the message id.
///
/// When `should_route_to_dlq` allows it, the original message is also forwarded to the
/// Dead Letter Queue with its `chain_depth` incremented.
///
/// Delivery is best-effort: if a system channel's receiver is gone, the send error is
/// ignored so the calling node keeps running. The DLQ metric is recorded only when the
/// DLQ channel actually accepted the message, so closed channels are not counted as
/// dead-lettered deliveries.
pub async fn route_error_to_system(
    channels: &SystemChannels,
    metrics: &PipelineMetrics,
    node_id: &str,
    node_type: &str,
    error: &str,
    msg: &Message,
) {
    let event = Event::new(
        format!("{}_error", node_type),
        serde_json::json!({
            "node_id": node_id,
            "node_type": node_type,
            "error": error,
            "message_id": msg.meta.id.to_string(),
        }),
    );
    // Best-effort: a closed event channel must not turn an error report into a failure.
    let _ = channels.event.send(event).await;

    // `Sender::send` fails only when the receiving half has been closed or dropped.
    // In that case nothing reached the DLQ, so the message must not be counted.
    if should_route_to_dlq(msg) && channels.dlq.send(bump_chain_depth(msg)).await.is_ok() {
        metrics.record_dlq();
    }
}

/// Report a buffer overflow to the system event channel.
///
/// Emits a `"buffer_overflow"` [`Event`] with [`Severity::Critical`]. Its payload names the
/// node (`node_id`, `node_type`) and how many messages were dropped. Delivery is
/// best-effort: a send failure is ignored.
pub async fn route_overflow_to_system(
    channels: &SystemChannels,
    node_id: &str,
    node_type: &str,
    dropped_count: u64,
) {
    let details = serde_json::json!({
        "node_id": node_id,
        "node_type": node_type,
        "dropped_count": dropped_count,
    });
    let overflow = Event::new("buffer_overflow", details).with_severity(Severity::Critical);
    let _ = channels.event.send(overflow).await;
}

pub fn build_system_source(
    source_id: &str,
    pending_system_receivers: &mut HashMap<String, Box<dyn std::any::Any + Send>>,
) -> Result<Arc<dyn Source>> {
    use crate::source::system::{AuditSource, DlqSource, EventSource, is_valid_system_source_id};
    use tokio::sync::mpsc::Receiver;

    if !is_valid_system_source_id(source_id) {
        return Err(Error::config(format!(
            "Invalid system source ID '{}'",
            source_id
        )));
    }

    let receiver = pending_system_receivers.remove(source_id).ok_or_else(|| {
        Error::config(format!(
            "System receiver for '{}' not found. This may indicate a duplicate source ID.",
            source_id
        ))
    })?;

    match source_id {
        DLQ_SOURCE_ID => {
            let rx = receiver
                .downcast::<Receiver<Message>>()
                .map_err(|_| Error::config("Failed to downcast DLQ receiver"))?;
            Ok(Arc::new(DlqSource::new(*rx)) as Arc<dyn Source>)
        }
        EVENT_SOURCE_ID => {
            let rx = receiver
                .downcast::<Receiver<Event>>()
                .map_err(|_| Error::config("Failed to downcast Event receiver"))?;
            Ok(Arc::new(EventSource::new(*rx)) as Arc<dyn Source>)
        }
        AUDIT_SOURCE_ID => {
            let rx = receiver
                .downcast::<Receiver<Audit>>()
                .map_err(|_| Error::config("Failed to downcast Audit receiver"))?;
            Ok(Arc::new(AuditSource::new(*rx)) as Arc<dyn Source>)
        }
        _ => Err(Error::config(format!(
            "Unknown system source ID: {}",
            source_id
        ))),
    }
}

pub async fn build_system_sink(
    sink_id: &str,
    system_config: &SystemConfig,
) -> Result<Arc<dyn Sink>> {
    use crate::sink::system::{SystemSink, is_valid_system_sink_id};

    if !is_valid_system_sink_id(sink_id) {
        return Err(Error::config(format!(
            "Invalid system sink ID '{}'",
            sink_id
        )));
    }

    let sink = SystemSink::new(sink_id, Some(&system_config.sinks)).await?;
    Ok(Arc::new(sink) as Arc<dyn Sink>)
}