Skip to main content

faucet_source_csv/
stream.rs

1//! CSV file source.
2
3use crate::config::CsvSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use serde_json::{Map, Value};
7use std::pin::Pin;
8
9/// A source that reads records from a CSV file.
10///
11/// Each row is returned as a JSON object. If the file has headers, the header
12/// names are used as keys. Otherwise, generated names (`column_0`, `column_1`,
13/// etc.) are used.
14pub struct CsvSource {
15    config: CsvSourceConfig,
16}
17
18impl CsvSource {
19    /// Create a new CSV source from the given configuration.
20    pub fn new(config: CsvSourceConfig) -> Self {
21        Self { config }
22    }
23}
24
25#[async_trait]
26impl faucet_core::Source for CsvSource {
27    async fn fetch_with_context(
28        &self,
29        context: &std::collections::HashMap<String, serde_json::Value>,
30    ) -> Result<Vec<Value>, FaucetError> {
31        use futures::StreamExt;
32        let mut all = Vec::new();
33        let mut s = self.stream_pages(context, self.config.batch_size);
34        while let Some(page) = s.next().await {
35            let page = page?;
36            all.extend(page.records);
37        }
38        Ok(all)
39    }
40
41    /// Stream rows from the CSV file without buffering the full result set.
42    ///
43    /// The file is opened via [`tokio::fs::File`] and parsed by `csv-async`,
44    /// a streaming RFC-4180 reader. Because it tracks quote state across
45    /// physical lines, **quoted fields containing embedded newlines are parsed
46    /// correctly** — the file written by `faucet-sink-csv` round-trips back
47    /// losslessly (#78/#40). Each emitted [`StreamPage`] holds up to
48    /// [`CsvSourceConfig::batch_size`] rows.
49    ///
50    /// The trait-level `batch_size` argument is ignored in favour of the
51    /// config field — the config is the user-facing knob the README documents,
52    /// and routing the pipeline-supplied hint through it would silently
53    /// override an explicit config value.
54    ///
55    /// `batch_size = 0` drains the entire file into a single page. The CSV
56    /// source has no incremental-replication mode, so every emitted page
57    /// carries `bookmark: None`.
58    fn stream_pages<'a>(
59        &'a self,
60        context: &'a std::collections::HashMap<String, Value>,
61        _batch_size: usize,
62    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
63        let batch_size = self.config.batch_size;
64
65        Box::pin(async_stream::try_stream! {
66            use futures::StreamExt as _;
67
68            let mut config = self.config.clone();
69            if !context.is_empty() {
70                config.path = faucet_core::util::substitute_context(&config.path, context);
71            }
72
73            let file = tokio::fs::File::open(&config.path).await.map_err(|e| {
74                FaucetError::Config(format!(
75                    "failed to open CSV file '{}': {e}",
76                    config.path
77                ))
78            })?;
79            let reader = tokio::io::BufReader::new(file);
80            #[cfg(feature = "compression")]
81            let reader = {
82                let codec = config.compression.resolve(&config.path);
83                faucet_core::compression::warn_mismatch(&config.path, codec);
84                faucet_core::compression::wrap_async_reader(reader, codec)
85            };
86
87            // `csv-async` tracks quote state across physical lines, so a
88            // record whose field contains an embedded newline is read as one
89            // record. Headers are handled manually below, so the underlying
90            // reader is configured header-less.
91            let mut csv_reader = csv_async::AsyncReaderBuilder::new()
92                .has_headers(false)
93                .delimiter(config.delimiter)
94                .quote(config.quote)
95                // Tolerate uneven field counts (the connector has always been
96                // lenient — uneven rows must not abort the run).
97                .flexible(true)
98                .create_reader(reader);
99
100            let mut records = csv_reader.records();
101
102            // The first record is the header row when configured.
103            let headers: Vec<String> = if config.has_headers {
104                match records.next().await {
105                    Some(rec) => {
106                        let rec = rec.map_err(|e| FaucetError::Config(format!(
107                            "CSV header parse error in '{}': {e}", config.path
108                        )))?;
109                        rec.iter().map(|f| f.to_string()).collect()
110                    }
111                    None => Vec::new(),
112                }
113            } else {
114                Vec::new()
115            };
116
117            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
118            let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
119            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
120            let mut total = 0usize;
121            let mut row_idx = 0usize;
122
123            while let Some(rec) = records.next().await {
124                let record = rec.map_err(|e| FaucetError::Config(format!(
125                    "CSV parse error at line {} in '{}': {e}",
126                    // Physical file line: `row_idx` is the 0-based *data* row, so
127                    // +1 for 1-based, and +1 more when a header row was consumed —
128                    // otherwise the first data row (file line 2) is misreported as 1.
129                    row_idx + 1 + usize::from(config.has_headers),
130                    config.path
131                )))?;
132
133                let mut obj = Map::new();
134                for (col_idx, field) in record.iter().enumerate() {
135                    let key = if col_idx < headers.len() {
136                        headers[col_idx].clone()
137                    } else {
138                        format!("column_{col_idx}")
139                    };
140                    obj.insert(key, Value::String(field.to_string()));
141                }
142                buffer.push(Value::Object(obj));
143                row_idx += 1;
144
145                if buffer.len() >= chunk {
146                    let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
147                    total += page.len();
148                    yield StreamPage { records: page, bookmark: None };
149                }
150            }
151
152            if !buffer.is_empty() {
153                total += buffer.len();
154                yield StreamPage { records: buffer, bookmark: None };
155            }
156
157            tracing::info!(
158                rows = total,
159                batch_size,
160                path = %config.path,
161                "CSV source stream complete",
162            );
163        })
164    }
165
166    fn config_schema(&self) -> serde_json::Value {
167        serde_json::to_value(faucet_core::schema_for!(CsvSourceConfig))
168            .expect("schema serialization")
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Source;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Drain `stream_pages` and return the size of each emitted page,
    /// asserting along the way that no page carries a bookmark.
    ///
    /// A trait-level hint of `1` is passed deliberately: the source must
    /// ignore it in favour of the config's `batch_size`.
    async fn collect_page_sizes(source: &CsvSource) -> Vec<usize> {
        use futures::StreamExt;
        let ctx = std::collections::HashMap::new();
        let mut pages = source.stream_pages(&ctx, 1);
        let mut sizes = Vec::new();
        while let Some(page) = pages.next().await {
            let page = page.unwrap();
            assert!(page.bookmark.is_none());
            sizes.push(page.records.len());
        }
        sizes
    }

    #[tokio::test]
    async fn reads_csv_with_headers() {
        let mut tmp = NamedTempFile::new().unwrap();
        writeln!(tmp, "id,name,age").unwrap();
        writeln!(tmp, "1,Alice,30").unwrap();
        writeln!(tmp, "2,Bob,25").unwrap();
        tmp.flush().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], "1");
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[0]["age"], "30");
        assert_eq!(records[1]["name"], "Bob");
    }

    #[tokio::test]
    async fn reads_csv_without_headers() {
        let mut tmp = NamedTempFile::new().unwrap();
        writeln!(tmp, "Alice,30").unwrap();
        writeln!(tmp, "Bob,25").unwrap();
        tmp.flush().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap()).has_headers(false);
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["column_0"], "Alice");
        assert_eq!(records[0]["column_1"], "30");
    }

    #[tokio::test]
    async fn reads_tsv_with_custom_delimiter() {
        let mut tmp = NamedTempFile::new().unwrap();
        writeln!(tmp, "id\tname").unwrap();
        writeln!(tmp, "1\tAlice").unwrap();
        tmp.flush().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap()).delimiter(b'\t');
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["id"], "1");
        assert_eq!(records[0]["name"], "Alice");
    }

    #[tokio::test]
    async fn reads_quoted_field_with_embedded_newline() {
        // Regression for #78/#40: a quoted field spanning multiple physical
        // lines (as the CSV sink emits) must round-trip as one record.
        let mut tmp = NamedTempFile::new().unwrap();
        write!(tmp, "id,note\n1,\"line one\nline two\"\n2,\"plain\"\n").unwrap();
        tmp.flush().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], "1");
        assert_eq!(records[0]["note"], "line one\nline two");
        assert_eq!(records[1]["note"], "plain");
    }

    #[tokio::test]
    async fn reads_quoted_field_with_embedded_delimiter() {
        let mut tmp = NamedTempFile::new().unwrap();
        write!(tmp, "id,name\n1,\"Doe, John\"\n").unwrap();
        tmp.flush().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["name"], "Doe, John");
    }

    #[tokio::test]
    async fn extra_fields_beyond_header_get_generated_keys() {
        let mut tmp = NamedTempFile::new().unwrap();
        write!(tmp, "id,name\n1,Alice,extra\n2\n").unwrap();
        tmp.flush().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[0]["column_2"], "extra");
        // Short rows are tolerated (`flexible`) and simply omit missing keys.
        assert_eq!(records[1]["id"], "2");
        assert!(records[1].get("name").is_none());
    }

    #[tokio::test]
    async fn empty_csv_returns_empty_vec() {
        let mut tmp = NamedTempFile::new().unwrap();
        writeln!(tmp, "id,name").unwrap();
        tmp.flush().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();

        assert!(records.is_empty());
    }

    #[tokio::test]
    async fn zero_byte_file_returns_empty_vec() {
        // No header row at all: the header lookup hits end-of-file.
        let tmp = NamedTempFile::new().unwrap();

        let config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();

        assert!(records.is_empty());
    }

    #[tokio::test]
    async fn missing_file_returns_error() {
        let config = CsvSourceConfig::new("/nonexistent/path/data.csv");
        let source = CsvSource::new(config);
        let result = source.fetch_all().await;

        assert!(matches!(result, Err(FaucetError::Config(_))));
    }

    #[tokio::test]
    async fn stream_pages_uses_config_batch_size() {
        let mut tmp = NamedTempFile::new().unwrap();
        writeln!(tmp, "id").unwrap();
        for i in 0..5 {
            writeln!(tmp, "{i}").unwrap();
        }
        tmp.flush().unwrap();

        let mut config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        config.batch_size = 2;
        let source = CsvSource::new(config);

        assert_eq!(collect_page_sizes(&source).await, vec![2, 2, 1]);
    }

    #[tokio::test]
    async fn batch_size_zero_yields_single_page() {
        let mut tmp = NamedTempFile::new().unwrap();
        writeln!(tmp, "id").unwrap();
        for i in 0..5 {
            writeln!(tmp, "{i}").unwrap();
        }
        tmp.flush().unwrap();

        let mut config = CsvSourceConfig::new(tmp.path().to_str().unwrap());
        config.batch_size = 0;
        let source = CsvSource::new(config);

        assert_eq!(collect_page_sizes(&source).await, vec![5]);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_gzip_via_stream_pages() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let plain = b"id,name\n1,Alice\n2,Bob\n";
        let compressed =
            faucet_core::compression::compress_buf(plain, faucet_core::Compression::Gzip).unwrap();
        tokio::fs::write(&path, &compressed).await.unwrap();

        let config = CsvSourceConfig::new(&path).compression(CompressionConfig::Auto);
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["name"], "Alice");
        assert_eq!(records[1]["name"], "Bob");
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_zstd_via_stream_pages() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.zst").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let plain = b"id,name\n1,Carol\n";
        let compressed =
            faucet_core::compression::compress_buf(plain, faucet_core::Compression::Zstd).unwrap();
        tokio::fs::write(&path, &compressed).await.unwrap();

        let config = CsvSourceConfig::new(&path).compression(CompressionConfig::Auto);
        let source = CsvSource::new(config);
        let records = source.fetch_all().await.unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["name"], "Carol");
    }
}