use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::Mutex;
use tracing::info;
use crate::common::message::SharedMessage;
use crate::error::{Error, Result};
use crate::sink::Sink;
/// Output encoding for a [`FileSink`]: one JSON object per line, or
/// separator-delimited rows. Deserialized from config as lowercase
/// ("jsonl", "tsv", "csv").
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    /// One JSON document per line (the default).
    #[default]
    Jsonl,
    /// Tab-separated values.
    Tsv,
    /// Comma-separated values.
    Csv,
}
/// User-facing configuration for a [`FileSink`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileSinkConfig {
    /// Destination file path; missing parent directories are created.
    pub path: String,
    /// Append to an existing file (the default) instead of truncating it.
    #[serde(default = "default_append")]
    pub append: bool,
    /// Output format; defaults to [`FileFormat::Jsonl`].
    #[serde(default)]
    pub format: FileFormat,
    /// For CSV/TSV, emit a header row before the first record
    /// (defaults to false). Has no effect for JSONL.
    #[serde(default)]
    pub include_header: bool,
}
// Serde default for `FileSinkConfig::append`: append rather than truncate.
fn default_append() -> bool {
    true
}
/// Sink that writes each message as one line to a local file.
///
/// The buffered writer and the CSV/TSV header bookkeeping each sit behind a
/// `tokio::sync::Mutex`, so `process` can be called from concurrent tasks.
pub struct FileSink {
    // Sink identifier, reported via `Sink::id` and used in log events.
    id: String,
    // Buffered file handle; `write_line` flushes after every record.
    writer: Mutex<BufWriter<File>>,
    format: FileFormat,
    // Header bookkeeping, only meaningful for separated formats (CSV/TSV).
    header_state: Mutex<HeaderState>,
    include_header: bool,
}
/// Tracks whether a CSV/TSV header line exists and which column order it
/// fixed; `fields` also drives cell ordering for subsequent data rows.
#[derive(Debug, Default)]
struct HeaderState {
    // True once a header line is in the file — either written by this sink
    // or recovered from an existing file when appending.
    written: bool,
    // Column names in output order; `None` until determined from the first
    // message (or an existing file's header).
    fields: Option<Vec<String>>,
}
impl FileSink {
    /// Create a sink writing to `config.path`, creating parent directories
    /// as needed.
    ///
    /// In append mode with `include_header` set for a separated format
    /// (CSV/TSV), the first line of any existing file is parsed as the
    /// header, so subsequent rows keep the established column order and no
    /// duplicate header is written.
    ///
    /// # Errors
    /// Returns a config error if the parent directory cannot be created or
    /// the file cannot be opened or inspected.
    pub async fn new(id: impl Into<String>, config: FileSinkConfig) -> Result<Self> {
        let path = PathBuf::from(&config.path);
        // Ensure the parent directory exists; skipped for bare filenames,
        // whose `parent()` is the empty path.
        if let Some(parent) = path.parent()
            && !parent.as_os_str().is_empty()
        {
            tokio::fs::create_dir_all(parent).await.map_err(|e| {
                Error::config(format!(
                    "Failed to create parent directory for '{}': {}",
                    config.path, e
                ))
            })?;
        }
        // append=false truncates existing content; append=true preserves it.
        let file = OpenOptions::new()
            .create(true)
            .write(true)
            .append(config.append)
            .truncate(!config.append)
            .open(&path)
            .await
            .map_err(|e| Error::config(format!("Failed to open file '{}': {}", config.path, e)))?;
        let id = id.into();
        let mode = if config.append { "append" } else { "overwrite" };
        let format = config.format;
        let writer = BufWriter::new(file);
        let mut header_state = HeaderState::default();
        // Header recovery applies only to separated formats, and only in
        // append mode (overwrite mode starts from an empty file).
        if config.include_header
            && let Some(separator) = separator_for(format)
            && config.append
        {
            // Re-open read-only so the write handle above is undisturbed.
            match tokio::fs::File::open(&path).await {
                Ok(file) => {
                    let mut reader = BufReader::new(file);
                    let mut line = String::new();
                    let bytes = reader.read_line(&mut line).await.map_err(|e| {
                        Error::config(format!(
                            "Failed to read existing header for '{}': {}",
                            config.path, e
                        ))
                    })?;
                    if bytes > 0 {
                        let trimmed = line.trim_end_matches(['\n', '\r']);
                        if !trimmed.is_empty() {
                            // NOTE(review): the first non-empty line is assumed
                            // to be a header; a pre-existing data-only file
                            // would be misread here — confirm that trade-off.
                            header_state.fields = Some(parse_header_line(trimmed, separator));
                            header_state.written = true;
                        }
                    }
                }
                // The open above used create(true), so NotFound should not
                // occur here in practice; tolerated defensively.
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                Err(err) => {
                    return Err(Error::config(format!(
                        "Failed to open file '{}' for header inspection: {}",
                        config.path, err
                    )));
                }
            }
        }
        info!(
            sink_id = %id,
            mode = %mode,
            format = ?format,
            include_header = %config.include_header,
            path = %config.path,
            "FileSink created"
        );
        Ok(Self {
            id,
            writer: Mutex::new(writer),
            format,
            header_state: Mutex::new(header_state),
            include_header: config.include_header,
        })
    }
}
impl FileSink {
async fn write_line(writer: &mut BufWriter<File>, line: &str) -> Result<()> {
writer
.write_all(line.as_bytes())
.await
.map_err(|e| Error::sink(format!("Failed to write to file: {}", e)))?;
writer
.write_all(b"\n")
.await
.map_err(|e| Error::sink(format!("Failed to write newline: {}", e)))?;
writer
.flush()
.await
.map_err(|e| Error::sink(format!("Failed to flush file: {}", e)))
}
}
#[async_trait]
impl Sink for FileSink {
fn id(&self) -> &str {
&self.id
}
async fn process(&self, msg: SharedMessage) -> Result<()> {
let mut header_line = None;
let line = if let Some(separator) = separator_for(self.format) {
let mut state = self.header_state.lock().await;
if self.include_header && !state.written {
if state.fields.is_none() {
state.fields = generate_header_fields(&msg.payload);
}
if let Some(ref fields) = state.fields {
header_line = Some(format_header(fields, separator));
state.written = true;
}
}
if let Some(ref fields) = state.fields {
format_separated_with_fields(&msg.payload, separator, fields)
} else {
format_separated(&msg.payload, separator)
}
} else {
serde_json::to_string(msg.as_ref())
.map_err(|e| Error::sink(format!("Failed to serialize message: {}", e)))?
};
let mut writer = self.writer.lock().await;
if let Some(header) = header_line {
Self::write_line(&mut writer, &header).await?;
}
Self::write_line(&mut writer, &line).await?;
tracing::debug!(sink_id = %self.id, "Written message to file");
Ok(())
}
}
/// Column separator for the row-oriented formats; `None` for JSONL, which
/// has no columns.
fn separator_for(format: FileFormat) -> Option<char> {
    let sep = match format {
        FileFormat::Csv => ',',
        FileFormat::Tsv => '\t',
        FileFormat::Jsonl => return None,
    };
    Some(sep)
}
/// Split a CSV/TSV header line into field names, honoring double-quoted
/// fields with `""` as an escaped quote (RFC 4180 style).
///
/// Always yields at least one field; an empty line parses as `[""]`.
fn parse_header_line(line: &str, separator: char) -> Vec<String> {
    let mut fields = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();
    while let Some(ch) = chars.next() {
        match (in_quotes, ch) {
            // Inside quotes: "" is a literal quote, a lone " closes the field.
            (true, '"') => {
                if chars.peek() == Some(&'"') {
                    chars.next();
                    current.push('"');
                } else {
                    in_quotes = false;
                }
            }
            (true, other) => current.push(other),
            (false, '"') => in_quotes = true,
            (false, c) if c == separator => fields.push(std::mem::take(&mut current)),
            (false, other) => current.push(other),
        }
    }
    fields.push(current);
    fields
}
fn generate_header_fields(value: &serde_json::Value) -> Option<Vec<String>> {
match value {
serde_json::Value::Object(map) => {
let headers: Vec<String> = map.keys().cloned().collect();
Some(headers)
}
serde_json::Value::Array(arr) => {
let headers: Vec<String> = (0..arr.len()).map(|i| format!("col_{}", i)).collect();
Some(headers)
}
_ => None,
}
}
/// Join escaped field names into one header line using `separator`.
fn format_header(fields: &[String], separator: char) -> String {
    let mut line = String::new();
    for (i, field) in fields.iter().enumerate() {
        if i > 0 {
            line.push(separator);
        }
        line.push_str(&format_header_value(field, separator));
    }
    line
}
/// Escape a single header cell: wrap it in double quotes (doubling embedded
/// quotes) when it contains the separator, a quote, or a line break.
///
/// Fix: also quote on `'\r'` — per RFC 4180 a bare carriage return breaks
/// row framing just like `'\n'` (and this file's own header reader strips
/// both `\n` and `\r` as line terminators).
fn format_header_value(value: &str, separator: char) -> String {
    let needs_quoting = value.contains(separator)
        || value.contains('"')
        || value.contains('\n')
        || value.contains('\r');
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}
/// Render a payload as one CSV/TSV row aligned to a fixed column order.
///
/// Objects are looked up by field name (missing keys become empty cells),
/// arrays by position, and scalars fill the first column with the remaining
/// columns left empty. An empty `fields` slice yields an empty line.
fn format_separated_with_fields(
    value: &serde_json::Value,
    separator: char,
    fields: &[String],
) -> String {
    if fields.is_empty() {
        return String::new();
    }
    let cells: Vec<String> = match value {
        serde_json::Value::Object(map) => fields
            .iter()
            .map(|field| {
                map.get(field)
                    .map(|v| format_value(v, separator))
                    .unwrap_or_default()
            })
            .collect(),
        serde_json::Value::Array(arr) => (0..fields.len())
            .map(|idx| {
                arr.get(idx)
                    .map(|v| format_value(v, separator))
                    .unwrap_or_default()
            })
            .collect(),
        scalar => {
            // First column holds the scalar; pad the rest with empty cells.
            let mut cols = vec![format_value(scalar, separator)];
            cols.resize(fields.len(), String::new());
            cols
        }
    };
    cells.join(&separator.to_string())
}
/// Render a payload as one CSV/TSV row in the payload's own order, used
/// when no header has fixed a column order.
fn format_separated(value: &serde_json::Value, separator: char) -> String {
    let cells: Vec<String> = match value {
        serde_json::Value::Object(map) => {
            map.values().map(|v| format_value(v, separator)).collect()
        }
        serde_json::Value::Array(arr) => {
            arr.iter().map(|v| format_value(v, separator)).collect()
        }
        // Scalar payloads are a single cell — no join needed.
        scalar => return format_value(scalar, separator),
    };
    cells.join(&separator.to_string())
}
/// Render one JSON value as a single CSV/TSV cell.
///
/// Scalars print plainly (null becomes an empty cell); nested arrays and
/// objects are embedded as quoted JSON text. A string cell is quoted (with
/// embedded quotes doubled) when it contains the separator, a quote, or a
/// line break.
///
/// Fix: also quote on `'\r'` — per RFC 4180 a bare carriage return breaks
/// row framing just like `'\n'`, matching the escaping in
/// `format_header_value`.
fn format_value(value: &serde_json::Value, separator: char) -> String {
    match value {
        serde_json::Value::Null => String::new(),
        serde_json::Value::Bool(b) => b.to_string(),
        serde_json::Value::Number(n) => n.to_string(),
        serde_json::Value::String(s) => {
            if s.contains(separator) || s.contains('"') || s.contains('\n') || s.contains('\r') {
                format!("\"{}\"", s.replace('"', "\"\""))
            } else {
                s.clone()
            }
        }
        serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
            // Nested structures become embedded JSON; always quoted, since
            // JSON text itself contains quotes and separators.
            let json = serde_json::to_string(value).unwrap_or_default();
            format!("\"{}\"", json.replace('"', "\"\""))
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::message::Message;
use std::sync::Arc;
#[test]
fn test_config_deserialize_minimal() {
let yaml = r#"path: "/tmp/output.jsonl""#;
let cfg: FileSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.path, "/tmp/output.jsonl");
assert!(cfg.append); assert_eq!(cfg.format, FileFormat::Jsonl); }
#[test]
fn test_config_deserialize_full() {
let yaml = r#"
path: "/var/log/pipeline.jsonl"
append: false
format: jsonl
"#;
let cfg: FileSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.path, "/var/log/pipeline.jsonl");
assert!(!cfg.append);
assert_eq!(cfg.format, FileFormat::Jsonl);
}
#[test]
fn test_config_deserialize_tsv_format() {
let yaml = r#"
path: "/tmp/output.tsv"
format: tsv
"#;
let cfg: FileSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.format, FileFormat::Tsv);
}
#[test]
fn test_config_deserialize_csv_format() {
let yaml = r#"
path: "/tmp/output.csv"
format: csv
"#;
let cfg: FileSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.format, FileFormat::Csv);
}
#[tokio::test]
async fn test_new_creates_file() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("test.jsonl");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: true,
format: FileFormat::Jsonl,
include_header: false,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
assert_eq!(sink.id(), "test_sink");
assert!(file_path.exists());
}
#[tokio::test]
async fn test_new_creates_parent_dirs() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("nested/dir/test.jsonl");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: true,
format: FileFormat::Jsonl,
include_header: false,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
assert_eq!(sink.id(), "test_sink");
assert!(file_path.exists());
}
#[tokio::test]
async fn test_process_writes_jsonl() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("output.jsonl");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: true,
format: FileFormat::Jsonl,
include_header: false,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
let msg = Arc::new(Message::new("source", serde_json::json!({"key": "value"})));
sink.process(msg).await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 1);
let parsed: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
assert_eq!(parsed["payload"]["key"], "value");
}
#[tokio::test]
async fn test_process_appends_multiple_messages() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("output.jsonl");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: true,
format: FileFormat::Jsonl,
include_header: false,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
for i in 0..3 {
let msg = Arc::new(Message::new("source", serde_json::json!({"index": i})));
sink.process(msg).await.unwrap();
}
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 3);
}
#[tokio::test]
async fn test_process_writes_tsv() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("output.tsv");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: true,
format: FileFormat::Tsv,
include_header: false,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
let msg = Arc::new(Message::new(
"source",
serde_json::json!({"name": "Alice", "age": 30, "active": true}),
));
sink.process(msg).await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let line = content.trim();
assert!(line.contains("Alice"));
assert!(line.contains("30"));
assert!(line.contains("true"));
assert!(line.contains('\t'));
}
#[tokio::test]
async fn test_process_writes_csv() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("output.csv");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: true,
format: FileFormat::Csv,
include_header: false,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
let msg = Arc::new(Message::new(
"source",
serde_json::json!({"name": "Bob", "score": 95.5}),
));
sink.process(msg).await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let line = content.trim();
assert!(line.contains("Bob"));
assert!(line.contains("95.5"));
assert!(line.contains(','));
}
#[test]
fn test_format_separated_object() {
let value = serde_json::json!({"a": 1, "b": "hello"});
let result = format_separated(&value, ',');
assert!(result.contains(','));
assert!(result.contains('1'));
assert!(result.contains("hello"));
}
#[test]
fn test_format_separated_array() {
let value = serde_json::json!([1, "two", true]);
let result = format_separated(&value, '\t');
assert_eq!(result, "1\ttwo\ttrue");
}
#[test]
fn test_format_value_escaping() {
let result = format_value(&serde_json::json!("hello,world"), ',');
assert_eq!(result, "\"hello,world\"");
let result = format_value(&serde_json::json!("say \"hi\""), ',');
assert_eq!(result, "\"say \"\"hi\"\"\"");
let result = format_value(&serde_json::json!("line1\nline2"), ',');
assert_eq!(result, "\"line1\nline2\"");
let result = format_value(&serde_json::json!("normal"), ',');
assert_eq!(result, "normal");
}
#[test]
fn test_format_value_types() {
assert_eq!(format_value(&serde_json::Value::Null, ','), "");
assert_eq!(format_value(&serde_json::json!(true), ','), "true");
assert_eq!(format_value(&serde_json::json!(false), ','), "false");
assert_eq!(format_value(&serde_json::json!(42), ','), "42");
assert_eq!(format_value(&serde_json::json!(3.15), ','), "3.15");
}
#[tokio::test]
async fn test_csv_with_header() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("output_with_header.csv");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: false,
format: FileFormat::Csv,
include_header: true,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
for i in 0..3 {
let msg = Arc::new(Message::new(
"source",
serde_json::json!({"name": format!("user{}", i), "score": i * 10}),
));
sink.process(msg).await.unwrap();
}
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 4);
let header = lines[0];
assert!(header.contains("name"));
assert!(header.contains("score"));
}
#[tokio::test]
async fn test_csv_header_order_is_stable() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("ordered.csv");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: false,
format: FileFormat::Csv,
include_header: true,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
let msg1 = Arc::new(Message::new("source", serde_json::json!({"b": 2, "a": 1})));
let msg2 = Arc::new(Message::new(
"source",
serde_json::json!({"a": 10, "b": 20}),
));
sink.process(msg1).await.unwrap();
sink.process(msg2).await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 3);
assert_eq!(lines[0], "a,b");
assert_eq!(lines[1], "1,2");
assert_eq!(lines[2], "10,20");
}
#[tokio::test]
async fn test_csv_append_uses_existing_header() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("existing.csv");
std::fs::write(&file_path, "name,score\n").unwrap();
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: true,
format: FileFormat::Csv,
include_header: true,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
let msg = Arc::new(Message::new(
"source",
serde_json::json!({"score": 10, "name": "bob"}),
));
sink.process(msg).await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 2);
assert_eq!(lines[0], "name,score");
assert_eq!(lines[1], "bob,10");
}
#[tokio::test]
async fn test_tsv_with_header() {
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("output_with_header.tsv");
let config = FileSinkConfig {
path: file_path.to_string_lossy().to_string(),
append: false,
format: FileFormat::Tsv,
include_header: true,
};
let sink = FileSink::new("test_sink", config).await.unwrap();
let msg = Arc::new(Message::new(
"source",
serde_json::json!({"city": "Shanghai", "population": 24000000}),
));
sink.process(msg).await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 2);
let header = lines[0];
assert!(header.contains("city"));
assert!(header.contains("population"));
assert!(header.contains('\t'));
let data = lines[1];
assert!(data.contains("Shanghai"));
assert!(data.contains("24000000"));
}
#[test]
fn test_generate_header_fields_object() {
let value = serde_json::json!({"name": "test", "age": 30});
let fields = generate_header_fields(&value).unwrap();
let header = format_header(&fields, ',');
assert!(header.contains("name"));
assert!(header.contains("age"));
}
#[test]
fn test_generate_header_fields_array() {
let value = serde_json::json!([1, 2, 3]);
let fields = generate_header_fields(&value).unwrap();
let header = format_header(&fields, '\t');
assert!(header.contains("col_0"));
assert!(header.contains("col_1"));
assert!(header.contains("col_2"));
}
#[test]
fn test_separator_for_jsonl_returns_none() {
assert!(separator_for(FileFormat::Jsonl).is_none());
}
#[test]
fn test_config_deserialize_with_header() {
let yaml = r#"
path: "/tmp/output.csv"
format: csv
include_header: true
"#;
let cfg: FileSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.format, FileFormat::Csv);
assert!(cfg.include_header);
}
}