pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! Source trait and implementations

use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::broadcast;

use crate::common::message::SharedMessage;
use crate::error::Result;

#[cfg(feature = "file")]
pub mod file;
#[cfg(feature = "http-client")]
pub mod http_client;
#[cfg(feature = "http-server")]
pub mod http_server;
#[cfg(feature = "redis")]
pub mod redis;
#[cfg(feature = "database")]
pub mod sql;
pub mod system;

// Re-export system source IDs and implementations
pub use system::{
    AUDIT_SOURCE_ID, AuditSource, DLQ_SOURCE_ID, DlqSource, EVENT_SOURCE_ID, EventSource,
};

/// Callback invoked when a source sends a message successfully.
pub type SendHook = Arc<dyn Fn() + Send + Sync>;

/// Audit hook for sending audit records when a source emits a message.
pub type AuditHook = Arc<dyn Fn(&str) + Send + Sync>;

/// Sender wrapper that allows metric hooks without changing source behavior.
#[derive(Clone)]
pub struct MessageSender {
    inner: broadcast::Sender<SharedMessage>,
    on_send: Option<SendHook>,
    on_audit: Option<AuditHook>,
}

impl MessageSender {
    /// Create a new message sender with an optional send hook.
    pub fn new(inner: broadcast::Sender<SharedMessage>, on_send: Option<SendHook>) -> Self {
        Self {
            inner,
            on_send,
            on_audit: None,
        }
    }

    /// Set the audit hook for this sender.
    pub fn with_audit_hook(mut self, hook: AuditHook) -> Self {
        self.on_audit = Some(hook);
        self
    }

    /// Send a message and invoke the hook on success.
    pub fn send(&self, msg: SharedMessage) -> Result<usize> {
        let message_id = msg.meta.id.to_string();
        let result = self
            .inner
            .send(msg)
            .map_err(|err| crate::error::Error::source(format!("failed to send message: {err}")));
        if result.is_ok() {
            if let Some(ref hook) = self.on_send {
                hook();
            }
            if let Some(ref audit_hook) = self.on_audit {
                audit_hook(&message_id);
            }
        }
        result
    }
}

/// Source produces messages into the pipeline
#[async_trait]
pub trait Source: Send + Sync {
    /// Unique node identifier
    fn id(&self) -> &str;

    /// Start the source and send messages to the provided sender
    ///
    /// This method runs until shutdown is signaled or an unrecoverable error occurs.
    /// Sources should wrap messages in Arc before sending to avoid expensive clones
    /// when broadcasting to multiple sinks.
    async fn run(&self, sender: MessageSender, shutdown: broadcast::Receiver<()>) -> Result<()>;
}