1use crate::config::CsvSinkConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use std::fs::OpenOptions;
8use std::sync::Mutex;
9
// `SinkWriter` is the byte sink that `csv::Writer` writes into. It is chosen at
// compile time by the `compression` cargo feature so the rest of the file can
// refer to a single type name.

/// With `compression` enabled: the output file wrapped in `faucet_core`'s
/// synchronous compressing writer.
#[cfg(feature = "compression")]
type SinkWriter = faucet_core::compression::SyncCompressWriter<std::fs::File>;
/// Without `compression`: the output file itself.
#[cfg(not(feature = "compression"))]
type SinkWriter = std::fs::File;
18
/// An open CSV writer together with the column layout used for every row it emits.
struct WriterState {
    /// CSV writer layered over the (optionally compressing) output file.
    writer: csv::Writer<SinkWriter>,
    /// Column names in output order; each row is built by looking these keys up
    /// in the record.
    columns: Vec<String>,
}
24
/// Sink that writes JSON object records to a CSV file.
pub struct CsvSink {
    /// Sink configuration (output path, delimiter, header/append options, ...).
    config: CsvSinkConfig,
    /// The live writer, if any. Starts as `None`, is populated by `write_batch`,
    /// and is emptied again by `flush`.
    state: Mutex<Option<WriterState>>,
    /// Set to `true` after a `write_batch` call succeeds. Once set, any later
    /// re-open of the output file uses append mode instead of truncating.
    opened_once: std::sync::atomic::AtomicBool,
}
48
49impl CsvSink {
50 pub fn new(config: CsvSinkConfig) -> Self {
52 Self {
53 config,
54 state: Mutex::new(None),
55 opened_once: std::sync::atomic::AtomicBool::new(false),
56 }
57 }
58
59 fn value_to_csv_field(value: &Value) -> String {
61 match value {
62 Value::Null => String::new(),
63 Value::String(s) => s.clone(),
64 Value::Bool(b) => b.to_string(),
65 Value::Number(n) => n.to_string(),
66 other => other.to_string(),
68 }
69 }
70}
71
72#[async_trait]
73impl faucet_core::Sink for CsvSink {
74 fn config_schema(&self) -> serde_json::Value {
75 serde_json::to_value(faucet_core::schema_for!(CsvSinkConfig)).expect("schema serialization")
76 }
77
78 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
79 if records.is_empty() {
80 return Ok(0);
81 }
82
83 let config = self.config.clone();
84 let records: Vec<Value> = records.to_vec();
85
86 let current_state = {
89 let mut guard = self
90 .state
91 .lock()
92 .map_err(|e| FaucetError::Sink(format!("CSV sink lock poisoned: {e}")))?;
93 guard.take()
94 };
95
96 let opened_before = self.opened_once.load(std::sync::atomic::Ordering::Relaxed);
97
98 let result = tokio::task::spawn_blocking(move || {
99 write_csv_blocking(config, current_state, &records, opened_before)
100 })
101 .await
102 .map_err(|e| FaucetError::Sink(format!("CSV write task failed: {e}")))?;
103
104 let (new_state, count) = result?;
105
106 self.opened_once
108 .store(true, std::sync::atomic::Ordering::Relaxed);
109
110 {
112 let mut guard = self
113 .state
114 .lock()
115 .map_err(|e| FaucetError::Sink(format!("CSV sink lock poisoned: {e}")))?;
116 *guard = Some(new_state);
117 }
118
119 Ok(count)
120 }
121
122 async fn flush(&self) -> Result<(), FaucetError> {
123 let state = {
128 let mut guard = self
129 .state
130 .lock()
131 .map_err(|e| FaucetError::Sink(format!("CSV sink lock poisoned: {e}")))?;
132 guard.take()
133 };
134 if let Some(state) = state {
135 tokio::task::spawn_blocking(move || -> Result<(), FaucetError> {
136 let WriterState { writer, .. } = state;
137 let inner = writer
141 .into_inner()
142 .map_err(|e| FaucetError::Sink(format!("CSV flush failed: {e}")))?;
143 #[cfg(feature = "compression")]
144 {
145 inner.finish().map_err(|e| {
147 FaucetError::Sink(format!("CSV compression finalise failed: {e}"))
148 })?;
149 }
150 #[cfg(not(feature = "compression"))]
151 {
152 let mut f = inner;
153 std::io::Write::flush(&mut f)
154 .map_err(|e| FaucetError::Sink(format!("CSV flush failed: {e}")))?;
155 }
156 Ok(())
157 })
158 .await
159 .map_err(|e| FaucetError::Sink(format!("CSV flush task failed: {e}")))??;
160 }
161 Ok(())
162 }
163
164 async fn check(
169 &self,
170 _ctx: &faucet_core::check::CheckContext,
171 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
172 use faucet_core::check::CheckReport;
173 let path = self.config.path.clone();
174 let probe = tokio::task::spawn_blocking(move || {
177 crate::probe::probe_parent_writable(&path, std::time::Instant::now())
178 })
179 .await
180 .map_err(|e| FaucetError::Sink(format!("CSV check task failed: {e}")))?;
181 Ok(CheckReport::single(probe))
182 }
183}
184
185fn write_csv_blocking(
187 config: CsvSinkConfig,
188 existing_state: Option<WriterState>,
189 records: &[Value],
190 opened_before: bool,
191) -> Result<(WriterState, usize), FaucetError> {
192 let mut state = match existing_state {
193 Some(s) => s,
194 None => {
195 let mut columns: Vec<String> = Vec::new();
202 let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
203 for record in records {
204 match record {
205 Value::Object(map) => {
206 for k in map.keys() {
207 if seen.insert(k.as_str()) {
208 columns.push(k.clone());
209 }
210 }
211 }
212 _ => {
213 return Err(FaucetError::Sink(
214 "CSV sink expects JSON objects, got non-object record".into(),
215 ));
216 }
217 }
218 }
219
220 let (append, truncate) = if opened_before {
224 (true, false)
225 } else {
226 (config.append, !config.append)
227 };
228
229 if let Some(parent) = std::path::Path::new(&config.path).parent()
230 && !parent.as_os_str().is_empty()
231 {
232 std::fs::create_dir_all(parent).map_err(|e| {
233 FaucetError::Sink(format!(
234 "failed to create parent directory '{}': {e}",
235 parent.display()
236 ))
237 })?;
238 }
239 let file = OpenOptions::new()
240 .create(true)
241 .write(true)
242 .append(append)
243 .truncate(truncate)
244 .open(&config.path)
245 .map_err(|e| {
246 FaucetError::Sink(format!("failed to open CSV file '{}': {e}", config.path))
247 })?;
248
249 #[cfg(feature = "compression")]
250 let inner: SinkWriter = {
251 let codec = config.compression.resolve(&config.path);
252 faucet_core::compression::warn_mismatch(&config.path, codec);
253 faucet_core::compression::sync_compress_writer(file, codec)
254 };
255 #[cfg(not(feature = "compression"))]
256 let inner: SinkWriter = file;
257
258 let mut writer = csv::WriterBuilder::new()
259 .delimiter(config.delimiter)
260 .from_writer(inner);
261
262 if config.write_headers && !append {
264 writer
265 .write_record(&columns)
266 .map_err(|e| FaucetError::Sink(format!("failed to write CSV headers: {e}")))?;
267 }
268
269 WriterState { writer, columns }
270 }
271 };
272
273 let mut count = 0;
274 for record in records {
275 let row: Vec<String> = state
276 .columns
277 .iter()
278 .map(|col| {
279 record
280 .get(col)
281 .map(CsvSink::value_to_csv_field)
282 .unwrap_or_default()
283 })
284 .collect();
285
286 state
287 .writer
288 .write_record(&row)
289 .map_err(|e| FaucetError::Sink(format!("CSV write error: {e}")))?;
290 count += 1;
291 }
292
293 tracing::debug!(records = count, path = %config.path, "CSV batch written");
294
295 Ok((state, count))
296}
297
// Tests for `CsvSink`: each one writes to a temporary file or directory and
// inspects what ends up on disk (or the returned check report).
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use tempfile::NamedTempFile;

    // Two records with default config produce three lines: header + 2 rows.
    #[tokio::test]
    async fn writes_csv_records() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        let records = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];
        let count = sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        assert_eq!(count, 2);

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);
    }

    // The header is the union of keys over the whole first batch, so a key that
    // only appears in a later record still gets a column.
    #[tokio::test]
    async fn columns_union_across_first_batch_not_just_first_record() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        let records = vec![
            json!({ "id": 1, "name": "Alice" }),
            json!({ "id": 2, "name": "Bob", "email": "bob@x.y" }),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 3, "header + 2 rows");
        assert!(
            lines[0].contains("email"),
            "header must include the later-record-only column: {}",
            lines[0]
        );
        assert!(
            lines[2].contains("bob@x.y"),
            "second row must carry the unioned column value: {}",
            lines[2]
        );
    }

    // With headers disabled a single record yields exactly one line.
    #[tokio::test]
    async fn writes_csv_without_headers() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).write_headers(false));

        let records = vec![json!({"a": "1", "b": "2"})];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 1);
    }

    // An empty batch reports zero rows written.
    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        let count = sink.write_batch(&[]).await.unwrap();
        assert_eq!(count, 0);
    }

    // Consecutive batches without a flush in between land in the same file:
    // header + 3 rows.
    #[tokio::test]
    async fn multiple_batches_accumulate() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        sink.write_batch(&[json!({"x": "1"})]).await.unwrap();
        sink.write_batch(&[json!({"x": "2"}), json!({"x": "3"})])
            .await
            .unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 4);
    }

    // A record lacking one of the known columns is still written as a row.
    #[tokio::test]
    async fn missing_fields_written_as_empty() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        let records = vec![
            json!({"a": "1", "b": "2"}),
            json!({"a": "3"}),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);
    }

    // Field rendering for null, string, integer, boolean and float values.
    #[tokio::test]
    async fn value_to_csv_field_handles_types() {
        assert_eq!(CsvSink::value_to_csv_field(&json!(null)), "");
        assert_eq!(CsvSink::value_to_csv_field(&json!("hello")), "hello");
        assert_eq!(CsvSink::value_to_csv_field(&json!(42)), "42");
        assert_eq!(CsvSink::value_to_csv_field(&json!(true)), "true");
        assert_eq!(CsvSink::value_to_csv_field(&json!(2.72)), "2.72");
    }

    // Flushing a sink that never wrote anything succeeds.
    #[tokio::test]
    async fn flush_without_write_is_noop() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        assert!(sink.flush().await.is_ok());
    }

    // `check` reports no failures when the parent directory exists, names its
    // probe "io", and leaves the output path uncreated.
    #[tokio::test]
    async fn check_passes_when_parent_dir_exists() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let path_str = path.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes[0].name, "io");
        assert!(!path.exists(), "check() must not create the output file");
    }

    // `check` reports one failed probe when the parent directory is missing.
    #[tokio::test]
    async fn check_fails_when_parent_dir_missing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nope").join("out.csv");
        let path_str = path.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 1);
        assert_eq!(report.probes[0].name, "io");
    }

    // Unlike `check`, an actual write creates missing parent directories.
    #[tokio::test]
    async fn creates_missing_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("a").join("b").join("out.csv");
        let path_str = nested.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));

        let records = vec![json!({"id": "1", "name": "Alice"})];
        let count = sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        assert_eq!(count, 1);
        assert!(nested.exists(), "output file must exist after write");
        let content = tokio::fs::read_to_string(&nested).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);
    }

    // With `CompressionConfig::Auto` and a `.csv.gz` path, the file decodes as
    // gzip to header + 2 rows.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_gzip() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));

        let records = vec![
            json!({"id": "1", "name": "Alice"}),
            json!({"id": "2", "name": "Bob"}),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();

        let bytes = tokio::fs::read(&path).await.unwrap();
        use std::io::Read;
        let mut r =
            faucet_core::compression::wrap_sync_reader(&bytes[..], faucet_core::Compression::Gzip);
        let mut text = String::new();
        r.read_to_string(&mut text).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);
    }

    // Same round trip for a `.csv.zst` path decoded as zstd: header + 1 row.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_zstd() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.zst").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));

        sink.write_batch(&[json!({"x": "42"})]).await.unwrap();
        sink.flush().await.unwrap();

        let bytes = tokio::fs::read(&path).await.unwrap();
        use std::io::Read;
        let mut r =
            faucet_core::compression::wrap_sync_reader(&bytes[..], faucet_core::Compression::Zstd);
        let mut text = String::new();
        r.read_to_string(&mut text).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 3 - 1);
    }

    // A flush between two batches drops the writer; the re-open for the second
    // batch must append instead of truncating, and must not repeat the header.
    #[tokio::test]
    async fn write_flush_write_does_not_truncate() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));

        sink.write_batch(&[json!({"id": "1"})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"id": "2"})]).await.unwrap();
        sink.flush().await.unwrap();

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(
            lines.len(),
            3,
            "both batches must survive the mid-stream flush"
        );
    }

    // Same write/flush/write sequence with gzip output: the file written across
    // two flushes must still decode to header + 2 rows.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn write_flush_write_produces_multi_member_gzip_csv() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));
        sink.write_batch(&[json!({"id": "1"})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"id": "2"})]).await.unwrap();
        sink.flush().await.unwrap();

        let bytes = tokio::fs::read(&path).await.unwrap();
        use std::io::Read;
        let mut r =
            faucet_core::compression::wrap_sync_reader(&bytes[..], faucet_core::Compression::Gzip);
        let mut text = String::new();
        r.read_to_string(&mut text).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);
    }
}