// faucet_source_csv/config.rs

1//! CSV source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Configuration for the CSV file source.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct CsvSourceConfig {
10    /// Path to the CSV file.
11    pub path: String,
12    /// Whether the file has a header row. Defaults to `true`.
13    #[serde(default = "default_true")]
14    pub has_headers: bool,
15    /// Field delimiter byte. Defaults to `b','`.
16    #[serde(default = "default_delimiter")]
17    pub delimiter: u8,
18    /// Quote character byte. Defaults to `b'"'`.
19    #[serde(default = "default_quote")]
20    pub quote: u8,
21    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). Rows are
22    /// parsed line-by-line from a tokio `BufReader` and yielded whenever the
23    /// buffer reaches this size. Defaults to [`DEFAULT_BATCH_SIZE`].
24    ///
25    /// `batch_size = 0` is the "no batching" sentinel: the file is fully
26    /// drained and the entire result set is emitted in a single page. Useful
27    /// for small lookup tables or for sinks (e.g. SQL `COPY`, BigQuery load
28    /// jobs) that prefer one large request to many small ones.
29    #[serde(default = "default_batch_size")]
30    pub batch_size: usize,
31    /// Compression codec for the input file. Defaults to
32    /// [`CompressionConfig::Auto`](faucet_core::CompressionConfig::Auto) —
33    /// `.gz` / `.zst` suffix selects gzip / zstd. Requires the crate-local
34    /// `compression` feature.
35    #[cfg(feature = "compression")]
36    #[serde(default)]
37    pub compression: faucet_core::CompressionConfig,
38}
39
/// Serde fallback used when `has_headers` is omitted: always `true`.
fn default_true() -> bool {
    true
}
43
/// Serde fallback used when `delimiter` is omitted: the ASCII comma byte.
fn default_delimiter() -> u8 {
    const COMMA: u8 = b',';
    COMMA
}
47
/// Serde fallback used when `quote` is omitted: the ASCII double-quote byte.
fn default_quote() -> u8 {
    const DOUBLE_QUOTE: u8 = b'"';
    DOUBLE_QUOTE
}
51
52fn default_batch_size() -> usize {
53    DEFAULT_BATCH_SIZE
54}
55
56impl CsvSourceConfig {
57    /// Create a new config with the required file path and sensible defaults.
58    pub fn new(path: impl Into<String>) -> Self {
59        Self {
60            path: path.into(),
61            has_headers: true,
62            delimiter: b',',
63            quote: b'"',
64            batch_size: DEFAULT_BATCH_SIZE,
65            #[cfg(feature = "compression")]
66            compression: faucet_core::CompressionConfig::Auto,
67        }
68    }
69
70    /// Set whether the file has a header row.
71    pub fn has_headers(mut self, v: bool) -> Self {
72        self.has_headers = v;
73        self
74    }
75
76    /// Set the field delimiter byte.
77    pub fn delimiter(mut self, d: u8) -> Self {
78        self.delimiter = d;
79        self
80    }
81
82    /// Set the quote character byte.
83    pub fn quote(mut self, q: u8) -> Self {
84        self.quote = q;
85        self
86    }
87
88    /// Set the per-page row count for [`Source::stream_pages`](faucet_core::Source::stream_pages).
89    ///
90    /// Pass `0` to opt out of batching — the entire file is emitted in a
91    /// single [`StreamPage`](faucet_core::StreamPage).
92    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
93        self.batch_size = batch_size;
94        self
95    }
96
97    /// Set the compression codec. Available only with the `compression` feature.
98    #[cfg(feature = "compression")]
99    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
100        self.compression = c;
101        self
102    }
103}
104
#[cfg(test)]
mod tests {
    use super::*;

    /// Path used by every test that does not care which file is named.
    const CSV_PATH: &str = "/tmp/data.csv";

    /// `new` stores the path and applies the documented defaults.
    #[test]
    fn default_config() {
        let cfg = CsvSourceConfig::new(CSV_PATH);
        assert_eq!(cfg.path, CSV_PATH);
        assert!(cfg.has_headers);
        assert_eq!(cfg.delimiter, b',');
        assert_eq!(cfg.quote, b'"');
    }

    /// Each chained setter overrides exactly its own field.
    #[test]
    fn builder_methods() {
        let CsvSourceConfig {
            has_headers,
            delimiter,
            quote,
            ..
        } = CsvSourceConfig::new("/tmp/data.tsv")
            .has_headers(false)
            .delimiter(b'\t')
            .quote(b'\'');
        assert!(!has_headers);
        assert_eq!(delimiter, b'\t');
        assert_eq!(quote, b'\'');
    }

    /// Without an explicit override the shared core default is used.
    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let batch_size = CsvSourceConfig::new(CSV_PATH).batch_size;
        assert_eq!(batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// `with_batch_size` replaces the default with the given value.
    #[test]
    fn with_batch_size_overrides_default() {
        let batch_size = CsvSourceConfig::new(CSV_PATH).with_batch_size(500).batch_size;
        assert_eq!(batch_size, 500);
    }

    /// Zero is stored untouched and passes the core validator.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let cfg = CsvSourceConfig::new(CSV_PATH).with_batch_size(0);
        assert_eq!(cfg.batch_size, 0);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_ok());
    }

    /// The builder stores an oversized value; the core validator rejects it.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let too_large = faucet_core::MAX_BATCH_SIZE + 1;
        let cfg = CsvSourceConfig::new(CSV_PATH).with_batch_size(too_large);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_err());
    }

    /// An explicit `batch_size` in JSON is read into the field.
    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "path": "/tmp/data.csv",
            "batch_size": 250
        }"#;
        let parsed: CsvSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.batch_size, 250);
    }
}