use crate::config::CsvSinkConfig;
use async_trait::async_trait;
use faucet_core::FaucetError;
use serde_json::Value;
use std::fs::OpenOptions;
use std::sync::Mutex;
// Byte sink underneath the CSV writer. With the `compression` feature the file
// is wrapped in faucet_core's synchronous compressing writer (whose `finish()`
// is called by `flush()` to finalise the stream); without it, rows go straight
// to the plain file.
#[cfg(feature = "compression")]
type SinkWriter = faucet_core::compression::SyncCompressWriter<std::fs::File>;
#[cfg(not(feature = "compression"))]
type SinkWriter = std::fs::File;
/// A live CSV writer together with the column order its rows follow.
struct WriterState {
    /// CSV encoder over the output file (compressed when the feature is on).
    writer: csv::Writer<SinkWriter>,
    /// Column names in output order; every record is projected onto these,
    /// with keys the record lacks written as empty fields.
    columns: Vec<String>,
}
/// Sink that writes JSON object records as rows of a delimited (CSV) file.
///
/// The column schema is fixed by the first batch that is written successfully:
/// it is the union of that batch's object keys, in the order they are first
/// encountered. Every later row uses the same columns. This includes rows
/// written after `flush()` closed the file and a following batch reopened it
/// for appending, so rows always line up with the header. Keys outside the
/// schema are dropped; schema columns a record lacks are written as empty
/// fields.
///
/// NOTE(review): the writer is moved out of `state` for the duration of each
/// blocking write, so concurrent `write_batch` calls on one sink are not
/// serialised (the second would find no writer and open its own). Presumably
/// callers drive a sink sequentially — confirm.
pub struct CsvSink {
    config: CsvSinkConfig,
    /// Live writer; `None` before the first write, after `flush()`, and while a
    /// blocking task temporarily owns it.
    state: Mutex<Option<WriterState>>,
    /// Schema recorded by the first successful batch. Once set, the file has
    /// already been created (and truncated, unless `config.append`), so any
    /// reopen must append and must not repeat the header.
    columns: std::sync::OnceLock<Vec<String>>,
}

/// Error text for records that are not JSON objects.
const NON_OBJECT_MSG: &str = "CSV sink expects JSON objects, got non-object record";

impl CsvSink {
    /// Create a sink for `config`. The output file is not touched until the
    /// first non-empty batch is written.
    pub fn new(config: CsvSinkConfig) -> Self {
        Self {
            config,
            state: Mutex::new(None),
            columns: std::sync::OnceLock::new(),
        }
    }

    /// Lock the writer slot, mapping a poisoned mutex to a sink error.
    fn lock_state(
        &self,
    ) -> Result<std::sync::MutexGuard<'_, Option<WriterState>>, FaucetError> {
        self.state
            .lock()
            .map_err(|e| FaucetError::Sink(format!("CSV sink lock poisoned: {e}")))
    }

    /// Render one JSON value as a CSV field. `null` becomes an empty field and
    /// strings are written raw, with quoting left to the CSV writer. Booleans and
    /// numbers use their `Display` form. Arrays and objects are serialised as
    /// compact JSON text.
    fn value_to_csv_field(value: &Value) -> String {
        match value {
            Value::Null => String::new(),
            Value::String(s) => s.clone(),
            Value::Bool(b) => b.to_string(),
            Value::Number(n) => n.to_string(),
            other => other.to_string(),
        }
    }
}

#[async_trait]
impl faucet_core::Sink for CsvSink {
    fn config_schema(&self) -> serde_json::Value {
        serde_json::to_value(faucet_core::schema_for!(CsvSinkConfig)).expect("schema serialization")
    }

