Skip to main content

rustcdc/sink/
stdout.rs

1//! [`StdoutSink`] — writes newline-delimited JSON to stdout.
2
3use std::io::{self, Write};
4
5use crate::core::{Error, Event, Result};
6use crate::sink::SinkAdapter;
7
8// ─── StdoutSink ───────────────────────────────────────────────────────────────
9
10/// Writes CDC events to **stdout** as newline-delimited JSON (NDJSON).
11///
12/// Each event is serialised with `serde_json::to_vec` and followed by a `\n`.
13/// Writes are synchronous and hold the stdout lock for the duration of each
14/// `send` call; this makes output ordering deterministic under concurrent async
15/// tasks.
16///
17/// # When to use
18///
19/// - Local debugging / development pipelines (`cdc-server | jq`).
20/// - Integration tests that capture stdout with `assert_cmd`.
21/// - Docker / Kubernetes deployments where stdout is the primary log channel.
22///
23/// # Not suitable for
24///
25/// High-throughput production pipelines. Every `send` acquires a global lock.
26/// For production use, implement [`SinkAdapter`] against your streaming platform.
27///
28/// # Example
29///
30/// ```rust,no_run
31/// use rustcdc::sink::StdoutSink;
32/// use rustcdc::sink::SinkAdapter;
33/// use rustcdc::{Event, Operation, EVENT_ENVELOPE_VERSION};
34///
35/// # #[tokio::main]
36/// # async fn main() {
37/// let mut sink = StdoutSink::new();
38/// let event = Event { op: Operation::Insert, ..Event::default() };
39/// sink.send(&event).await.unwrap();
40/// # }
41/// ```
42#[derive(Debug, Default)]
43pub struct StdoutSink {
44    closed: bool,
45    events_sent: u64,
46    pretty: bool,
47    /// When `Some`, every sent event is appended here for test inspection.
48    captured: Option<Vec<Event>>,
49}
50
51impl StdoutSink {
52    /// Create a new `StdoutSink` that writes compact (single-line) JSON.
53    pub fn new() -> Self {
54        Self::default()
55    }
56
57    /// Create a `StdoutSink` that writes pretty-printed JSON.
58    ///
59    /// Useful for human-readable development output and terminal piping.
60    /// For machine-consumption pipelines, prefer [`StdoutSink::new`].
61    pub fn with_pretty() -> Self {
62        Self {
63            pretty: true,
64            ..Self::default()
65        }
66    }
67
68    /// Create a `StdoutSink` that also buffers every sent event in memory.
69    ///
70    /// Intended for **testing only** — the buffer grows unbounded. Call
71    /// [`exported_events`](SinkAdapter::exported_events) on the sink to
72    /// inspect captured events after delivery.
73    pub fn with_capture() -> Self {
74        Self {
75            captured: Some(Vec::new()),
76            ..Self::default()
77        }
78    }
79
80    /// Total number of events sent through this sink (not reset on flush).
81    pub fn events_sent(&self) -> u64 {
82        self.events_sent
83    }
84}
85
86impl SinkAdapter for StdoutSink {
87    async fn send(&mut self, event: &Event) -> Result<()> {
88        if self.closed {
89            return Err(Error::StateError("StdoutSink is closed".into()));
90        }
91        let mut bytes = if self.pretty {
92            serde_json::to_vec_pretty(event)?
93        } else {
94            serde_json::to_vec(event)?
95        };
96        bytes.push(b'\n');
97        let stdout = io::stdout();
98        let mut handle = stdout.lock();
99        handle
100            .write_all(&bytes)
101            .map_err(|e| Error::StateError(format!("StdoutSink write: {e}")))?;
102        self.events_sent += 1;
103        if let Some(ref mut buf) = self.captured {
104            buf.push(event.clone());
105        }
106        Ok(())
107    }
108
109    async fn flush(&mut self) -> Result<()> {
110        if self.closed {
111            return Err(Error::StateError("StdoutSink is closed".into()));
112        }
113        let stdout = io::stdout();
114        let mut handle = stdout.lock();
115        handle
116            .flush()
117            .map_err(|e| Error::StateError(format!("StdoutSink flush: {e}")))?;
118        Ok(())
119    }
120
121    async fn close(&mut self) -> Result<()> {
122        if !self.closed {
123            // Best-effort flush before close.
124            let stdout = io::stdout();
125            let mut handle = stdout.lock();
126            let _ = handle.flush();
127        }
128        self.closed = true;
129        Ok(())
130    }
131
132    fn name(&self) -> &str {
133        "stdout"
134    }
135
136    fn is_closed(&self) -> bool {
137        self.closed
138    }
139
140    fn delivery_metrics(&self) -> Option<crate::sink::SinkDeliveryMetrics> {
141        Some(crate::sink::SinkDeliveryMetrics {
142            events_sent: self.events_sent,
143            ..Default::default()
144        })
145    }
146
147    fn exported_events(&self) -> Option<&[Event]> {
148        self.captured.as_deref()
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use super::*;
155
156    #[tokio::test]
157    async fn send_after_close_errors() {
158        let mut sink = StdoutSink::new();
159        sink.close().await.unwrap();
160        let event = Event::default();
161        assert!(sink.send(&event).await.is_err());
162    }
163
164    #[tokio::test]
165    async fn flush_after_close_errors() {
166        let mut sink = StdoutSink::new();
167        sink.close().await.unwrap();
168        assert!(sink.flush().await.is_err());
169    }
170
171    #[test]
172    fn name_is_stdout() {
173        let sink = StdoutSink::new();
174        assert_eq!(sink.name(), "stdout");
175    }
176
177    #[test]
178    fn is_closed_starts_false() {
179        let sink = StdoutSink::new();
180        assert!(!sink.is_closed());
181    }
182
183    #[tokio::test]
184    async fn is_closed_true_after_close() {
185        let mut sink = StdoutSink::new();
186        sink.close().await.unwrap();
187        assert!(sink.is_closed());
188    }
189
190    #[tokio::test]
191    async fn events_sent_increments() {
192        // We can't easily capture stdout in unit tests, but we can verify counter.
193        // Use a real StdoutSink — output goes to /dev/null in CI.
194        let mut sink = StdoutSink::new();
195        assert_eq!(sink.events_sent(), 0);
196        sink.send(&Event::default()).await.unwrap();
197        assert_eq!(sink.events_sent(), 1);
198        sink.send(&Event::default()).await.unwrap();
199        assert_eq!(sink.events_sent(), 2);
200    }
201}