use faucet_core::DEFAULT_BATCH_SIZE;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Configuration for reading records from a CSV file.
///
/// Every field except `path` is optional when deserializing and falls back to a default.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct CsvSourceConfig {
    /// Path of the CSV file to read.
    pub path: String,
    /// Whether the first record is a header row. Defaults to `true`.
    #[serde(default = "default_true")]
    pub has_headers: bool,
    /// Field delimiter as a single byte. Defaults to `,` (44).
    ///
    /// Serialized as an integer byte value, e.g. `9` for a tab.
    #[serde(default = "default_delimiter")]
    pub delimiter: u8,
    /// Quote character as a single byte. Defaults to `"` (34).
    ///
    /// Serialized as an integer byte value, e.g. `39` for `'`.
    #[serde(default = "default_quote")]
    pub quote: u8,
    /// Number of records per batch. Defaults to `faucet_core::DEFAULT_BATCH_SIZE`.
    ///
    /// `0` is accepted by `faucet_core::validate_batch_size` as a "no batching" sentinel;
    /// values above `faucet_core::MAX_BATCH_SIZE` are rejected by it. Deserialization
    /// itself does not validate this field.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    /// Compression of the input file (only with the `compression` feature).
    /// Uses `CompressionConfig::default()` when omitted.
    // NOTE(review): `CsvSourceConfig::new` sets `CompressionConfig::Auto` explicitly;
    // confirm that equals `CompressionConfig::default()` so both construction paths agree.
    #[cfg(feature = "compression")]
    #[serde(default)]
    pub compression: faucet_core::CompressionConfig,
}
/// Serde default for `has_headers`: input is assumed to start with a header row.
fn default_true() -> bool {
    true
}
/// Serde default for `delimiter`: a comma, as in standard CSV.
fn default_delimiter() -> u8 {
    const COMMA: u8 = b',';
    COMMA
}
/// Serde default for `quote`: the ASCII double-quote character.
fn default_quote() -> u8 {
    const DOUBLE_QUOTE: u8 = b'"';
    DOUBLE_QUOTE
}
/// Serde default for `batch_size`, mirroring the crate-wide `DEFAULT_BATCH_SIZE`.
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
impl CsvSourceConfig {
    /// Creates a configuration for the CSV file at `path` with default settings:
    /// headers enabled, `,` delimiter, `"` quote, and `DEFAULT_BATCH_SIZE` records
    /// per batch. With the `compression` feature, compression is `CompressionConfig::Auto`.
    #[must_use]
    pub fn new(path: impl Into<String>) -> Self {
        // These values duplicate the `default_*` serde helpers; keep them in sync so a
        // deserialized config with omitted fields matches a freshly constructed one.
        Self {
            path: path.into(),
            has_headers: true,
            delimiter: b',',
            quote: b'"',
            batch_size: DEFAULT_BATCH_SIZE,
            #[cfg(feature = "compression")]
            compression: faucet_core::CompressionConfig::Auto,
        }
    }

    /// Sets whether the first record is treated as a header row.
    #[must_use]
    pub fn has_headers(mut self, v: bool) -> Self {
        self.has_headers = v;
        self
    }

    /// Sets the field delimiter byte (e.g. `b'\t'` for tab-separated input).
    #[must_use]
    pub fn delimiter(mut self, d: u8) -> Self {
        self.delimiter = d;
        self
    }

    /// Sets the quote byte.
    #[must_use]
    pub fn quote(mut self, q: u8) -> Self {
        self.quote = q;
        self
    }

    /// Sets the number of records per batch.
    ///
    /// The value is stored as-is; use `faucet_core::validate_batch_size` to check it.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the compression of the input file.
    #[cfg(feature = "compression")]
    #[must_use]
    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
        self.compression = c;
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = CsvSourceConfig::new("/tmp/data.csv");
        assert_eq!(config.path, "/tmp/data.csv");
        assert!(config.has_headers);
        assert_eq!(config.delimiter, b',');
        assert_eq!(config.quote, b'"');
    }

    #[test]
    fn builder_methods() {
        let config = CsvSourceConfig::new("/tmp/data.tsv")
            .has_headers(false)
            .delimiter(b'\t')
            .quote(b'\'');
        assert!(!config.has_headers);
        assert_eq!(config.delimiter, b'\t');
        assert_eq!(config.quote, b'\'');
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = CsvSourceConfig::new("/tmp/data.csv");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = CsvSourceConfig::new("/tmp/data.csv").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = CsvSourceConfig::new("/tmp/data.csv").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            CsvSourceConfig::new("/tmp/data.csv").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "path": "/tmp/data.csv",
            "batch_size": 250
        }"#;
        let config: CsvSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    /// `new()` and the serde `default_*` helpers define the defaults independently;
    /// this guards against the two drifting apart.
    #[test]
    fn omitted_fields_deserialize_to_constructor_defaults() {
        let parsed: CsvSourceConfig =
            serde_json::from_str(r#"{ "path": "/tmp/data.csv" }"#).unwrap();
        let built = CsvSourceConfig::new("/tmp/data.csv");
        assert_eq!(parsed.path, built.path);
        assert_eq!(parsed.has_headers, built.has_headers);
        assert_eq!(parsed.delimiter, built.delimiter);
        assert_eq!(parsed.quote, built.quote);
        assert_eq!(parsed.batch_size, built.batch_size);
    }

    /// `delimiter` and `quote` are `u8`, so JSON supplies them as integer byte values.
    #[test]
    fn delimiter_and_quote_deserialize_from_byte_values() {
        let json = r#"{
            "path": "/tmp/data.tsv",
            "has_headers": false,
            "delimiter": 9,
            "quote": 39
        }"#;
        let config: CsvSourceConfig = serde_json::from_str(json).unwrap();
        assert!(!config.has_headers);
        assert_eq!(config.delimiter, b'\t');
        assert_eq!(config.quote, b'\'');
    }

    #[test]
    fn serde_round_trip_preserves_fields() {
        let original = CsvSourceConfig::new("/tmp/data.tsv")
            .has_headers(false)
            .delimiter(b'\t')
            .quote(b'\'')
            .with_batch_size(42);
        let json = serde_json::to_string(&original).unwrap();
        let restored: CsvSourceConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.path, original.path);
        assert_eq!(restored.has_headers, original.has_headers);
        assert_eq!(restored.delimiter, original.delimiter);
        assert_eq!(restored.quote, original.quote);
        assert_eq!(restored.batch_size, original.batch_size);
    }
}