    /// Append `records` as CSV rows and return how many rows were written.
    ///
    /// Every record must be a JSON object. Otherwise the whole batch is rejected
    /// before anything is written, and any open writer is left in place.
    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
        if records.is_empty() {
            return Ok(0);
        }
        // Validate before taking the writer out of the mutex. A bad batch must
        // neither emit blank rows nor cost us the open writer.
        if records.iter().any(|record| !record.is_object()) {
            return Err(FaucetError::Sink(NON_OBJECT_MSG.into()));
        }
        let config = self.config.clone();
        let records: Vec<Value> = records.to_vec();
        let current_state = self.lock_state()?.take();
        // The recorded schema is only needed when the writer must be (re)opened.
        // After a flush, the file is reopened for appending and has to keep the
        // columns already in its header.
        let known_columns = if current_state.is_none() {
            self.columns.get().cloned()
        } else {
            None
        };
        let (new_state, count) = tokio::task::spawn_blocking(move || {
            write_csv_blocking(config, current_state, &records, known_columns)
        })
        .await
        .map_err(|e| FaucetError::Sink(format!("CSV write task failed: {e}")))??;
        // The first successful batch fixes the schema for the sink's lifetime.
        self.columns.get_or_init(|| new_state.columns.clone());
        *self.lock_state()? = Some(new_state);
        Ok(count)
    }

    /// Close the current writer. Buffered rows are flushed into the file and,
    /// with compression enabled, the compressed stream is finished.
    ///
    /// The next `write_batch` reopens the file in append mode. It reuses the
    /// schema recorded by the first batch and does not repeat the header.
    async fn flush(&self) -> Result<(), FaucetError> {
        let state = self.lock_state()?.take();
        let Some(state) = state else {
            return Ok(());
        };
        tokio::task::spawn_blocking(move || -> Result<(), FaucetError> {
            let WriterState { writer, .. } = state;
            let inner = writer
                .into_inner()
                .map_err(|e| FaucetError::Sink(format!("CSV flush failed: {e}")))?;
            #[cfg(feature = "compression")]
            {
                inner.finish().map_err(|e| {
                    FaucetError::Sink(format!("CSV compression finalise failed: {e}"))
                })?;
            }
            #[cfg(not(feature = "compression"))]
            {
                let mut f = inner;
                std::io::Write::flush(&mut f)
                    .map_err(|e| FaucetError::Sink(format!("CSV flush failed: {e}")))?;
            }
            Ok(())
        })
        .await
        .map_err(|e| FaucetError::Sink(format!("CSV flush task failed: {e}")))??;
        Ok(())
    }

    /// Probe, on a blocking thread, whether the configured path's parent
    /// directory is writable, and report the result as a single probe.
    async fn check(
        &self,
        _ctx: &faucet_core::check::CheckContext,
    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
        use faucet_core::check::CheckReport;
        let path = self.config.path.clone();
        let probe = tokio::task::spawn_blocking(move || {
            crate::probe::probe_parent_writable(&path, std::time::Instant::now())
        })
        .await
        .map_err(|e| FaucetError::Sink(format!("CSV check task failed: {e}")))?;
        Ok(CheckReport::single(probe))
    }
}

/// Union of the object keys across `records`, in first-encountered order.
///
/// # Errors
/// `FaucetError::Sink` if any record is not a JSON object.
fn collect_columns(records: &[Value]) -> Result<Vec<String>, FaucetError> {
    let mut columns: Vec<String> = Vec::new();
    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
    for record in records {
        let Value::Object(map) = record else {
            return Err(FaucetError::Sink(NON_OBJECT_MSG.into()));
        };
        for key in map.keys() {
            if seen.insert(key.as_str()) {
                columns.push(key.clone());
            }
        }
    }
    Ok(columns)
}

