Skip to main content

faucet_sink_csv/
config.rs

1//! CSV sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Configuration for the CSV file sink.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct CsvSinkConfig {
10    /// Path to the output CSV file.
11    pub path: String,
12    /// Field delimiter byte. Defaults to `b','`.
13    #[serde(default = "default_delimiter")]
14    pub delimiter: u8,
15    /// Whether to write a header row. Defaults to `true`.
16    #[serde(default = "default_true")]
17    pub write_headers: bool,
18    /// Whether to append to an existing file. Defaults to `false` (truncates).
19    #[serde(default)]
20    pub append: bool,
21    /// Records per upstream [`StreamPage`](faucet_core::StreamPage). The CSV
22    /// sink writes rows to disk one at a time inside a `spawn_blocking` task
23    /// fronted by a buffered `csv::Writer`, so this field has **no
24    /// behavioural impact** at the sink — it is exposed purely for config
25    /// parity across every sink in the workspace. Defaults to
26    /// [`DEFAULT_BATCH_SIZE`].
27    ///
28    /// `batch_size = 0` (the "no batching" sentinel) and any positive value
29    /// produce byte-for-byte identical output for this sink: each record is
30    /// serialised and written individually regardless of how upstream chunked
31    /// the page.
32    #[serde(default = "default_batch_size")]
33    pub batch_size: usize,
34    /// Compression codec for the output file. Defaults to
35    /// [`CompressionConfig::Auto`](faucet_core::CompressionConfig::Auto) —
36    /// `.gz` / `.zst` suffix selects gzip / zstd. Requires the crate-local
37    /// `compression` feature.
38    #[cfg(feature = "compression")]
39    #[serde(default)]
40    pub compression: faucet_core::CompressionConfig,
41}
42
/// Serde default for [`CsvSinkConfig::delimiter`]: an ASCII comma.
fn default_delimiter() -> u8 {
    b','
}
46
/// Serde default for boolean fields that should be enabled when omitted
/// (used by [`CsvSinkConfig::write_headers`]).
fn default_true() -> bool {
    true
}
50
51fn default_batch_size() -> usize {
52    DEFAULT_BATCH_SIZE
53}
54
55impl CsvSinkConfig {
56    /// Create a new config with the required file path and sensible defaults.
57    pub fn new(path: impl Into<String>) -> Self {
58        Self {
59            path: path.into(),
60            delimiter: b',',
61            write_headers: true,
62            append: false,
63            batch_size: DEFAULT_BATCH_SIZE,
64            #[cfg(feature = "compression")]
65            compression: faucet_core::CompressionConfig::Auto,
66        }
67    }
68
69    /// Set the field delimiter byte.
70    pub fn delimiter(mut self, d: u8) -> Self {
71        self.delimiter = d;
72        self
73    }
74
75    /// Set whether to write a header row.
76    pub fn write_headers(mut self, v: bool) -> Self {
77        self.write_headers = v;
78        self
79    }
80
81    /// Set whether to append to an existing file.
82    pub fn append(mut self, v: bool) -> Self {
83        self.append = v;
84        self
85    }
86
87    /// Set the per-page record count hint reported alongside other sink
88    /// configs.
89    ///
90    /// This sink writes per-record through a buffered writer, so the value is
91    /// observably a no-op: `0` (the "no batching" sentinel) and any positive
92    /// value produce the same on-disk output. Present for symmetry with sinks
93    /// whose `batch_size` does drive I/O sizing (e.g. SQL multi-row inserts,
94    /// BigQuery streaming inserts).
95    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
96        self.batch_size = batch_size;
97        self
98    }
99
100    /// Set the compression codec. Available only with the `compression` feature.
101    #[cfg(feature = "compression")]
102    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
103        self.compression = c;
104        self
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = CsvSinkConfig::new("/tmp/out.csv");
        assert_eq!(config.path, "/tmp/out.csv");
        assert_eq!(config.delimiter, b',');
        assert!(config.write_headers);
        assert!(!config.append);
    }

    #[test]
    fn builder_methods() {
        let config = CsvSinkConfig::new("/tmp/out.tsv")
            .delimiter(b'\t')
            .write_headers(false)
            .append(true);
        assert_eq!(config.delimiter, b'\t');
        assert!(!config.write_headers);
        assert!(config.append);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = CsvSinkConfig::new("/tmp/out.csv");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = CsvSinkConfig::new("/tmp/out.csv").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = CsvSinkConfig::new("/tmp/out.csv").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            CsvSinkConfig::new("/tmp/out.csv").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "path": "/tmp/out.csv",
            "delimiter": 44,
            "write_headers": true,
            "append": false,
            "batch_size": 500
        }"#;
        let config: CsvSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_defaults_when_missing_in_json() {
        let json = r#"{"path": "/tmp/out.csv"}"#;
        let config: CsvSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// `new()` and serde's per-field defaults must describe the same config;
    /// otherwise programmatic and file-driven pipelines would behave differently.
    #[test]
    fn new_matches_serde_defaults() {
        let built = CsvSinkConfig::new("/tmp/out.csv");
        let parsed: CsvSinkConfig = serde_json::from_str(r#"{"path": "/tmp/out.csv"}"#).unwrap();
        assert_eq!(built.path, parsed.path);
        assert_eq!(built.delimiter, parsed.delimiter);
        assert_eq!(built.write_headers, parsed.write_headers);
        assert_eq!(built.append, parsed.append);
        assert_eq!(built.batch_size, parsed.batch_size);
    }
}