Skip to main content

faucet_sink_csv/
sink.rs

1//! CSV file sink.
2
3use crate::config::CsvSinkConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use std::fs::OpenOptions;
8use std::sync::Mutex;
9
/// Destination the CSV serializer writes into.
///
/// When the `compression` feature is enabled this is a
/// [`SyncCompressWriter`](faucet_core::compression::SyncCompressWriter) over the
/// output file. Keeping the concrete encoder type lets `flush` call `finish()`
/// on it and report the error, instead of the error being lost when the
/// encoder is dropped (#78/#41). Without the feature it is the plain
/// [`std::fs::File`].
#[cfg(feature = "compression")]
type SinkWriter = faucet_core::compression::SyncCompressWriter<std::fs::File>;
#[cfg(not(feature = "compression"))]
type SinkWriter = std::fs::File;
18
19/// State for the CSV writer, including the determined column order.
20struct WriterState {
21    writer: csv::Writer<SinkWriter>,
22    columns: Vec<String>,
23}
24
25/// A sink that writes JSON records to a CSV file.
26///
27/// Column order is the union of keys across the records of the first
28/// `write_batch` call, in first-seen order (so a field present only in a later
29/// record of that batch is still captured). Subsequent records use the same
30/// column order; missing fields are written as empty strings.
31///
32/// [`Sink::flush`](faucet_core::Sink::flush) finalises the encoder (writes the trailer) and clears the
33/// writer slot — a subsequent `write_batch` reopens the file in append mode
34/// (independent of `config.append`) and starts a fresh encoder. This makes
35/// the per-page `flush` the pipeline emits for bookmarked pages safe for CDC
36/// sources — every transaction appends rather than truncates.
37pub struct CsvSink {
38    config: CsvSinkConfig,
39    state: Mutex<Option<WriterState>>,
40    /// Tracks whether the file has been opened at least once.
41    /// On re-opens (after `flush()` clears the writer), we always use
42    /// append mode regardless of `config.append` so the new gzip / zstd
43    /// member appends instead of truncating the file. Without this, the
44    /// pipeline's per-bookmark flush would silently lose data when
45    /// `config.append = false` (the default).
46    opened_once: std::sync::atomic::AtomicBool,
47}
48
49impl CsvSink {
50    /// Create a new CSV sink. The file is opened on the first `write_batch` call.
51    pub fn new(config: CsvSinkConfig) -> Self {
52        Self {
53            config,
54            state: Mutex::new(None),
55            opened_once: std::sync::atomic::AtomicBool::new(false),
56        }
57    }
58
59    /// Convert a JSON value to a string suitable for a CSV field.
60    fn value_to_csv_field(value: &Value) -> String {
61        match value {
62            Value::Null => String::new(),
63            Value::String(s) => s.clone(),
64            Value::Bool(b) => b.to_string(),
65            Value::Number(n) => n.to_string(),
66            // For nested objects/arrays, serialize as JSON.
67            other => other.to_string(),
68        }
69    }
70}
71
72#[async_trait]
73impl faucet_core::Sink for CsvSink {
74    fn config_schema(&self) -> serde_json::Value {
75        serde_json::to_value(faucet_core::schema_for!(CsvSinkConfig)).expect("schema serialization")
76    }
77
78    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
79        if records.is_empty() {
80            return Ok(0);
81        }
82
83        let config = self.config.clone();
84        let records: Vec<Value> = records.to_vec();
85
86        // Extract state from the mutex before entering the blocking task.
87        // This avoids holding the MutexGuard across an await point.
88        let current_state = {
89            let mut guard = self
90                .state
91                .lock()
92                .map_err(|e| FaucetError::Sink(format!("CSV sink lock poisoned: {e}")))?;
93            guard.take()
94        };
95
96        let opened_before = self.opened_once.load(std::sync::atomic::Ordering::Relaxed);
97
98        let result = tokio::task::spawn_blocking(move || {
99            write_csv_blocking(config, current_state, &records, opened_before)
100        })
101        .await
102        .map_err(|e| FaucetError::Sink(format!("CSV write task failed: {e}")))?;
103
104        let (new_state, count) = result?;
105
106        // Mark opened. From now on, re-opens (after flush) use append mode.
107        self.opened_once
108            .store(true, std::sync::atomic::Ordering::Relaxed);
109
110        // Put the state back.
111        {
112            let mut guard = self
113                .state
114                .lock()
115                .map_err(|e| FaucetError::Sink(format!("CSV sink lock poisoned: {e}")))?;
116            *guard = Some(new_state);
117        }
118
119        Ok(count)
120    }
121
122    async fn flush(&self) -> Result<(), FaucetError> {
123        // Take the state out of the mutex so we can move it into a blocking
124        // task. Replacing it with None means the next write_batch reopens
125        // the file in append mode — for compressed output this starts a
126        // fresh gzip/zstd member, which decoders read back transparently.
127        let state = {
128            let mut guard = self
129                .state
130                .lock()
131                .map_err(|e| FaucetError::Sink(format!("CSV sink lock poisoned: {e}")))?;
132            guard.take()
133        };
134        if let Some(state) = state {
135            tokio::task::spawn_blocking(move || -> Result<(), FaucetError> {
136                let WriterState { writer, .. } = state;
137                // Flush the csv serializer's buffer and recover the inner
138                // writer so the compression encoder can be finalised with its
139                // error captured, rather than swallowed on drop (#78/#41).
140                let inner = writer
141                    .into_inner()
142                    .map_err(|e| FaucetError::Sink(format!("CSV flush failed: {e}")))?;
143                #[cfg(feature = "compression")]
144                {
145                    // Writes the gzip/zstd trailer and surfaces any I/O error.
146                    inner.finish().map_err(|e| {
147                        FaucetError::Sink(format!("CSV compression finalise failed: {e}"))
148                    })?;
149                }
150                #[cfg(not(feature = "compression"))]
151                {
152                    let mut f = inner;
153                    std::io::Write::flush(&mut f)
154                        .map_err(|e| FaucetError::Sink(format!("CSV flush failed: {e}")))?;
155                }
156                Ok(())
157            })
158            .await
159            .map_err(|e| FaucetError::Sink(format!("CSV flush task failed: {e}")))??;
160        }
161        Ok(())
162    }
163
164    /// Preflight probe for `faucet doctor`. Verifies the configured output
165    /// path's parent directory exists and is writable by creating, then
166    /// immediately removing, a uniquely-named temp file there. Never touches
167    /// the user's actual output file, so it is fully idempotent.
168    async fn check(
169        &self,
170        _ctx: &faucet_core::check::CheckContext,
171    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
172        use faucet_core::check::CheckReport;
173        let path = self.config.path.clone();
174        // The filesystem probe is synchronous; run it on a blocking thread to
175        // stay off the async runtime, matching how the sink does its I/O.
176        let probe = tokio::task::spawn_blocking(move || {
177            crate::probe::probe_parent_writable(&path, std::time::Instant::now())
178        })
179        .await
180        .map_err(|e| FaucetError::Sink(format!("CSV check task failed: {e}")))?;
181        Ok(CheckReport::single(probe))
182    }
183}
184
185/// Synchronous CSV writing logic, run inside `spawn_blocking`.
186fn write_csv_blocking(
187    config: CsvSinkConfig,
188    existing_state: Option<WriterState>,
189    records: &[Value],
190    opened_before: bool,
191) -> Result<(WriterState, usize), FaucetError> {
192    let mut state = match existing_state {
193        Some(s) => s,
194        None => {
195            // Determine columns from the UNION of keys across the first batch's
196            // records, in first-seen order — not just `records[0]`. Otherwise a
197            // field present only in a later record of the first batch would be
198            // absent from the header and silently dropped from every row (audit
199            // #146 H2). (A later flush-segment cannot change the already-written
200            // header — that is a separate, documented limitation.)
201            let mut columns: Vec<String> = Vec::new();
202            let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
203            for record in records {
204                match record {
205                    Value::Object(map) => {
206                        for k in map.keys() {
207                            if seen.insert(k.as_str()) {
208                                columns.push(k.clone());
209                            }
210                        }
211                    }
212                    _ => {
213                        return Err(FaucetError::Sink(
214                            "CSV sink expects JSON objects, got non-object record".into(),
215                        ));
216                    }
217                }
218            }
219
220            // First open obeys `config.append`. Re-opens (after flush()
221            // cleared the writer) always append, so flush-then-write
222            // sequences do not truncate previously-written data.
223            let (append, truncate) = if opened_before {
224                (true, false)
225            } else {
226                (config.append, !config.append)
227            };
228
229            if let Some(parent) = std::path::Path::new(&config.path).parent()
230                && !parent.as_os_str().is_empty()
231            {
232                std::fs::create_dir_all(parent).map_err(|e| {
233                    FaucetError::Sink(format!(
234                        "failed to create parent directory '{}': {e}",
235                        parent.display()
236                    ))
237                })?;
238            }
239            let file = OpenOptions::new()
240                .create(true)
241                .write(true)
242                .append(append)
243                .truncate(truncate)
244                .open(&config.path)
245                .map_err(|e| {
246                    FaucetError::Sink(format!("failed to open CSV file '{}': {e}", config.path))
247                })?;
248
249            #[cfg(feature = "compression")]
250            let inner: SinkWriter = {
251                let codec = config.compression.resolve(&config.path);
252                faucet_core::compression::warn_mismatch(&config.path, codec);
253                faucet_core::compression::sync_compress_writer(file, codec)
254            };
255            #[cfg(not(feature = "compression"))]
256            let inner: SinkWriter = file;
257
258            let mut writer = csv::WriterBuilder::new()
259                .delimiter(config.delimiter)
260                .from_writer(inner);
261
262            // Write header row if configured and this is the first open.
263            if config.write_headers && !append {
264                writer
265                    .write_record(&columns)
266                    .map_err(|e| FaucetError::Sink(format!("failed to write CSV headers: {e}")))?;
267            }
268
269            WriterState { writer, columns }
270        }
271    };
272
273    let mut count = 0;
274    for record in records {
275        let row: Vec<String> = state
276            .columns
277            .iter()
278            .map(|col| {
279                record
280                    .get(col)
281                    .map(CsvSink::value_to_csv_field)
282                    .unwrap_or_default()
283            })
284            .collect();
285
286        state
287            .writer
288            .write_record(&row)
289            .map_err(|e| FaucetError::Sink(format!("CSV write error: {e}")))?;
290        count += 1;
291    }
292
293    tracing::debug!(records = count, path = %config.path, "CSV batch written");
294
295    Ok((state, count))
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use tempfile::NamedTempFile;

    /// Creates a temp file and returns it with its path as a `String`.
    /// The handle must be kept alive for as long as the path is used.
    fn temp_csv() -> (NamedTempFile, String) {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        (tmp, path)
    }

    /// Reads a text file and returns its trimmed content split into lines.
    async fn read_lines(path: impl AsRef<std::path::Path>) -> Vec<String> {
        let content = tokio::fs::read_to_string(path).await.unwrap();
        content.trim().split('\n').map(str::to_owned).collect()
    }

    /// Reads a compressed file, decodes it with `codec`, and returns the
    /// trimmed text split into lines.
    #[cfg(feature = "compression")]
    async fn read_compressed_lines(path: &str, codec: faucet_core::Compression) -> Vec<String> {
        use std::io::Read;
        let bytes = tokio::fs::read(path).await.unwrap();
        let mut reader = faucet_core::compression::wrap_sync_reader(&bytes[..], codec);
        let mut text = String::new();
        reader.read_to_string(&mut text).unwrap();
        text.trim().split('\n').map(str::to_owned).collect()
    }

    #[tokio::test]
    async fn writes_csv_records() {
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        let records = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];
        let count = sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        assert_eq!(count, 2);
        // Header + 2 data rows, in column order.
        assert_eq!(read_lines(&path).await, ["id,name", "1,Alice", "2,Bob"]);
    }

    #[tokio::test]
    async fn columns_union_across_first_batch_not_just_first_record() {
        // H2 (audit #146): column order is the union of keys across the first
        // batch's records. The first record lacks `email`; before the fix the
        // header was fixed from record 0 and `email` was silently dropped from
        // every row. After the fix `email` is a column and the second record's
        // value appears (the first row leaves it empty).
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        let records = vec![
            json!({ "id": 1, "name": "Alice" }),
            json!({ "id": 2, "name": "Bob", "email": "bob@x.y" }),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        let lines = read_lines(&path).await;
        assert_eq!(lines.len(), 3, "header + 2 rows");
        assert_eq!(
            lines[0], "id,name,email",
            "header must include the later-record-only column, in first-seen order"
        );
        assert_eq!(lines[1], "1,Alice,", "first row leaves the unioned column empty");
        assert_eq!(
            lines[2], "2,Bob,bob@x.y",
            "second row must carry the unioned column value"
        );
    }

    #[tokio::test]
    async fn writes_csv_without_headers() {
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).write_headers(false));

        sink.write_batch(&[json!({"a": "1", "b": "2"})])
            .await
            .unwrap();
        sink.flush().await.unwrap();

        // No header, just 1 data row.
        assert_eq!(read_lines(&path).await, ["1,2"]);
    }

    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        let count = sink.write_batch(&[]).await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn rejects_non_object_record_in_first_batch() {
        // The column layout is derived from object keys, so a scalar record
        // cannot be written and must be reported instead.
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        assert!(sink.write_batch(&[json!(42)]).await.is_err());
    }

    #[tokio::test]
    async fn multiple_batches_accumulate() {
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        sink.write_batch(&[json!({"x": "1"})]).await.unwrap();
        sink.write_batch(&[json!({"x": "2"}), json!({"x": "3"})])
            .await
            .unwrap();
        sink.flush().await.unwrap();

        // Header + 3 data rows, in write order.
        assert_eq!(read_lines(&path).await, ["x", "1", "2", "3"]);
    }

    #[tokio::test]
    async fn missing_fields_written_as_empty() {
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        let records = vec![
            json!({"a": "1", "b": "2"}),
            json!({"a": "3"}), // missing "b"
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        // Header + 2 rows; the missing "b" is an empty trailing field.
        assert_eq!(read_lines(&path).await, ["a,b", "1,2", "3,"]);
    }

    #[test]
    fn value_to_csv_field_handles_types() {
        assert_eq!(CsvSink::value_to_csv_field(&json!(null)), "");
        assert_eq!(CsvSink::value_to_csv_field(&json!("hello")), "hello");
        assert_eq!(CsvSink::value_to_csv_field(&json!(42)), "42");
        assert_eq!(CsvSink::value_to_csv_field(&json!(true)), "true");
        assert_eq!(CsvSink::value_to_csv_field(&json!(2.72)), "2.72");
        // Nested values are embedded as compact JSON text.
        assert_eq!(CsvSink::value_to_csv_field(&json!([1, "a"])), r#"[1,"a"]"#);
        assert_eq!(
            CsvSink::value_to_csv_field(&json!({"k": [1, 2]})),
            r#"{"k":[1,2]}"#
        );
    }

    #[tokio::test]
    async fn flush_without_write_is_noop() {
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn check_passes_when_parent_dir_exists() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let path_str = path.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes[0].name, "io");
        // The probe must not have created the user's output file.
        assert!(!path.exists(), "check() must not create the output file");
    }

    #[tokio::test]
    async fn check_fails_when_parent_dir_missing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nope").join("out.csv");
        let path_str = path.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 1);
        assert_eq!(report.probes[0].name, "io");
    }

    #[tokio::test]
    async fn creates_missing_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("a").join("b").join("out.csv");
        let path_str = nested.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));

        let count = sink
            .write_batch(&[json!({"id": "1", "name": "Alice"})])
            .await
            .unwrap();
        sink.flush().await.unwrap();

        assert_eq!(count, 1);
        assert!(nested.exists(), "output file must exist after write");
        // Header + 1 data row.
        assert_eq!(read_lines(&nested).await, ["id,name", "1,Alice"]);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_gzip() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));

        let records = vec![
            json!({"id": "1", "name": "Alice"}),
            json!({"id": "2", "name": "Bob"}),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        let lines = read_compressed_lines(&path, faucet_core::Compression::Gzip).await;
        // Header + 2 rows.
        assert_eq!(lines, ["id,name", "1,Alice", "2,Bob"]);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_zstd() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.zst").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));

        sink.write_batch(&[json!({"x": "42"})]).await.unwrap();
        sink.flush().await.unwrap();

        let lines = read_compressed_lines(&path, faucet_core::Compression::Zstd).await;
        // Header + 1 row.
        assert_eq!(lines, ["x", "42"]);
    }

    #[tokio::test]
    async fn write_flush_write_does_not_truncate() {
        // Regression: flush() clears the writer; the next write_batch
        // must reopen in append mode regardless of config.append (which
        // defaults to false). Without the opened_once guard, the second
        // open would truncate and lose the first batch's records.
        let (_tmp, path) = temp_csv();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        sink.write_batch(&[json!({"id": "1"})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"id": "2"})]).await.unwrap();
        sink.flush().await.unwrap();

        // Header (written only on the first open) + both data rows, in order.
        assert_eq!(
            read_lines(&path).await,
            ["id", "1", "2"],
            "both batches must survive the mid-stream flush"
        );
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn write_flush_write_produces_multi_member_gzip_csv() {
        // With compression, flush() finalises one gzip member; the
        // next write_batch starts a fresh member appended after it.
        // The decoder reads both members back correctly.
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));
        sink.write_batch(&[json!({"id": "1"})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"id": "2"})]).await.unwrap();
        sink.flush().await.unwrap();

        let lines = read_compressed_lines(&path, faucet_core::Compression::Gzip).await;
        // Header (from first open) + 2 data rows. The re-open uses
        // append=true so no second header is written.
        assert_eq!(lines, ["id", "1", "2"]);
    }
}