faucet_source_parquet/
config.rs1use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6pub const DEFAULT_BATCH_SIZE: usize = faucet_core::DEFAULT_BATCH_SIZE;
11
/// Default value for `ParquetSourceConfig::concurrency`.
pub const DEFAULT_CONCURRENCY: usize = 4;
14
15fn default_batch_size() -> usize {
16 DEFAULT_BATCH_SIZE
17}
18
19fn default_concurrency() -> usize {
20 DEFAULT_CONCURRENCY
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
25pub struct ParquetSourceConfig {
26 pub source: ParquetLocation,
28
29 #[serde(default = "default_batch_size")]
46 pub batch_size: usize,
47
48 #[serde(default)]
54 pub columns: Option<Vec<String>>,
55
56 #[serde(default = "default_concurrency")]
59 pub concurrency: usize,
60}
61
62impl ParquetSourceConfig {
63 pub fn new(source: ParquetLocation) -> Self {
65 Self {
66 source,
67 batch_size: DEFAULT_BATCH_SIZE,
68 columns: None,
69 concurrency: DEFAULT_CONCURRENCY,
70 }
71 }
72
73 pub fn local(path: impl Into<String>) -> Self {
75 Self::new(ParquetLocation::LocalPath { path: path.into() })
76 }
77
78 pub fn glob(pattern: impl Into<String>) -> Self {
80 Self::new(ParquetLocation::Glob {
81 pattern: pattern.into(),
82 })
83 }
84
85 pub fn s3(s3: ParquetS3Config) -> Self {
87 Self::new(ParquetLocation::S3(s3))
88 }
89
90 pub fn batch_size(mut self, size: usize) -> Self {
95 self.batch_size = size;
96 self
97 }
98
99 pub fn with_batch_size(self, batch_size: usize) -> Self {
105 self.batch_size(batch_size)
106 }
107
108 pub fn columns<I, S>(mut self, columns: I) -> Self
110 where
111 I: IntoIterator<Item = S>,
112 S: Into<String>,
113 {
114 self.columns = Some(columns.into_iter().map(Into::into).collect());
115 self
116 }
117
118 pub fn concurrency(mut self, concurrency: usize) -> Self {
120 self.concurrency = concurrency;
121 self
122 }
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
127#[serde(tag = "type", rename_all = "snake_case")]
128pub enum ParquetLocation {
129 LocalPath { path: String },
131
132 Glob { pattern: String },
135
136 S3(ParquetS3Config),
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
145pub struct ParquetS3Config {
146 pub bucket: String,
148
149 #[serde(default)]
151 pub key: Option<String>,
152
153 #[serde(default)]
155 pub prefix: Option<String>,
156
157 #[serde(default)]
159 pub region: Option<String>,
160
161 #[serde(default)]
163 pub endpoint_url: Option<String>,
164}
165
166impl ParquetS3Config {
167 pub fn object(bucket: impl Into<String>, key: impl Into<String>) -> Self {
169 Self {
170 bucket: bucket.into(),
171 key: Some(key.into()),
172 prefix: None,
173 region: None,
174 endpoint_url: None,
175 }
176 }
177
178 pub fn prefix(bucket: impl Into<String>, prefix: impl Into<String>) -> Self {
180 Self {
181 bucket: bucket.into(),
182 key: None,
183 prefix: Some(prefix.into()),
184 region: None,
185 endpoint_url: None,
186 }
187 }
188
189 pub fn region(mut self, region: impl Into<String>) -> Self {
191 self.region = Some(region.into());
192 self
193 }
194
195 pub fn endpoint_url(mut self, url: impl Into<String>) -> Self {
197 self.endpoint_url = Some(url.into());
198 self
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// The `local` constructor leaves every tunable at its default.
    #[test]
    fn defaults_are_sensible() {
        let cfg = ParquetSourceConfig::local("/tmp/data.parquet");
        assert_eq!(cfg.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(cfg.concurrency, DEFAULT_CONCURRENCY);
        assert!(cfg.columns.is_none());
        assert!(matches!(cfg.source, ParquetLocation::LocalPath { .. }));
    }

    /// Builder setters can be chained and each one sticks.
    #[test]
    fn builder_methods_compose() {
        let cfg = ParquetSourceConfig::glob("/tmp/*.parquet")
            .batch_size(2048)
            .concurrency(8)
            .columns(["id", "name"]);
        assert_eq!(cfg.batch_size, 2048);
        assert_eq!(cfg.concurrency, 8);
        assert_eq!(
            cfg.columns.as_deref(),
            Some(&["id".to_string(), "name".to_string()][..])
        );
    }

    /// `object` fills `key` only; `prefix` fills `prefix` only.
    #[test]
    fn s3_object_and_prefix_variants() {
        let by_key = ParquetS3Config::object("my-bucket", "events/2024/data.parquet");
        assert_eq!(by_key.key.as_deref(), Some("events/2024/data.parquet"));
        assert!(by_key.prefix.is_none());

        let by_prefix = ParquetS3Config::prefix("my-bucket", "events/2024/")
            .region("us-east-1")
            .endpoint_url("http://localhost:9000");
        assert!(by_prefix.key.is_none());
        assert_eq!(by_prefix.prefix.as_deref(), Some("events/2024/"));
        assert_eq!(by_prefix.region.as_deref(), Some("us-east-1"));
        assert_eq!(
            by_prefix.endpoint_url.as_deref(),
            Some("http://localhost:9000")
        );
    }

    /// Fields omitted from the serialized form fall back to the serde defaults.
    #[test]
    fn batch_size_default_via_serde() {
        let cfg: ParquetSourceConfig = serde_json::from_value(serde_json::json!({
            "source": { "type": "local_path", "path": "/tmp/x.parquet" }
        }))
        .expect("a config with only `source` must deserialize");
        assert_eq!(cfg.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(cfg.concurrency, DEFAULT_CONCURRENCY);
        assert!(cfg.columns.is_none());
    }

    #[test]
    fn schema_generates_without_panicking() {
        let _ = faucet_core::schema_for!(ParquetSourceConfig);
    }

    #[test]
    fn batch_size_defaults_to_faucet_core_default_batch_size() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet");
        assert_eq!(cfg.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet").with_batch_size(500);
        assert_eq!(cfg.batch_size, 500);
    }

    /// Zero is stored untouched and passes core validation.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet").with_batch_size(0);
        assert_eq!(cfg.batch_size, 0);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_ok());
    }

    /// The builder does not validate; the core validator rejects oversized values.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(cfg.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "source": { "type": "local_path", "path": "/tmp/x.parquet" },
            "batch_size": 250
        }"#;
        let cfg: ParquetSourceConfig =
            serde_json::from_str(json).expect("explicit batch_size must deserialize");
        assert_eq!(cfg.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_deserializes_from_json() {
        let json = r#"{
            "source": { "type": "local_path", "path": "/tmp/x.parquet" },
            "batch_size": 0
        }"#;
        let cfg: ParquetSourceConfig =
            serde_json::from_str(json).expect("batch_size of zero must deserialize");
        assert_eq!(cfg.batch_size, 0);
    }
}