Skip to main content

faucet_sink_stdout/
config.rs

1//! Stdout/stderr sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Which standard stream to write records to.
8#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "lowercase")]
10pub enum StdStream {
11    /// Standard output (default). Honors shell redirection.
12    #[default]
13    Stdout,
14    /// Standard error. Useful when stdout is reserved for piping pipeline output.
15    Stderr,
16}
17
18/// How each record should be serialized before writing.
19#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
20#[serde(rename_all = "snake_case")]
21pub enum StdoutFormat {
22    /// One compact JSON object per line (default — matches JSONL format).
23    #[default]
24    JsonLines,
25    /// Indented JSON, separated by newlines. Easier to read but not a single-line format.
26    PrettyJson,
27    /// Tab-separated values, with each record's keys sorted alphabetically.
28    /// Scalars are emitted as-is; nested objects/arrays are emitted as compact JSON.
29    Tsv,
30}
31
32/// Configuration for the stdout/stderr sink.
33#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
34pub struct StdoutSinkConfig {
35    /// Which standard stream to write to.
36    #[serde(default)]
37    pub destination: StdStream,
38    /// Output format.
39    #[serde(default)]
40    pub format: StdoutFormat,
41    /// Flush the underlying writer after every record instead of at batch boundaries.
42    /// Tradeoff: lower latency for live preview, slightly lower throughput.
43    #[serde(default)]
44    pub flush_per_record: bool,
45    /// Stop writing after this many records have been emitted. Subsequent
46    /// `write_batch` calls become no-ops. `None` means unlimited.
47    #[serde(default)]
48    pub max_records: Option<usize>,
49    /// Records per upstream [`StreamPage`](faucet_core::StreamPage). The
50    /// stdout sink writes records to the chosen standard stream one at a time
51    /// through a buffered writer, so this field has **no behavioural impact**
52    /// at the sink — it is exposed purely for config parity across every sink
53    /// in the workspace. Defaults to [`DEFAULT_BATCH_SIZE`].
54    ///
55    /// `batch_size = 0` (the "no batching" sentinel) and any positive value
56    /// produce byte-for-byte identical output for this sink: each record is
57    /// serialised and written individually regardless of how upstream chunked
58    /// the page.
59    #[serde(default = "default_batch_size")]
60    pub batch_size: usize,
61}
62
63fn default_batch_size() -> usize {
64    DEFAULT_BATCH_SIZE
65}
66
67impl Default for StdoutSinkConfig {
68    fn default() -> Self {
69        Self {
70            destination: StdStream::default(),
71            format: StdoutFormat::default(),
72            flush_per_record: false,
73            max_records: None,
74            batch_size: DEFAULT_BATCH_SIZE,
75        }
76    }
77}
78
79impl StdoutSinkConfig {
80    /// Create a new config with all defaults (stdout, JSON Lines, no limit).
81    pub fn new() -> Self {
82        Self::default()
83    }
84
85    /// Send records to the given standard stream.
86    pub fn destination(mut self, destination: StdStream) -> Self {
87        self.destination = destination;
88        self
89    }
90
91    /// Choose the output format.
92    pub fn format(mut self, format: StdoutFormat) -> Self {
93        self.format = format;
94        self
95    }
96
97    /// Flush after every record.
98    pub fn flush_per_record(mut self, flush_per_record: bool) -> Self {
99        self.flush_per_record = flush_per_record;
100        self
101    }
102
103    /// Stop after writing `n` records total.
104    pub fn max_records(mut self, max_records: usize) -> Self {
105        self.max_records = Some(max_records);
106        self
107    }
108
109    /// Set the per-page record count hint reported alongside other sink
110    /// configs.
111    ///
112    /// This sink writes per-record through a buffered writer, so the value is
113    /// observably a no-op: `0` (the "no batching" sentinel) and any positive
114    /// value produce the same stdout/stderr output. Present for symmetry with
115    /// sinks whose `batch_size` does drive I/O sizing (e.g. SQL multi-row
116    /// inserts, BigQuery streaming inserts).
117    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
118        self.batch_size = batch_size;
119        self
120    }
121}
122
#[cfg(test)]
mod tests {
    //! Unit tests for the stdout sink config: defaults, builder methods,
    //! and the serde wire format.

