1use crate::config::{StdStream, StdoutFormat, StdoutSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use std::io;
8use tokio::io::{AsyncWrite, AsyncWriteExt};
9use tokio::sync::Mutex;
10
11struct State {
14 writer: Box<dyn AsyncWrite + Unpin + Send>,
15 written: usize,
16 closed: bool,
17}
18
19pub struct StdoutSink {
21 config: StdoutSinkConfig,
22 state: Mutex<State>,
23}
24
25impl StdoutSink {
26 pub fn new(config: StdoutSinkConfig) -> Self {
28 let writer: Box<dyn AsyncWrite + Unpin + Send> = match config.destination {
29 StdStream::Stdout => Box::new(tokio::io::stdout()),
30 StdStream::Stderr => Box::new(tokio::io::stderr()),
31 };
32 Self::with_writer(config, writer)
33 }
34
35 pub fn with_writer(
39 config: StdoutSinkConfig,
40 writer: Box<dyn AsyncWrite + Unpin + Send>,
41 ) -> Self {
42 Self {
43 config,
44 state: Mutex::new(State {
45 writer,
46 written: 0,
47 closed: false,
48 }),
49 }
50 }
51
52 fn encode(&self, record: &Value) -> Result<Vec<u8>, FaucetError> {
53 match self.config.format {
54 StdoutFormat::JsonLines => {
55 let mut bytes = serde_json::to_vec(record)
56 .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
57 bytes.push(b'\n');
58 Ok(bytes)
59 }
60 StdoutFormat::PrettyJson => {
61 let mut bytes = serde_json::to_vec_pretty(record)
62 .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
63 bytes.push(b'\n');
64 Ok(bytes)
65 }
66 StdoutFormat::Tsv => encode_tsv(record),
67 }
68 }
69}
70
71fn encode_tsv(record: &Value) -> Result<Vec<u8>, FaucetError> {
72 let obj = record.as_object().ok_or_else(|| {
73 FaucetError::Sink("Tsv format requires each record to be a JSON object".into())
74 })?;
75 let mut keys: Vec<&String> = obj.keys().collect();
76 keys.sort();
77 let mut line = String::new();
78 for (i, key) in keys.iter().enumerate() {
79 if i > 0 {
80 line.push('\t');
81 }
82 let value = &obj[*key];
83 line.push_str(&tsv_cell(value)?);
84 }
85 line.push('\n');
86 Ok(line.into_bytes())
87}
88
89fn tsv_cell(value: &Value) -> Result<String, FaucetError> {
90 Ok(match value {
91 Value::String(s) => s.replace(['\t', '\n', '\r'], " "),
94 Value::Null => String::new(),
95 Value::Bool(_) | Value::Number(_) => value.to_string(),
96 Value::Array(_) | Value::Object(_) => serde_json::to_string(value)
97 .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?,
98 })
99}
100
101#[async_trait]
102impl faucet_core::Sink for StdoutSink {
103 fn config_schema(&self) -> Value {
104 serde_json::to_value(faucet_core::schema_for!(StdoutSinkConfig))
105 .expect("schema serialization")
106 }
107
108 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
109 if records.is_empty() {
110 return Ok(0);
111 }
112
113 let mut state = self.state.lock().await;
114 if state.closed {
115 return Ok(0);
116 }
117
118 let remaining = match self.config.max_records {
119 Some(max) => max.saturating_sub(state.written),
120 None => usize::MAX,
121 };
122 if remaining == 0 {
123 return Ok(0);
124 }
125
126 let take = records.len().min(remaining);
127 let mut written_this_call = 0usize;
128 for record in records.iter().take(take) {
129 let bytes = self.encode(record)?;
130 match state.writer.write_all(&bytes).await {
131 Ok(()) => {}
132 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
133 state.closed = true;
134 tracing::debug!("stdout consumer closed pipe; stopping writes");
135 return Ok(written_this_call);
136 }
137 Err(e) => return Err(FaucetError::Sink(format!("write failed: {e}"))),
138 }
139 if self.config.flush_per_record {
140 state
141 .writer
142 .flush()
143 .await
144 .map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))?;
145 }
146 state.written += 1;
147 written_this_call += 1;
148 }
149 Ok(written_this_call)
150 }
151
152 async fn flush(&self) -> Result<(), FaucetError> {
153 let mut state = self.state.lock().await;
154 state
155 .writer
156 .flush()
157 .await
158 .map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))
159 }
160
161 async fn check(
165 &self,
166 _ctx: &faucet_core::check::CheckContext,
167 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
168 use faucet_core::check::{CheckReport, Probe};
169 Ok(CheckReport::single(Probe::pass(
170 "io",
171 std::time::Duration::ZERO,
172 )))
173 }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;
    use std::task::{Context, Poll};
    use tokio::io::AsyncWrite;

    /// In-memory `AsyncWrite` that records everything written to it.
    ///
    /// Clones share one buffer through an `Arc`. A test therefore keeps one
    /// handle for inspection while the sink owns another.
    #[derive(Clone, Default)]
    struct CaptureWriter {
        inner: Arc<StdMutex<CaptureInner>>,
    }

    /// Shared state behind a `CaptureWriter`.
    #[derive(Default)]
    struct CaptureInner {
        // Every byte accepted by a successful `poll_write`.
        bytes: Vec<u8>,
        // Number of `poll_flush` calls observed.
        flushes: usize,
        // When `Some(n)`, every `poll_write` after the first `n` fails with
        // `BrokenPipe`, simulating a reader that hung up.
        fail_after: Option<usize>,
        // Number of `poll_write` calls observed, including failing ones.
        writes: usize,
    }

    impl CaptureWriter {
        /// Writer whose first `n` writes succeed and whose later writes
        /// return `BrokenPipe`.
        fn fail_after(n: usize) -> Self {
            let me = Self::default();
            me.inner.lock().unwrap().fail_after = Some(n);
            me
        }
        /// Copy of every byte written so far.
        fn captured(&self) -> Vec<u8> {
            self.inner.lock().unwrap().bytes.clone()
        }
        /// Number of flushes observed so far.
        fn flushes(&self) -> usize {
            self.inner.lock().unwrap().flushes
        }
        /// Captured output decoded as UTF-8 (panics on invalid UTF-8).
        fn as_str(&self) -> String {
            String::from_utf8(self.captured()).unwrap()
        }
    }

    impl AsyncWrite for CaptureWriter {
        // Completes immediately. It accepts the whole buffer in one call, or
        // reports `BrokenPipe` once `fail_after` writes have been made.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let mut inner = self.inner.lock().unwrap();
            inner.writes += 1;
            if let Some(fail_after) = inner.fail_after
                && inner.writes > fail_after
            {
                return Poll::Ready(Err(io::Error::from(io::ErrorKind::BrokenPipe)));
            }
            inner.bytes.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }
        // Counts flushes so tests can assert on flush frequency.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.inner.lock().unwrap().flushes += 1;
            Poll::Ready(Ok(()))
        }
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Builds a sink backed by a fresh `CaptureWriter` and returns both.
    fn sink_with(config: StdoutSinkConfig) -> (StdoutSink, CaptureWriter) {
        let writer = CaptureWriter::default();
        let sink = StdoutSink::with_writer(config, Box::new(writer.clone()));
        (sink, writer)
    }

    // Default format: each record becomes one line of parseable JSON.
    #[tokio::test]
    async fn json_lines_emits_one_record_per_line() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new());
        let records = vec![json!({"id": 1}), json!({"id": 2})];
        let n = sink.write_batch(&records).await.unwrap();
        assert_eq!(n, 2);
        let out = capture.as_str();
        let lines: Vec<&str> = out.lines().collect();
        assert_eq!(lines.len(), 2);
        assert_eq!(
            serde_json::from_str::<Value>(lines[0]).unwrap(),
            json!({"id": 1})
        );
        assert_eq!(
            serde_json::from_str::<Value>(lines[1]).unwrap(),
            json!({"id": 2})
        );
    }

    // Pretty output is indented and still ends with a newline.
    #[tokio::test]
    async fn pretty_json_indents_and_separates_records() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::PrettyJson));
        sink.write_batch(&[json!({"id": 1, "nested": {"k": "v"}})])
            .await
            .unwrap();
        let out = capture.as_str();
        assert!(out.contains(" \"id\": 1"));
        assert!(out.contains(" \"nested\": {"));
        assert!(out.ends_with('\n'));
    }

    // TSV columns follow sorted key order: active, id, name, tags.
    #[tokio::test]
    async fn tsv_emits_keys_sorted_with_tab_separators() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        sink.write_batch(&[json!({"name": "alice", "id": 7, "tags": ["a","b"], "active": true})])
            .await
            .unwrap();
        let out = capture.as_str();
        let line = out.lines().next().unwrap();
        let cells: Vec<&str> = line.split('\t').collect();
        // Arrays are embedded as compact JSON.
        assert_eq!(cells, vec!["true", "7", "alice", r#"["a","b"]"#]);
    }

    // Embedded tabs/newlines must not split a TSV cell or row.
    #[tokio::test]
    async fn tsv_replaces_tabs_and_newlines_in_string_values() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        sink.write_batch(&[json!({"a": "tab\there\nand-newline"})])
            .await
            .unwrap();
        let out = capture.as_str();
        let line = out.lines().next().unwrap();
        assert_eq!(line, "tab here and-newline");
    }

    // TSV needs named columns, so non-object records are rejected.
    #[tokio::test]
    async fn tsv_rejects_non_object_records() {
        let (sink, _capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        let result = sink.write_batch(&[json!([1, 2, 3])]).await;
        assert!(matches!(result, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let (sink, _capture) = sink_with(StdoutSinkConfig::new());
        let n = sink.write_batch(&[]).await.unwrap();
        assert_eq!(n, 0);
    }

    // `max_records` truncates the batch that crosses the cap.
    #[tokio::test]
    async fn max_records_caps_output() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().max_records(2));
        let n = sink
            .write_batch(&[json!({"id": 1}), json!({"id": 2}), json!({"id": 3})])
            .await
            .unwrap();
        assert_eq!(n, 2);
        assert_eq!(capture.as_str().lines().count(), 2);
        // Once the cap is reached, later batches write nothing.
        let n2 = sink.write_batch(&[json!({"id": 4})]).await.unwrap();
        assert_eq!(n2, 0);
        assert_eq!(capture.as_str().lines().count(), 2);
    }

    #[tokio::test]
    async fn flush_per_record_flushes_after_each() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().flush_per_record(true));
        sink.write_batch(&[json!({"id": 1}), json!({"id": 2})])
            .await
            .unwrap();
        assert_eq!(capture.flushes(), 2);
    }

    // Without `flush_per_record`, only an explicit `flush()` flushes.
    #[tokio::test]
    async fn batch_boundary_flush_only_on_explicit_flush() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new());
        sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(capture.flushes(), 0);
        sink.flush().await.unwrap();
        assert_eq!(capture.flushes(), 1);
    }

    #[tokio::test]
    async fn broken_pipe_is_treated_as_clean_termination() {
        let capture = CaptureWriter::fail_after(1);
        // The first record is accepted; writing the second hits BrokenPipe.
        let sink = StdoutSink::with_writer(StdoutSinkConfig::new(), Box::new(capture.clone()));
        let n = sink
            .write_batch(&[json!({"id": 1}), json!({"id": 2}), json!({"id": 3})])
            .await
            .unwrap();
        assert_eq!(n, 1);
        // The sink is now closed, so further batches are silent no-ops.
        let n2 = sink.write_batch(&[json!({"id": 4})]).await.unwrap();
        assert_eq!(n2, 0);
    }

    // The sink is usable through `Box<dyn Sink>`.
    #[tokio::test]
    async fn as_trait_object() {
        let capture = CaptureWriter::default();
        let sink: Box<dyn Sink> = Box::new(StdoutSink::with_writer(
            StdoutSinkConfig::new(),
            Box::new(capture.clone()),
        ));
        let n = sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(n, 1);
        assert!(capture.as_str().contains("\"id\":1"));
    }

    #[tokio::test]
    async fn config_schema_is_well_formed_object() {
        let sink = StdoutSink::new(StdoutSinkConfig::new());
        let schema = sink.config_schema();
        assert_eq!(schema["type"], "object");
        assert!(schema["properties"].is_object());
    }

    // `check` reports exactly one passing probe named "io".
    #[tokio::test]
    async fn check_always_passes() {
        let sink = StdoutSink::new(StdoutSinkConfig::new());
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes.len(), 1);
        assert_eq!(report.probes[0].name, "io");
    }
}