use crate::config::{StdStream, StdoutFormat, StdoutSinkConfig};
use async_trait::async_trait;
use faucet_core::FaucetError;
use serde_json::Value;
use std::io;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex;
/// Mutable bookkeeping for a [`StdoutSink`], kept behind its async mutex so the
/// writer can be used across `.await` points.
struct State {
    /// Destination every encoded record is written to.
    writer: Box<dyn AsyncWrite + Unpin + Send>,
    /// Records written so far across all batches; compared against `max_records`.
    written: usize,
    /// Set once the consumer has closed the pipe; subsequent batches write nothing.
    closed: bool,
}
/// Sink that prints records to stdout, stderr, or an injected async writer.
///
/// Records are encoded as JSON Lines, pretty-printed JSON, or TSV according to
/// the configured format, optionally capped at `max_records` and optionally
/// flushed after every record.
pub struct StdoutSink {
    /// Format, destination, record cap and flush policy supplied by the user.
    config: StdoutSinkConfig,
    /// Writer plus counters; a `tokio` mutex because it is held across awaits.
    state: Mutex<State>,
}
impl StdoutSink {
    /// Builds a sink that writes to the process stream named by `config.destination`.
    pub fn new(config: StdoutSinkConfig) -> Self {
        let stream: Box<dyn AsyncWrite + Unpin + Send> = match config.destination {
            StdStream::Stderr => Box::new(tokio::io::stderr()),
            StdStream::Stdout => Box::new(tokio::io::stdout()),
        };
        Self::with_writer(config, stream)
    }

    /// Builds a sink that writes to `writer` instead of a process stream.
    ///
    /// `config.destination` is not consulted here; every other setting applies.
    pub fn with_writer(
        config: StdoutSinkConfig,
        writer: Box<dyn AsyncWrite + Unpin + Send>,
    ) -> Self {
        let initial = State {
            closed: false,
            written: 0,
            writer,
        };
        Self {
            config,
            state: Mutex::new(initial),
        }
    }

