faucet_source_csv/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct CsvSourceConfig {
10 pub path: String,
12 #[serde(default = "default_true")]
14 pub has_headers: bool,
15 #[serde(default = "default_delimiter")]
17 pub delimiter: u8,
18 #[serde(default = "default_quote")]
20 pub quote: u8,
21 #[serde(default = "default_batch_size")]
30 pub batch_size: usize,
31 #[cfg(feature = "compression")]
36 #[serde(default)]
37 pub compression: faucet_core::CompressionConfig,
38}
39
/// Always yields `true`; serde uses it to fill in a missing `has_headers`.
fn default_true() -> bool {
    true
}
43
/// Serde fallback for `delimiter`: a comma, the conventional CSV field separator.
fn default_delimiter() -> u8 {
    const COMMA: u8 = b',';
    COMMA
}
47
/// Serde fallback for `quote`: the ASCII double-quote byte.
fn default_quote() -> u8 {
    const DOUBLE_QUOTE: u8 = b'"';
    DOUBLE_QUOTE
}
51
52fn default_batch_size() -> usize {
53 DEFAULT_BATCH_SIZE
54}
55
56impl CsvSourceConfig {
57 pub fn new(path: impl Into<String>) -> Self {
59 Self {
60 path: path.into(),
61 has_headers: true,
62 delimiter: b',',
63 quote: b'"',
64 batch_size: DEFAULT_BATCH_SIZE,
65 #[cfg(feature = "compression")]
66 compression: faucet_core::CompressionConfig::Auto,
67 }
68 }
69
70 pub fn has_headers(mut self, v: bool) -> Self {
72 self.has_headers = v;
73 self
74 }
75
76 pub fn delimiter(mut self, d: u8) -> Self {
78 self.delimiter = d;
79 self
80 }
81
82 pub fn quote(mut self, q: u8) -> Self {
84 self.quote = q;
85 self
86 }
87
88 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
93 self.batch_size = batch_size;
94 self
95 }
96
97 #[cfg(feature = "compression")]
99 pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
100 self.compression = c;
101 self
102 }
103}
104
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = CsvSourceConfig::new("/tmp/data.csv");
        assert_eq!(config.path, "/tmp/data.csv");
        assert!(config.has_headers);
        assert_eq!(config.delimiter, b',');
        assert_eq!(config.quote, b'"');
    }

    #[test]
    fn builder_methods() {
        let config = CsvSourceConfig::new("/tmp/data.tsv")
            .has_headers(false)
            .delimiter(b'\t')
            .quote(b'\'');
        assert!(!config.has_headers);
        assert_eq!(config.delimiter, b'\t');
        assert_eq!(config.quote, b'\'');
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = CsvSourceConfig::new("/tmp/data.csv");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = CsvSourceConfig::new("/tmp/data.csv").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = CsvSourceConfig::new("/tmp/data.csv").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            CsvSourceConfig::new("/tmp/data.csv").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "path": "/tmp/data.csv",
            "batch_size": 250
        }"#;
        let config: CsvSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    /// The serde default helpers and `CsvSourceConfig::new` must agree; otherwise
    /// a config file that omits a field behaves differently from the builder.
    #[test]
    fn serde_defaults_match_new() {
        let from_json: CsvSourceConfig =
            serde_json::from_str(r#"{"path": "/tmp/data.csv"}"#).unwrap();
        let from_new = CsvSourceConfig::new("/tmp/data.csv");
        assert_eq!(from_json.path, from_new.path);
        assert_eq!(from_json.has_headers, from_new.has_headers);
        assert_eq!(from_json.delimiter, from_new.delimiter);
        assert_eq!(from_json.quote, from_new.quote);
        assert_eq!(from_json.batch_size, from_new.batch_size);
    }

    /// `delimiter` and `quote` are `u8`, so JSON configs spell them as byte values.
    #[test]
    fn delimiter_and_quote_deserialize_from_byte_values() {
        let json = r#"{
            "path": "/tmp/data.tsv",
            "has_headers": false,
            "delimiter": 9,
            "quote": 39
        }"#;
        let config: CsvSourceConfig = serde_json::from_str(json).unwrap();
        assert!(!config.has_headers);
        assert_eq!(config.delimiter, b'\t');
        assert_eq!(config.quote, b'\'');
    }
}