/// Write `records` as CSV rows, opening the output file first when there is
/// no live writer. Runs inside `spawn_blocking`.
///
/// * `existing_state` is the live writer, if any; it is reused as-is.
/// * `known_columns` is the schema recorded by an earlier successful batch. It
///   is only consulted when a writer must be opened:
///   * `Some` reopens the file in append mode with those columns and no header.
///   * `None` (the very first open) derives the columns from this batch and
///     opens according to `config.append`.
///
/// Returns the writer to keep for the next batch and the number of rows written.
///
/// # Errors
/// Returns `FaucetError::Sink` when:
/// * a record is not an object on first open (callers validate beforehand);
/// * the parent directory or the file cannot be created/opened;
/// * the header or a row cannot be written.
fn write_csv_blocking(
    config: CsvSinkConfig,
    existing_state: Option<WriterState>,
    records: &[Value],
    known_columns: Option<Vec<String>>,
) -> Result<(WriterState, usize), FaucetError> {
    let mut state = match existing_state {
        Some(s) => s,
        None => {
            // A known schema means an earlier batch already created the file (and
            // wrote its header, if enabled). Append with the same columns instead
            // of truncating or re-deriving them from this batch.
            let reopening = known_columns.is_some();
            let columns = match known_columns {
                Some(columns) => columns,
                None => collect_columns(records)?,
            };
            let (append, truncate) = if reopening {
                (true, false)
            } else {
                (config.append, !config.append)
            };
            if let Some(parent) = std::path::Path::new(&config.path).parent()
                && !parent.as_os_str().is_empty()
            {
                std::fs::create_dir_all(parent).map_err(|e| {
                    FaucetError::Sink(format!(
                        "failed to create parent directory '{}': {e}",
                        parent.display()
                    ))
                })?;
            }
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .append(append)
                .truncate(truncate)
                .open(&config.path)
                .map_err(|e| {
                    FaucetError::Sink(format!("failed to open CSV file '{}': {e}", config.path))
                })?;
            #[cfg(feature = "compression")]
            let inner: SinkWriter = {
                let codec = config.compression.resolve(&config.path);
                faucet_core::compression::warn_mismatch(&config.path, codec);
                faucet_core::compression::sync_compress_writer(file, codec)
            };
            #[cfg(not(feature = "compression"))]
            let inner: SinkWriter = file;
            let mut writer = csv::WriterBuilder::new()
                .delimiter(config.delimiter)
                .from_writer(inner);
            // The header only goes at the start of a fresh file; appends continue it.
            if config.write_headers && !append {
                writer
                    .write_record(&columns)
                    .map_err(|e| FaucetError::Sink(format!("failed to write CSV headers: {e}")))?;
            }
            WriterState { writer, columns }
        }
    };
    let WriterState { writer, columns } = &mut state;
    for record in records {
        // Project the record onto the schema; absent keys become empty fields.
        let row = columns.iter().map(|col| {
            record
                .get(col)
                .map(CsvSink::value_to_csv_field)
                .unwrap_or_default()
        });
        writer
            .write_record(row)
            .map_err(|e| FaucetError::Sink(format!("CSV write error: {e}")))?;
    }
    let count = records.len();
    tracing::debug!(records = count, path = %config.path, "CSV batch written");
    Ok((state, count))
}
#[cfg(test)]
mod tests {
    use super::*;
    use faucet_core::Sink;
    use serde_json::json;
    use tempfile::NamedTempFile;

