Skip to main content

faucet_sink_stdout/
sink.rs

1//! Stdout/stderr sink implementation.
2
3use crate::config::{StdStream, StdoutFormat, StdoutSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use std::io;
8use tokio::io::{AsyncWrite, AsyncWriteExt};
9use tokio::sync::Mutex;
10
11/// State guarded by a single mutex so the running record count, the writer,
12/// and the "consumer closed the pipe" flag can't race against each other.
13struct State {
14    writer: Box<dyn AsyncWrite + Unpin + Send>,
15    written: usize,
16    closed: bool,
17}
18
19/// A sink that writes records to standard output or standard error.
20pub struct StdoutSink {
21    config: StdoutSinkConfig,
22    state: Mutex<State>,
23}
24
25impl StdoutSink {
26    /// Create a new stdout/stderr sink. Opens the underlying stream eagerly.
27    pub fn new(config: StdoutSinkConfig) -> Self {
28        let writer: Box<dyn AsyncWrite + Unpin + Send> = match config.destination {
29            StdStream::Stdout => Box::new(tokio::io::stdout()),
30            StdStream::Stderr => Box::new(tokio::io::stderr()),
31        };
32        Self::with_writer(config, writer)
33    }
34
35    /// Construct with a caller-provided async writer. Used by tests to capture
36    /// output, and by integrators who want to redirect into something other
37    /// than the real stdio handles (e.g. a log file, an in-memory buffer).
38    pub fn with_writer(
39        config: StdoutSinkConfig,
40        writer: Box<dyn AsyncWrite + Unpin + Send>,
41    ) -> Self {
42        Self {
43            config,
44            state: Mutex::new(State {
45                writer,
46                written: 0,
47                closed: false,
48            }),
49        }
50    }
51
52    fn encode(&self, record: &Value) -> Result<Vec<u8>, FaucetError> {
53        match self.config.format {
54            StdoutFormat::JsonLines => {
55                let mut bytes = serde_json::to_vec(record)
56                    .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
57                bytes.push(b'\n');
58                Ok(bytes)
59            }
60            StdoutFormat::PrettyJson => {
61                let mut bytes = serde_json::to_vec_pretty(record)
62                    .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
63                bytes.push(b'\n');
64                Ok(bytes)
65            }
66            StdoutFormat::Tsv => encode_tsv(record),
67        }
68    }
69}
70
71fn encode_tsv(record: &Value) -> Result<Vec<u8>, FaucetError> {
72    let obj = record.as_object().ok_or_else(|| {
73        FaucetError::Sink("Tsv format requires each record to be a JSON object".into())
74    })?;
75    let mut keys: Vec<&String> = obj.keys().collect();
76    keys.sort();
77    let mut line = String::new();
78    for (i, key) in keys.iter().enumerate() {
79        if i > 0 {
80            line.push('\t');
81        }
82        let value = &obj[*key];
83        line.push_str(&tsv_cell(value)?);
84    }
85    line.push('\n');
86    Ok(line.into_bytes())
87}
88
89fn tsv_cell(value: &Value) -> Result<String, FaucetError> {
90    Ok(match value {
91        // Render strings without JSON quoting, but neutralise control chars
92        // that would corrupt the TSV layout.
93        Value::String(s) => s.replace(['\t', '\n', '\r'], " "),
94        Value::Null => String::new(),
95        Value::Bool(_) | Value::Number(_) => value.to_string(),
96        Value::Array(_) | Value::Object(_) => serde_json::to_string(value)
97            .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?,
98    })
99}
100
101#[async_trait]
102impl faucet_core::Sink for StdoutSink {
103    fn config_schema(&self) -> Value {
104        serde_json::to_value(faucet_core::schema_for!(StdoutSinkConfig))
105            .expect("schema serialization")
106    }
107
108    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
109        if records.is_empty() {
110            return Ok(0);
111        }
112
113        let mut state = self.state.lock().await;
114        if state.closed {
115            return Ok(0);
116        }
117
118        let remaining = match self.config.max_records {
119            Some(max) => max.saturating_sub(state.written),
120            None => usize::MAX,
121        };
122        if remaining == 0 {
123            return Ok(0);
124        }
125
126        let take = records.len().min(remaining);
127        let mut written_this_call = 0usize;
128        for record in records.iter().take(take) {
129            let bytes = self.encode(record)?;
130            match state.writer.write_all(&bytes).await {
131                Ok(()) => {}
132                Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
133                    state.closed = true;
134                    tracing::debug!("stdout consumer closed pipe; stopping writes");
135                    return Ok(written_this_call);
136                }
137                Err(e) => return Err(FaucetError::Sink(format!("write failed: {e}"))),
138            }
139            if self.config.flush_per_record {
140                state
141                    .writer
142                    .flush()
143                    .await
144                    .map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))?;
145            }
146            state.written += 1;
147            written_this_call += 1;
148        }
149        Ok(written_this_call)
150    }
151
152    async fn flush(&self) -> Result<(), FaucetError> {
153        let mut state = self.state.lock().await;
154        state
155            .writer
156            .flush()
157            .await
158            .map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))
159    }
160
161    /// Preflight probe for `faucet doctor`. The standard streams are always
162    /// reachable (the OS hands them to every process), so there is nothing to
163    /// fail on — this always passes immediately.
164    async fn check(
165        &self,
166        _ctx: &faucet_core::check::CheckContext,
167    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
168        use faucet_core::check::{CheckReport, Probe};
169        Ok(CheckReport::single(Probe::pass(
170            "io",
171            std::time::Duration::ZERO,
172        )))
173    }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;
    use std::task::{Context, Poll};
    use tokio::io::AsyncWrite;

    /// In-memory async writer that records bytes for assertions and can
    /// optionally simulate a broken-pipe error after a fixed number of writes.
    ///
    /// Clones share the same `inner` state through the `Arc`. A test can
    /// therefore box one clone into the sink and keep another to inspect
    /// what was written.
    #[derive(Clone, Default)]
    struct CaptureWriter {
        inner: Arc<StdMutex<CaptureInner>>,
    }

    /// Shared bookkeeping behind a `CaptureWriter`.
    #[derive(Default)]
    struct CaptureInner {
        // Every byte accepted by `poll_write`, in order.
        bytes: Vec<u8>,
        // Number of `poll_flush` calls observed.
        flushes: usize,
        // When set, `poll_write` calls beyond this count fail with BrokenPipe.
        fail_after: Option<usize>,
        // Number of `poll_write` calls observed, including failing ones.
        writes: usize,
    }

    impl CaptureWriter {
        /// Writer whose first `n` `poll_write` calls succeed and every later
        /// one returns `BrokenPipe`.
        fn fail_after(n: usize) -> Self {
            let me = Self::default();
            me.inner.lock().unwrap().fail_after = Some(n);
            me
        }
        /// Snapshot of all bytes written so far.
        fn captured(&self) -> Vec<u8> {
            self.inner.lock().unwrap().bytes.clone()
        }
        /// How many times the writer has been flushed.
        fn flushes(&self) -> usize {
            self.inner.lock().unwrap().flushes
        }
        /// Captured bytes decoded as UTF-8; panics if they are not valid UTF-8.
        fn as_str(&self) -> String {
            String::from_utf8(self.captured()).unwrap()
        }
    }

    impl AsyncWrite for CaptureWriter {
        // Always completes immediately and, on success, accepts the whole
        // buffer. Each `write_all` of a non-empty buffer in the sink
        // therefore maps to exactly one `poll_write` call. The std mutex
        // guard is dropped before returning, so it is never held across an
        // await point.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let mut inner = self.inner.lock().unwrap();
            inner.writes += 1;
            if let Some(fail_after) = inner.fail_after
                && inner.writes > fail_after
            {
                return Poll::Ready(Err(io::Error::from(io::ErrorKind::BrokenPipe)));
            }
            inner.bytes.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }
        // Counts flushes so tests can assert the sink's flush policy.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.inner.lock().unwrap().flushes += 1;
            Poll::Ready(Ok(()))
        }
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Build a sink over a fresh `CaptureWriter`, returning both so the test
    /// can drive the sink and inspect the captured output.
    fn sink_with(config: StdoutSinkConfig) -> (StdoutSink, CaptureWriter) {
        let writer = CaptureWriter::default();
        let sink = StdoutSink::with_writer(config, Box::new(writer.clone()));
        (sink, writer)
    }

    /// Default format (JSON Lines) writes one compact JSON document per line.
    #[tokio::test]
    async fn json_lines_emits_one_record_per_line() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new());
        let records = vec![json!({"id": 1}), json!({"id": 2})];
        let n = sink.write_batch(&records).await.unwrap();
        assert_eq!(n, 2);
        let out = capture.as_str();
        let lines: Vec<&str> = out.lines().collect();
        assert_eq!(lines.len(), 2);
        assert_eq!(
            serde_json::from_str::<Value>(lines[0]).unwrap(),
            json!({"id": 1})
        );
        assert_eq!(
            serde_json::from_str::<Value>(lines[1]).unwrap(),
            json!({"id": 2})
        );
    }

    /// PrettyJson indents nested fields and ends each record with a newline.
    #[tokio::test]
    async fn pretty_json_indents_and_separates_records() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::PrettyJson));
        sink.write_batch(&[json!({"id": 1, "nested": {"k": "v"}})])
            .await
            .unwrap();
        let out = capture.as_str();
        assert!(out.contains("  \"id\": 1"));
        assert!(out.contains("  \"nested\": {"));
        assert!(out.ends_with('\n'));
    }

    /// Tsv orders cells by key name; arrays are rendered as compact JSON.
    #[tokio::test]
    async fn tsv_emits_keys_sorted_with_tab_separators() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        sink.write_batch(&[json!({"name": "alice", "id": 7, "tags": ["a","b"], "active": true})])
            .await
            .unwrap();
        let out = capture.as_str();
        let line = out.lines().next().unwrap();
        let cells: Vec<&str> = line.split('\t').collect();
        // sorted: active, id, name, tags
        assert_eq!(cells, vec!["true", "7", "alice", r#"["a","b"]"#]);
    }

    /// Embedded tab/newline characters in strings become spaces in Tsv mode.
    #[tokio::test]
    async fn tsv_replaces_tabs_and_newlines_in_string_values() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        sink.write_batch(&[json!({"a": "tab\there\nand-newline"})])
            .await
            .unwrap();
        let out = capture.as_str();
        let line = out.lines().next().unwrap();
        assert_eq!(line, "tab here and-newline");
    }

    /// Tsv mode rejects records that are not JSON objects with a Sink error.
    #[tokio::test]
    async fn tsv_rejects_non_object_records() {
        let (sink, _capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        let result = sink.write_batch(&[json!([1, 2, 3])]).await;
        assert!(matches!(result, Err(FaucetError::Sink(_))));
    }

    /// An empty batch is accepted and reports zero records written.
    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let (sink, _capture) = sink_with(StdoutSinkConfig::new());
        let n = sink.write_batch(&[]).await.unwrap();
        assert_eq!(n, 0);
    }

    /// `max_records` caps the total across calls, not just within one batch.
    #[tokio::test]
    async fn max_records_caps_output() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().max_records(2));
        let n = sink
            .write_batch(&[json!({"id": 1}), json!({"id": 2}), json!({"id": 3})])
            .await
            .unwrap();
        assert_eq!(n, 2);
        assert_eq!(capture.as_str().lines().count(), 2);
        // Subsequent calls become no-ops.
        let n2 = sink.write_batch(&[json!({"id": 4})]).await.unwrap();
        assert_eq!(n2, 0);
        assert_eq!(capture.as_str().lines().count(), 2);
    }

    /// With `flush_per_record`, the writer is flushed once per record.
    #[tokio::test]
    async fn flush_per_record_flushes_after_each() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().flush_per_record(true));
        sink.write_batch(&[json!({"id": 1}), json!({"id": 2})])
            .await
            .unwrap();
        assert_eq!(capture.flushes(), 2);
    }

    /// Without `flush_per_record`, only an explicit `flush()` reaches the writer.
    #[tokio::test]
    async fn batch_boundary_flush_only_on_explicit_flush() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new());
        sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(capture.flushes(), 0);
        sink.flush().await.unwrap();
        assert_eq!(capture.flushes(), 1);
    }

    /// A BrokenPipe on write ends output cleanly: Ok with the count written
    /// so far, and later batches are no-ops.
    #[tokio::test]
    async fn broken_pipe_is_treated_as_clean_termination() {
        // Writer accepts 1 write then errors with BrokenPipe.
        let capture = CaptureWriter::fail_after(1);
        let sink = StdoutSink::with_writer(StdoutSinkConfig::new(), Box::new(capture.clone()));
        let n = sink
            .write_batch(&[json!({"id": 1}), json!({"id": 2}), json!({"id": 3})])
            .await
            .unwrap();
        assert_eq!(n, 1);
        // Further writes are no-ops because the sink is now marked closed.
        let n2 = sink.write_batch(&[json!({"id": 4})]).await.unwrap();
        assert_eq!(n2, 0);
    }

    /// The sink is usable through a `Box<dyn Sink>` trait object.
    #[tokio::test]
    async fn as_trait_object() {
        let capture = CaptureWriter::default();
        let sink: Box<dyn Sink> = Box::new(StdoutSink::with_writer(
            StdoutSinkConfig::new(),
            Box::new(capture.clone()),
        ));
        let n = sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(n, 1);
        assert!(capture.as_str().contains("\"id\":1"));
    }

    /// The generated config schema is an object schema with properties.
    #[tokio::test]
    async fn config_schema_is_well_formed_object() {
        let sink = StdoutSink::new(StdoutSinkConfig::new());
        let schema = sink.config_schema();
        assert_eq!(schema["type"], "object");
        assert!(schema["properties"].is_object());
    }

    /// `check` reports a single passing "io" probe.
    #[tokio::test]
    async fn check_always_passes() {
        let sink = StdoutSink::new(StdoutSinkConfig::new());
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes.len(), 1);
        assert_eq!(report.probes[0].name, "io");
    }
}