faucet_sink_csv/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct CsvSinkConfig {
10 pub path: String,
12 #[serde(default = "default_delimiter")]
14 pub delimiter: u8,
15 #[serde(default = "default_true")]
17 pub write_headers: bool,
18 #[serde(default)]
20 pub append: bool,
21 #[serde(default = "default_batch_size")]
33 pub batch_size: usize,
34 #[cfg(feature = "compression")]
39 #[serde(default)]
40 pub compression: faucet_core::CompressionConfig,
41}
42
/// Serde fallback for `CsvSinkConfig::delimiter`: the ASCII comma byte.
fn default_delimiter() -> u8 {
    const COMMA: u8 = b',';
    COMMA
}
46
/// Serde fallback for boolean fields that are enabled unless the input says
/// otherwise (used by `CsvSinkConfig::write_headers`).
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
50
51fn default_batch_size() -> usize {
52 DEFAULT_BATCH_SIZE
53}
54
55impl CsvSinkConfig {
56 pub fn new(path: impl Into<String>) -> Self {
58 Self {
59 path: path.into(),
60 delimiter: b',',
61 write_headers: true,
62 append: false,
63 batch_size: DEFAULT_BATCH_SIZE,
64 #[cfg(feature = "compression")]
65 compression: faucet_core::CompressionConfig::Auto,
66 }
67 }
68
69 pub fn delimiter(mut self, d: u8) -> Self {
71 self.delimiter = d;
72 self
73 }
74
75 pub fn write_headers(mut self, v: bool) -> Self {
77 self.write_headers = v;
78 self
79 }
80
81 pub fn append(mut self, v: bool) -> Self {
83 self.append = v;
84 self
85 }
86
87 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
96 self.batch_size = batch_size;
97 self
98 }
99
100 #[cfg(feature = "compression")]
102 pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
103 self.compression = c;
104 self
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` keeps the given path and applies the default delimiter and flags.
    #[test]
    fn default_config() {
        let cfg = CsvSinkConfig::new("/tmp/out.csv");
        assert_eq!(cfg.path, "/tmp/out.csv");
        assert_eq!(cfg.delimiter, b',');
        assert!(cfg.write_headers);
        assert!(!cfg.append);
    }

    /// Chained builder calls each override their own field.
    #[test]
    fn builder_methods() {
        let base = CsvSinkConfig::new("/tmp/out.tsv");
        let cfg = base.delimiter(b'\t').write_headers(false).append(true);
        assert_eq!(cfg.delimiter, b'\t');
        assert!(!cfg.write_headers);
        assert!(cfg.append);
    }

    /// `new` uses the shared default batch size from `faucet_core`.
    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let cfg = CsvSinkConfig::new("/tmp/out.csv");
        assert_eq!(cfg.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// `with_batch_size` replaces the default value.
    #[test]
    fn with_batch_size_overrides_default() {
        let cfg = CsvSinkConfig::new("/tmp/out.csv").with_batch_size(250);
        assert_eq!(cfg.batch_size, 250);
    }

    /// Zero is stored unchanged and passes `validate_batch_size`.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let cfg = CsvSinkConfig::new("/tmp/out.csv").with_batch_size(0);
        assert_eq!(cfg.batch_size, 0);
        let verdict = faucet_core::validate_batch_size(cfg.batch_size);
        assert!(verdict.is_ok());
    }

    /// The builder stores an oversized value, but `validate_batch_size`
    /// rejects it.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let oversized = faucet_core::MAX_BATCH_SIZE + 1;
        let cfg = CsvSinkConfig::new("/tmp/out.csv").with_batch_size(oversized);
        let verdict = faucet_core::validate_batch_size(cfg.batch_size);
        assert!(verdict.is_err());
    }

    /// An explicit `batch_size` in JSON is honoured (`44` is the comma byte).
    #[test]
    fn batch_size_deserializes_from_json() {
        let payload = r#"{
            "path": "/tmp/out.csv",
            "delimiter": 44,
            "write_headers": true,
            "append": false,
            "batch_size": 500
        }"#;
        let cfg = serde_json::from_str::<CsvSinkConfig>(payload).unwrap();
        assert_eq!(cfg.batch_size, 500);
    }

    /// A JSON document with only `path` picks up the default batch size.
    #[test]
    fn batch_size_defaults_when_missing_in_json() {
        let payload = r#"{"path": "/tmp/out.csv"}"#;
        let cfg = serde_json::from_str::<CsvSinkConfig>(payload).unwrap();
        assert_eq!(cfg.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}