    #[tokio::test]
    async fn writes_csv_records() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        let records = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
        ];
        let count = sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();
        assert_eq!(count, 2);
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);
        assert!(
            lines[0].contains("id") && lines[0].contains("name"),
            "header must name every column: {}",
            lines[0]
        );
        assert!(lines[1].contains("Alice"), "first row: {}", lines[1]);
        assert!(lines[2].contains("Bob"), "second row: {}", lines[2]);
    }

    #[tokio::test]
    async fn columns_union_across_first_batch_not_just_first_record() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        let records = vec![
            json!({ "id": 1, "name": "Alice" }),
            json!({ "id": 2, "name": "Bob", "email": "bob@x.y" }),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 3, "header + 2 rows");
        assert!(
            lines[0].contains("email"),
            "header must include the later-record-only column: {}",
            lines[0]
        );
        assert!(
            lines[2].contains("bob@x.y"),
            "second row must carry the unioned column value: {}",
            lines[2]
        );
    }

    #[tokio::test]
    async fn writes_csv_without_headers() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).write_headers(false));
        let records = vec![json!({"a": "1", "b": "2"})];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 1);
    }

    #[tokio::test]
    async fn empty_batch_returns_zero() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        let count = sink.write_batch(&[]).await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn non_object_record_is_rejected() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        let result = sink.write_batch(&[json!({"a": "1"}), json!(42)]).await;
        assert!(result.is_err(), "non-object records must be rejected");
    }

    #[tokio::test]
    async fn multiple_batches_accumulate() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        sink.write_batch(&[json!({"x": "1"})]).await.unwrap();
        sink.write_batch(&[json!({"x": "2"}), json!({"x": "3"})])
            .await
            .unwrap();
        sink.flush().await.unwrap();
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 4);
    }

    #[tokio::test]
    async fn missing_fields_written_as_empty() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let config = CsvSinkConfig::new(&path);
        let sep = char::from(config.delimiter);
        let sink = CsvSink::new(config);
        let records = vec![
            json!({"a": "1", "b": "2"}),
            // Lacks "b": that column must be written as an empty field.
            json!({"a": "3"}),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        // Header + 2 rows.
        assert_eq!(lines.len(), 3);
        assert_eq!(lines[2].trim_end(), format!("3{sep}"));
    }

    #[test]
    fn value_to_csv_field_handles_types() {
        assert_eq!(CsvSink::value_to_csv_field(&json!(null)), "");
        assert_eq!(CsvSink::value_to_csv_field(&json!("hello")), "hello");
        assert_eq!(CsvSink::value_to_csv_field(&json!(42)), "42");
        assert_eq!(CsvSink::value_to_csv_field(&json!(-7)), "-7");
        assert_eq!(CsvSink::value_to_csv_field(&json!(true)), "true");
        assert_eq!(CsvSink::value_to_csv_field(&json!(2.72)), "2.72");
        assert_eq!(CsvSink::value_to_csv_field(&json!([1, 2])), "[1,2]");
        assert_eq!(
            CsvSink::value_to_csv_field(&json!({"k": "v"})),
            r#"{"k":"v"}"#
        );
    }

    #[tokio::test]
    async fn flush_without_write_is_noop() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn check_passes_when_parent_dir_exists() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let path_str = path.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert_eq!(report.probes[0].name, "io");
        assert!(!path.exists(), "check() must not create the output file");
    }

    #[tokio::test]
    async fn check_fails_when_parent_dir_missing() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nope").join("out.csv");
        let path_str = path.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));
        let report = sink
            .check(&faucet_core::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 1);
        assert_eq!(report.probes[0].name, "io");
    }

    #[tokio::test]
    async fn creates_missing_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("a").join("b").join("out.csv");
        let path_str = nested.to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path_str));
        let records = vec![json!({"id": "1", "name": "Alice"})];
        let count = sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();
        assert_eq!(count, 1);
        assert!(nested.exists(), "output file must exist after write");
        let content = tokio::fs::read_to_string(&nested).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_gzip() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));
        let records = vec![
            json!({"id": "1", "name": "Alice"}),
            json!({"id": "2", "name": "Bob"}),
        ];
        sink.write_batch(&records).await.unwrap();
        sink.flush().await.unwrap();
        let bytes = tokio::fs::read(&path).await.unwrap();
        use std::io::Read;
        let mut r =
            faucet_core::compression::wrap_sync_reader(&bytes[..], faucet_core::Compression::Gzip);
        let mut text = String::new();
        r.read_to_string(&mut text).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn roundtrip_zstd() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.zst").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));
        sink.write_batch(&[json!({"x": "42"})]).await.unwrap();
        sink.flush().await.unwrap();
        let bytes = tokio::fs::read(&path).await.unwrap();
        use std::io::Read;
        let mut r =
            faucet_core::compression::wrap_sync_reader(&bytes[..], faucet_core::Compression::Zstd);
        let mut text = String::new();
        r.read_to_string(&mut text).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);
    }

    #[tokio::test]
    async fn write_flush_write_does_not_truncate() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path));
        sink.write_batch(&[json!({"id": "1"})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"id": "2"})]).await.unwrap();
        sink.flush().await.unwrap();
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(
            lines.len(),
            3,
            "both batches must survive the mid-stream flush"
        );
    }

    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn write_flush_write_produces_multi_member_gzip_csv() {
        use faucet_core::CompressionConfig;
        let tmp = NamedTempFile::with_suffix(".csv.gz").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let sink = CsvSink::new(CsvSinkConfig::new(&path).compression(CompressionConfig::Auto));
        sink.write_batch(&[json!({"id": "1"})]).await.unwrap();
        sink.flush().await.unwrap();
        sink.write_batch(&[json!({"id": "2"})]).await.unwrap();
        sink.flush().await.unwrap();
        let bytes = tokio::fs::read(&path).await.unwrap();
        use std::io::Read;
        let mut r =
            faucet_core::compression::wrap_sync_reader(&bytes[..], faucet_core::Compression::Gzip);
        let mut text = String::new();
        r.read_to_string(&mut text).unwrap();
        let lines: Vec<&str> = text.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);
    }
}