// rustcdc 0.3.0
//
// Embeddable Rust CDC library focused on correctness-first capture primitives.
// Documentation
//! [`StdoutSink`] — writes newline-delimited JSON to stdout.

use std::io::{self, Write};

use crate::core::{Error, Event, Result};
use crate::sink::SinkAdapter;

// ─── StdoutSink ───────────────────────────────────────────────────────────────

/// A [`SinkAdapter`] that emits CDC events on **stdout**, one JSON document per
/// line (NDJSON).
///
/// `send` serialises the event with `serde_json::to_vec`, appends a `\n`, and
/// writes the bytes synchronously while holding the stdout lock. Because the
/// whole line is written under that lock, it cannot be interleaved with output
/// from another holder of the lock, which keeps line ordering deterministic
/// when several async tasks write concurrently.
///
/// # When to use
///
/// - Local debugging and development pipelines (`cdc-server | jq`).
/// - Integration tests that capture stdout with `assert_cmd`.
/// - Docker / Kubernetes deployments where stdout is the primary log channel.
///
/// # Not suitable for
///
/// High-throughput production pipelines: every `send` takes a global lock.
/// For production use, implement [`SinkAdapter`] against your streaming
/// platform instead.
///
/// # Example
///
/// ```rust,no_run
/// use rustcdc::sink::StdoutSink;
/// use rustcdc::sink::SinkAdapter;
/// use rustcdc::{Event, Operation};
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut sink = StdoutSink::new();
/// let event = Event { op: Operation::Insert, ..Event::default() };
/// sink.send(&event).await.unwrap();
/// # }
/// ```
#[derive(Debug, Default)]
pub struct StdoutSink {
    /// Set once `close` has run; `send` and `flush` are rejected afterwards.
    closed: bool,
    /// Number of events successfully written by `send`.
    events_sent: u64,
}

impl StdoutSink {
    /// Build a fresh, open sink whose event counter starts at zero.
    ///
    /// Equivalent to [`StdoutSink::default`].
    pub fn new() -> Self {
        Self {
            closed: false,
            events_sent: 0,
        }
    }

    /// Running total of events written through this sink.
    ///
    /// The counter only ever grows; flushing does not reset it.
    pub fn events_sent(&self) -> u64 {
        self.events_sent
    }
}

impl SinkAdapter for StdoutSink {
    async fn send(&mut self, event: &Event) -> Result<()> {
        if self.closed {
            return Err(Error::StateError("StdoutSink is closed".into()));
        }
        let mut bytes = serde_json::to_vec(event)?;
        bytes.push(b'\n');
        let stdout = io::stdout();
        let mut handle = stdout.lock();
        handle
            .write_all(&bytes)
            .map_err(|e| Error::StateError(format!("StdoutSink write: {e}")))?;
        self.events_sent += 1;
        Ok(())
    }

    async fn flush(&mut self) -> Result<()> {
        if self.closed {
            return Err(Error::StateError("StdoutSink is closed".into()));
        }
        let stdout = io::stdout();
        let mut handle = stdout.lock();
        handle
            .flush()
            .map_err(|e| Error::StateError(format!("StdoutSink flush: {e}")))?;
        Ok(())
    }

    async fn close(&mut self) -> Result<()> {
        if !self.closed {
            // Best-effort flush before close.
            let stdout = io::stdout();
            let mut handle = stdout.lock();
            let _ = handle.flush();
        }
        self.closed = true;
        Ok(())
    }

    fn name(&self) -> &str {
        "stdout"
    }

    fn is_closed(&self) -> Option<bool> {
        Some(self.closed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a sink on which `close` has already been called successfully.
    async fn closed_sink() -> StdoutSink {
        let mut sink = StdoutSink::new();
        sink.close().await.unwrap();
        sink
    }

    /// `send` must be rejected once the sink is closed.
    #[tokio::test]
    async fn send_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.send(&Event::default()).await;
        assert!(outcome.is_err());
    }

    /// `flush` must be rejected once the sink is closed.
    #[tokio::test]
    async fn flush_after_close_errors() {
        let mut sink = closed_sink().await;
        let outcome = sink.flush().await;
        assert!(outcome.is_err());
    }

    /// The adapter reports the fixed name `"stdout"`.
    #[test]
    fn name_is_stdout() {
        assert_eq!(StdoutSink::new().name(), "stdout");
    }

    /// A freshly built sink reports itself as open.
    #[test]
    fn is_closed_starts_false() {
        assert_eq!(StdoutSink::new().is_closed(), Some(false));
    }

    /// After `close`, the sink reports itself as closed.
    #[tokio::test]
    async fn is_closed_true_after_close() {
        let sink = closed_sink().await;
        assert_eq!(sink.is_closed(), Some(true));
    }

    /// Each successful `send` bumps the counter by exactly one.
    #[tokio::test]
    async fn events_sent_increments() {
        // The JSON lines are written to the process's real stdout and are not
        // inspected here; only the counter is asserted.
        let mut sink = StdoutSink::new();
        assert_eq!(sink.events_sent(), 0);

        for expected in 1..=2u64 {
            sink.send(&Event::default()).await.unwrap();
            assert_eq!(sink.events_sent(), expected);
        }
    }
}