    use super::*;

    #[test]
    fn defaults() {
        let c = StdoutSinkConfig::new();
        assert_eq!(c.destination, StdStream::Stdout);
        assert_eq!(c.format, StdoutFormat::JsonLines);
        assert!(!c.flush_per_record);
        assert!(c.max_records.is_none());
    }

    #[test]
    fn builder_chains() {
        let c = StdoutSinkConfig::new()
            .destination(StdStream::Stderr)
            .format(StdoutFormat::PrettyJson)
            .flush_per_record(true)
            .max_records(10);
        assert_eq!(c.destination, StdStream::Stderr);
        assert_eq!(c.format, StdoutFormat::PrettyJson);
        assert!(c.flush_per_record);
        assert_eq!(c.max_records, Some(10));
    }

    #[test]
    fn serde_round_trip() {
        // Every field is set to a non-default value so a dropped or renamed
        // field cannot hide behind its serde default on the way back in.
        let c = StdoutSinkConfig::new()
            .destination(StdStream::Stderr)
            .format(StdoutFormat::Tsv)
            .flush_per_record(true)
            .max_records(7)
            .with_batch_size(250);
        let json = serde_json::to_string(&c).unwrap();
        let back: StdoutSinkConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(back.destination, StdStream::Stderr);
        assert_eq!(back.format, StdoutFormat::Tsv);
        assert!(back.flush_per_record);
        assert_eq!(back.max_records, Some(7));
        assert_eq!(back.batch_size, 250);
    }

    #[test]
    fn deserialize_from_minimal_json() {
        let c: StdoutSinkConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(c.destination, StdStream::Stdout);
        assert_eq!(c.format, StdoutFormat::JsonLines);
        assert!(!c.flush_per_record);
        assert!(c.max_records.is_none());
    }

    #[test]
    fn enum_wire_names_are_stable() {
        // Pins the `rename_all` casing so a refactor cannot silently change
        // the strings users write in their config files.
        assert_eq!(serde_json::to_string(&StdStream::Stdout).unwrap(), r#""stdout""#);
        assert_eq!(serde_json::to_string(&StdStream::Stderr).unwrap(), r#""stderr""#);
        assert_eq!(
            serde_json::to_string(&StdoutFormat::JsonLines).unwrap(),
            r#""json_lines""#
        );
        assert_eq!(
            serde_json::to_string(&StdoutFormat::PrettyJson).unwrap(),
            r#""pretty_json""#
        );
        assert_eq!(serde_json::to_string(&StdoutFormat::Tsv).unwrap(), r#""tsv""#);
    }

    #[test]
    fn unknown_enum_variant_is_rejected() {
        let result = serde_json::from_str::<StdoutSinkConfig>(r#"{"format": "xml"}"#);
        assert!(result.is_err());
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let c = StdoutSinkConfig::new();
        assert_eq!(c.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let c = StdoutSinkConfig::new().with_batch_size(250);
        assert_eq!(c.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let c = StdoutSinkConfig::new().with_batch_size(0);
        assert_eq!(c.batch_size, 0);
        assert!(faucet_core::validate_batch_size(c.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        // The builder itself does not validate; range checking is delegated
        // to `faucet_core::validate_batch_size`.
        let c = StdoutSinkConfig::new().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(c.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "destination": "stdout",
            "format": "json_lines",
            "batch_size": 500
        }"#;
        let c: StdoutSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(c.batch_size, 500);
    }

    #[test]
    fn batch_size_defaults_when_missing_in_json() {
        let c: StdoutSinkConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(c.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}