    /// Serializes one record in the configured format, terminated by `\n`.
    ///
    /// # Errors
    ///
    /// Returns `FaucetError::Sink` when JSON serialization fails, or when the
    /// TSV format is selected and the record is not a JSON object.
    fn encode(&self, record: &Value) -> Result<Vec<u8>, FaucetError> {
        let serialized = match self.config.format {
            // TSV builds its own newline-terminated line.
            StdoutFormat::Tsv => return encode_tsv(record),
            StdoutFormat::JsonLines => serde_json::to_vec(record),
            StdoutFormat::PrettyJson => serde_json::to_vec_pretty(record),
        };
        let mut bytes = serialized
            .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")))?;
        bytes.push(b'\n');
        Ok(bytes)
    }
}
/// Encodes an object record as a single TSV line.
///
/// The line holds the record's values in ascending key order, separated by tabs
/// and followed by `\n`. Keys themselves are not written (no header row), so
/// columns only line up across records that share the same key set.
///
/// # Errors
///
/// Returns `FaucetError::Sink` if `record` is not a JSON object, or if a nested
/// array/object value fails to serialize.
fn encode_tsv(record: &Value) -> Result<Vec<u8>, FaucetError> {
    let Some(fields) = record.as_object() else {
        return Err(FaucetError::Sink(
            "Tsv format requires each record to be a JSON object".into(),
        ));
    };
    // Keys are unique, so an unstable sort yields the same order as a stable one.
    let mut sorted: Vec<(&String, &Value)> = fields.iter().collect();
    sorted.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));
    let cells = sorted
        .into_iter()
        .map(|(_, value)| tsv_cell(value))
        .collect::<Result<Vec<String>, FaucetError>>()?;
    let mut line = cells.join("\t");
    line.push('\n');
    Ok(line.into_bytes())
}
/// Renders one JSON value as the text of a TSV cell.
///
/// - strings: tabs, `\n` and `\r` are replaced by spaces so the row cannot split;
/// - `null`: an empty cell;
/// - booleans and numbers: their JSON text;
/// - arrays and objects: compact JSON via `serde_json::to_string`.
///
/// # Errors
///
/// Returns `FaucetError::Sink` if an array/object fails to serialize.
fn tsv_cell(value: &Value) -> Result<String, FaucetError> {
    let cell = match value {
        Value::Null => String::new(),
        Value::String(text) => text.replace(['\t', '\n', '\r'], " "),
        Value::Bool(_) | Value::Number(_) => value.to_string(),
        Value::Array(_) | Value::Object(_) => {
            return serde_json::to_string(value)
                .map_err(|e| FaucetError::Sink(format!("JSON serialization failed: {e}")));
        }
    };
    Ok(cell)
}
#[async_trait]
impl faucet_core::Sink for StdoutSink {
fn config_schema(&self) -> Value {
serde_json::to_value(faucet_core::schema_for!(StdoutSinkConfig))
.expect("schema serialization")
}
async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
if records.is_empty() {
return Ok(0);
}
let mut state = self.state.lock().await;
if state.closed {
return Ok(0);
}
let remaining = match self.config.max_records {
Some(max) => max.saturating_sub(state.written),
None => usize::MAX,
};
if remaining == 0 {
return Ok(0);
}
let take = records.len().min(remaining);
let mut written_this_call = 0usize;
for record in records.iter().take(take) {
let bytes = self.encode(record)?;
match state.writer.write_all(&bytes).await {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
state.closed = true;
tracing::debug!("stdout consumer closed pipe; stopping writes");
return Ok(written_this_call);
}
Err(e) => return Err(FaucetError::Sink(format!("write failed: {e}"))),
}
if self.config.flush_per_record {
state
.writer
.flush()
.await
.map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))?;
}
state.written += 1;
written_this_call += 1;
}
Ok(written_this_call)
}
async fn flush(&self) -> Result<(), FaucetError> {
let mut state = self.state.lock().await;
state
.writer
.flush()
.await
.map_err(|e| FaucetError::Sink(format!("flush failed: {e}")))
}
async fn check(
&self,
_ctx: &faucet_core::check::CheckContext,
) -> Result<faucet_core::check::CheckReport, FaucetError> {
use faucet_core::check::{CheckReport, Probe};
Ok(CheckReport::single(Probe::pass(
"io",
std::time::Duration::ZERO,
)))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;
    use std::task::{Context, Poll};
    use tokio::io::AsyncWrite;

    /// In-memory `AsyncWrite` that records written bytes and flush calls.
    ///
    /// It can be configured to fail with `BrokenPipe` after a number of writes.
    /// Clones share the same underlying buffer.
    #[derive(Clone, Default)]
    struct CaptureWriter {
        inner: Arc<StdMutex<CaptureInner>>,
    }

    /// Shared state observed by every clone of a `CaptureWriter`.
    #[derive(Default)]
    struct CaptureInner {
        bytes: Vec<u8>,
        flushes: usize,
        fail_after: Option<usize>,
        writes: usize,
    }

    impl CaptureWriter {
        /// A writer whose `poll_write` calls after the first `n` fail with `BrokenPipe`.
        fn fail_after(n: usize) -> Self {
            let writer = Self::default();
            writer.inner.lock().unwrap().fail_after = Some(n);
            writer
        }

        fn captured(&self) -> Vec<u8> {
            let guard = self.inner.lock().unwrap();
            guard.bytes.clone()
        }

        fn flushes(&self) -> usize {
            let guard = self.inner.lock().unwrap();
            guard.flushes
        }

        fn as_str(&self) -> String {
            let raw = self.captured();
            String::from_utf8(raw).unwrap()
        }
    }

    impl AsyncWrite for CaptureWriter {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let mut guard = self.inner.lock().unwrap();
            guard.writes += 1;
            if matches!(guard.fail_after, Some(limit) if guard.writes > limit) {
                return Poll::Ready(Err(io::Error::from(io::ErrorKind::BrokenPipe)));
            }
            guard.bytes.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let mut guard = self.inner.lock().unwrap();
            guard.flushes += 1;
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Builds a sink over a fresh `CaptureWriter` and returns a handle to inspect it.
    fn sink_with(config: StdoutSinkConfig) -> (StdoutSink, CaptureWriter) {
        let capture = CaptureWriter::default();
        (StdoutSink::with_writer(config, Box::new(capture.clone())), capture)
    }

    #[tokio::test]
    async fn json_lines_emits_one_record_per_line() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new());
        let batch = vec![json!({"id": 1}), json!({"id": 2})];
        let count = sink.write_batch(&batch).await.unwrap();
        assert_eq!(count, 2);
        let text = capture.as_str();
        let parsed: Vec<Value> = text
            .lines()
            .map(|line| serde_json::from_str::<Value>(line).unwrap())
            .collect();
        assert_eq!(parsed, vec![json!({"id": 1}), json!({"id": 2})]);
    }

    #[tokio::test]
    async fn pretty_json_indents_and_separates_records() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::PrettyJson));
        let record = json!({"id": 1, "nested": {"k": "v"}});
        sink.write_batch(&[record]).await.unwrap();
        let text = capture.as_str();
        for needle in [" \"id\": 1", " \"nested\": {"] {
            assert!(text.contains(needle));
        }
        assert!(text.ends_with('\n'));
    }

    #[tokio::test]
    async fn tsv_emits_keys_sorted_with_tab_separators() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        let record = json!({"name": "alice", "id": 7, "tags": ["a","b"], "active": true});
        sink.write_batch(&[record]).await.unwrap();
        let text = capture.as_str();
        let first_line = text.lines().next().unwrap();
        let cells = first_line.split('\t').collect::<Vec<&str>>();
        assert_eq!(cells, vec!["true", "7", "alice", r#"["a","b"]"#]);
    }

    #[tokio::test]
    async fn tsv_replaces_tabs_and_newlines_in_string_values() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        let record = json!({"a": "tab\there\nand-newline"});
        sink.write_batch(&[record]).await.unwrap();
        let text = capture.as_str();
        let first_line = text.lines().next().unwrap();
        assert_eq!(first_line, "tab here and-newline");
    }

    #[tokio::test]
    async fn tsv_rejects_non_object_records() {
        let (sink, _capture) = sink_with(StdoutSinkConfig::new().format(StdoutFormat::Tsv));
        let outcome = sink.write_batch(&[json!([1, 2, 3])]).await;
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let (sink, _capture) = sink_with(StdoutSinkConfig::new());
        let count = sink.write_batch(&[]).await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn max_records_caps_output() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().max_records(2));
        let line_count = || capture.as_str().lines().count();

        let first = sink
            .write_batch(&[json!({"id": 1}), json!({"id": 2}), json!({"id": 3})])
            .await
            .unwrap();
        assert_eq!(first, 2);
        assert_eq!(line_count(), 2);

        let second = sink.write_batch(&[json!({"id": 4})]).await.unwrap();
        assert_eq!(second, 0);
        assert_eq!(line_count(), 2);
    }

    #[tokio::test]
    async fn flush_per_record_flushes_after_each() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new().flush_per_record(true));
        let batch = [json!({"id": 1}), json!({"id": 2})];
        sink.write_batch(&batch).await.unwrap();
        assert_eq!(capture.flushes(), 2);
    }

    #[tokio::test]
    async fn batch_boundary_flush_only_on_explicit_flush() {
        let (sink, capture) = sink_with(StdoutSinkConfig::new());
        sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        // Writing alone must not flush; only the explicit call below does.
        assert_eq!(capture.flushes(), 0);
        sink.flush().await.unwrap();
        assert_eq!(capture.flushes(), 1);
    }

    #[tokio::test]
    async fn broken_pipe_is_treated_as_clean_termination() {
        let capture = CaptureWriter::fail_after(1);
        let sink = StdoutSink::with_writer(StdoutSinkConfig::new(), Box::new(capture.clone()));
        let batch = [json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let first = sink.write_batch(&batch).await.unwrap();
        assert_eq!(first, 1);
        // Once the pipe is closed, later batches are silently dropped.
        let second = sink.write_batch(&[json!({"id": 4})]).await.unwrap();
        assert_eq!(second, 0);
    }

    #[tokio::test]
    async fn as_trait_object() {
        let capture = CaptureWriter::default();
        let writer: Box<dyn AsyncWrite + Unpin + Send> = Box::new(capture.clone());
        let sink: Box<dyn Sink> = Box::new(StdoutSink::with_writer(StdoutSinkConfig::new(), writer));
        let count = sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(count, 1);
        assert!(capture.as_str().contains("\"id\":1"));
    }

    #[tokio::test]
    async fn config_schema_is_well_formed_object() {
        let schema = StdoutSink::new(StdoutSinkConfig::new()).config_schema();
        assert_eq!(schema["type"], "object");
        assert!(schema["properties"].is_object());
    }

    #[tokio::test]
    async fn check_always_passes() {
        let sink = StdoutSink::new(StdoutSinkConfig::new());
        let ctx = faucet_core::check::CheckContext::default();
        let report = sink.check(&ctx).await.unwrap();
        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes.len(), 1);
        assert_eq!(report.probes[0].name, "io");
    }
}