Skip to main content

faucet_source_parquet/
config.rs

1//! Parquet source configuration.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
/// Default Arrow `RecordBatch` size used when decoding Parquet row groups.
///
/// Aliases [`faucet_core::DEFAULT_BATCH_SIZE`] so the parquet source's
/// per-page size matches the rest of the streaming pipeline by default.
///
/// Applied by [`ParquetSourceConfig::new`] and, through the serde default
/// function `default_batch_size`, whenever `batch_size` is omitted from a
/// serialized config.
pub const DEFAULT_BATCH_SIZE: usize = faucet_core::DEFAULT_BATCH_SIZE;
11
/// Default parallel-file-read concurrency for glob / S3 prefix dispatch.
///
/// Applied by [`ParquetSourceConfig::new`] and, through the serde default
/// function `default_concurrency`, whenever `concurrency` is omitted from a
/// serialized config.
pub const DEFAULT_CONCURRENCY: usize = 4;
14
/// Serde fallback for the `batch_size` field of [`ParquetSourceConfig`].
///
/// `#[serde(default = "...")]` names a function path rather than a constant,
/// so this thin wrapper simply hands back [`DEFAULT_BATCH_SIZE`].
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
18
/// Serde fallback for the `concurrency` field of [`ParquetSourceConfig`].
///
/// `#[serde(default = "...")]` names a function path rather than a constant,
/// so this thin wrapper simply hands back [`DEFAULT_CONCURRENCY`].
fn default_concurrency() -> usize {
    DEFAULT_CONCURRENCY
}
22
/// Configuration for the Parquet source connector.
// Maintainer note: this type derives `JsonSchema`, so the `///` text on the
// struct and its fields is presumably surfaced as schema descriptions
// (schemars convention — confirm before rewording). Notes meant only for
// people reading the source are therefore written as plain `//` comments.
//
// Only `source` is mandatory when deserializing; every other field carries a
// serde default.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ParquetSourceConfig {
    /// Where to read Parquet from — a local file, a local glob pattern, or S3.
    // Required: there is no serde default for this field.
    pub source: ParquetLocation,

    /// Arrow `RecordBatch` size used as the per-page hint when streaming.
    ///
    /// Passed verbatim to
    /// `ParquetRecordBatchStreamBuilder::with_batch_size`, which is itself
    /// only a hint — Arrow may emit smaller batches at row-group boundaries,
    /// so a single emitted [`faucet_core::StreamPage`] can hold fewer rows
    /// than this number. Larger values improve throughput at the cost of
    /// memory.
    ///
    /// `batch_size = 0` is the "no batching" sentinel: the call to
    /// `with_batch_size` is skipped and the underlying file's native
    /// row-group size drives the page cadence (one page per row-group).
    /// Useful for sinks like SQL `COPY` / BigQuery load jobs that prefer one
    /// large request to many small ones.
    ///
    /// Defaults to [`DEFAULT_BATCH_SIZE`].
    // A missing key falls back to `default_batch_size()`, the same value
    // `ParquetSourceConfig::new` assigns.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,

    /// Optional column projection (column pruning).
    ///
    /// When set, only the named columns are decoded — non-projected columns
    /// are skipped entirely at the Parquet level, reducing I/O. Errors out
    /// if a name does not exist in the file's schema.
    // Plain `#[serde(default)]`: a missing key deserializes to `None`.
    #[serde(default)]
    pub columns: Option<Vec<String>>,

    /// Maximum number of files to read concurrently for `Glob` / S3 prefix
    /// modes. Ignored for single-file modes.
    // A missing key falls back to `default_concurrency()`, i.e.
    // `DEFAULT_CONCURRENCY`.
    // NOTE(review): nothing in this file rejects `concurrency = 0`; confirm
    // that the reader validates or clamps it before dispatching work.
    #[serde(default = "default_concurrency")]
    pub concurrency: usize,
}
61
62impl ParquetSourceConfig {
63    /// Build a config from a `ParquetLocation` with sensible defaults.
64    pub fn new(source: ParquetLocation) -> Self {
65        Self {
66            source,
67            batch_size: DEFAULT_BATCH_SIZE,
68            columns: None,
69            concurrency: DEFAULT_CONCURRENCY,
70        }
71    }
72
73    /// Convenience: build a config that reads one local file path.
74    pub fn local(path: impl Into<String>) -> Self {
75        Self::new(ParquetLocation::LocalPath { path: path.into() })
76    }
77
78    /// Convenience: build a config that reads files matching a local glob.
79    pub fn glob(pattern: impl Into<String>) -> Self {
80        Self::new(ParquetLocation::Glob {
81            pattern: pattern.into(),
82        })
83    }
84
85    /// Convenience: build a config that reads from an S3 location.
86    pub fn s3(s3: ParquetS3Config) -> Self {
87        Self::new(ParquetLocation::S3(s3))
88    }
89
90    /// Override the Arrow `RecordBatch` size.
91    ///
92    /// Pass `0` to opt out of batching — the underlying file's native
93    /// row-group size drives the page cadence instead.
94    pub fn batch_size(mut self, size: usize) -> Self {
95        self.batch_size = size;
96        self
97    }
98
99    /// Set the per-page row-count hint for
100    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
101    ///
102    /// Alias for [`Self::batch_size`] — provided so the builder reads the
103    /// same as every other faucet source.
104    pub fn with_batch_size(self, batch_size: usize) -> Self {
105        self.batch_size(batch_size)
106    }
107
108    /// Restrict decoding to the named columns.
109    pub fn columns<I, S>(mut self, columns: I) -> Self
110    where
111        I: IntoIterator<Item = S>,
112        S: Into<String>,
113    {
114        self.columns = Some(columns.into_iter().map(Into::into).collect());
115        self
116    }
117
118    /// Override the parallel-file-read concurrency.
119    pub fn concurrency(mut self, concurrency: usize) -> Self {
120        self.concurrency = concurrency;
121        self
122    }
123}
124
/// Where to read Parquet data from.
// Wire format: internally tagged on a `"type"` key with snake_case variant
// names, e.g. `{ "type": "local_path", "path": "/tmp/x.parquet" }`. Under
// the same renaming rule the remaining tags are `"glob"` and `"s3"`. For the
// newtype `S3` variant, serde's internally tagged representation puts the
// `ParquetS3Config` fields alongside `"type"` in the same object.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ParquetLocation {
    /// A single local file path.
    LocalPath { path: String },

    /// A local glob pattern that expands to zero or more files. All matched
    /// files must share the same Arrow schema.
    Glob { pattern: String },

    /// An S3 location — either a single object key, or a prefix that
    /// expands to multiple objects.
    S3(ParquetS3Config),
}
140
/// S3 location parameters.
///
/// Exactly one of `key` or `prefix` must be set.
// NOTE(review): the "exactly one" rule above is not enforced by this type or
// by deserialization — both fields are independent `Option`s with
// `#[serde(default)]`, so "neither" and "both" deserialize successfully.
// Presumably the check happens where the source is constructed; confirm.
// The `object` / `prefix` constructors always produce a valid combination.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ParquetS3Config {
    /// S3 bucket name.
    // Required: the only field without a serde default.
    pub bucket: String,

    /// A single object key (mutually exclusive with `prefix`).
    #[serde(default)]
    pub key: Option<String>,

    /// An object key prefix to list and read (mutually exclusive with `key`).
    #[serde(default)]
    pub prefix: Option<String>,

    /// AWS region. `None` uses the default-region resolution chain.
    #[serde(default)]
    pub region: Option<String>,

    /// Custom endpoint URL for S3-compatible services (MinIO, LocalStack, …).
    #[serde(default)]
    pub endpoint_url: Option<String>,
}
165
166impl ParquetS3Config {
167    /// New S3 config targeting a single object key.
168    pub fn object(bucket: impl Into<String>, key: impl Into<String>) -> Self {
169        Self {
170            bucket: bucket.into(),
171            key: Some(key.into()),
172            prefix: None,
173            region: None,
174            endpoint_url: None,
175        }
176    }
177
178    /// New S3 config targeting all objects under a prefix.
179    pub fn prefix(bucket: impl Into<String>, prefix: impl Into<String>) -> Self {
180        Self {
181            bucket: bucket.into(),
182            key: None,
183            prefix: Some(prefix.into()),
184            region: None,
185            endpoint_url: None,
186        }
187    }
188
189    /// Set the AWS region.
190    pub fn region(mut self, region: impl Into<String>) -> Self {
191        self.region = Some(region.into());
192        self
193    }
194
195    /// Set a custom endpoint URL for S3-compatible services.
196    pub fn endpoint_url(mut self, url: impl Into<String>) -> Self {
197        self.endpoint_url = Some(url.into());
198        self
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Single-local-file config with every tunable left at its default.
    fn sample() -> ParquetSourceConfig {
        ParquetSourceConfig::local("/tmp/x.parquet")
    }

    // A freshly built local config carries the documented defaults.
    #[test]
    fn defaults_are_sensible() {
        let ParquetSourceConfig {
            source,
            batch_size,
            columns,
            concurrency,
        } = ParquetSourceConfig::local("/tmp/data.parquet");

        assert!(matches!(source, ParquetLocation::LocalPath { .. }));
        assert!(columns.is_none());
        assert_eq!(batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(concurrency, DEFAULT_CONCURRENCY);
    }

    // Builder calls chain, and each one lands in its own field.
    #[test]
    fn builder_methods_compose() {
        let built = ParquetSourceConfig::glob("/tmp/*.parquet")
            .batch_size(2048)
            .concurrency(8)
            .columns(["id", "name"]);

        assert_eq!((built.batch_size, built.concurrency), (2048, 8));

        let wanted = vec!["id".to_string(), "name".to_string()];
        assert_eq!(built.columns, Some(wanted));
    }

    // `object` fills only `key`; `prefix` fills only `prefix`, and the
    // region / endpoint builders apply on top.
    #[test]
    fn s3_object_and_prefix_variants() {
        let single = ParquetS3Config::object("my-bucket", "events/2024/data.parquet");
        assert!(single.prefix.is_none());
        assert_eq!(single.key.as_deref(), Some("events/2024/data.parquet"));

        let listing = ParquetS3Config::prefix("my-bucket", "events/2024/")
            .region("us-east-1")
            .endpoint_url("http://localhost:9000");
        assert!(listing.key.is_none());
        assert_eq!(listing.prefix.as_deref(), Some("events/2024/"));
        assert_eq!(listing.region.as_deref(), Some("us-east-1"));
        assert_eq!(
            listing.endpoint_url.as_deref(),
            Some("http://localhost:9000")
        );
    }

    // Omitted optional keys pick up the serde defaults.
    #[test]
    fn batch_size_default_via_serde() {
        let document = serde_json::json!({
            "source": { "type": "local_path", "path": "/tmp/x.parquet" }
        });
        let parsed: ParquetSourceConfig = serde_json::from_value(document).unwrap();

        assert!(parsed.columns.is_none());
        assert_eq!(parsed.concurrency, DEFAULT_CONCURRENCY);
        assert_eq!(parsed.batch_size, DEFAULT_BATCH_SIZE);
    }

    // Smoke test: schema generation must not panic.
    #[test]
    fn schema_generates_without_panicking() {
        let _schema = faucet_core::schema_for!(ParquetSourceConfig);
    }

    // The local default stays aligned with the pipeline-wide constant.
    #[test]
    fn batch_size_defaults_to_faucet_core_default_batch_size() {
        assert_eq!(sample().batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    // `with_batch_size` replaces the default.
    #[test]
    fn with_batch_size_overrides_default() {
        let tuned = sample().with_batch_size(500);
        assert_eq!(tuned.batch_size, 500);
    }

    // Zero is stored untouched and passes core validation.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let unbatched = sample().with_batch_size(0);
        assert_eq!(unbatched.batch_size, 0);
        assert!(faucet_core::validate_batch_size(unbatched.batch_size).is_ok());
    }

    // One past the maximum is rejected by core validation.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let oversized = sample().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(oversized.batch_size).is_err());
    }

    // An explicit `batch_size` key wins over the default.
    #[test]
    fn batch_size_deserializes_from_json() {
        let raw = r#"{
            "source": { "type": "local_path", "path": "/tmp/x.parquet" },
            "batch_size": 250
        }"#;
        let parsed: ParquetSourceConfig = serde_json::from_str(raw).unwrap();
        assert_eq!(parsed.batch_size, 250);
    }

    // An explicit zero survives deserialization.
    #[test]
    fn batch_size_zero_deserializes_from_json() {
        let raw = r#"{
            "source": { "type": "local_path", "path": "/tmp/x.parquet" },
            "batch_size": 0
        }"#;
        let parsed: ParquetSourceConfig = serde_json::from_str(raw).unwrap();
        assert_eq!(parsed.batch_size, 0);
    }
}