Skip to main content

faucet_source_parquet/
stream.rs

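//! Parquet source stream executor.
//!
//! Reads one or more Parquet files (local file, local glob, or S3 object /
//! prefix) and yields each row as a `serde_json::Value::Object`. On the
//! `stream_pages` path RecordBatches are streamed and converted
//! incrementally, one page per batch, with no whole-file buffering; the eager
//! `fetch_with_context` path collects every row into memory before returning.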
6
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use faucet_core::{FaucetError, Stream, StreamPage};
14use futures::{StreamExt, TryStreamExt, stream};
15use object_store::ObjectStore;
16use object_store::aws::AmazonS3Builder;
17use object_store::path::Path as ObjectPath;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
20use serde_json::Value;
21
22use crate::config::{ParquetLocation, ParquetS3Config, ParquetSourceConfig};
23use crate::convert::record_batch_to_json;
24
25/// A source that reads Parquet files into JSON records.
26pub struct ParquetSource {
27    config: ParquetSourceConfig,
28    /// Eagerly-constructed object store used for S3 sources. `None` for
29    /// local file / glob sources.
30    s3_store: Option<Arc<dyn ObjectStore>>,
31}
32
33impl ParquetSource {
34    /// Build a new Parquet source from `config`.
35    ///
36    /// Performs eager validation (concurrency > 0, mutually exclusive S3
37    /// `key`/`prefix`) and pre-builds the S3 client when applicable so it can
38    /// be reused across concurrent file reads.
39    pub async fn new(config: ParquetSourceConfig) -> Result<Self, FaucetError> {
40        // `batch_size == 0` is the "no batching" sentinel — accepted, and
41        // means "let the file's native row-group size drive page cadence".
42        // See `ParquetSourceConfig::batch_size` for the full contract.
43        if config.concurrency == 0 {
44            return Err(FaucetError::Config(
45                "parquet source: concurrency must be > 0".into(),
46            ));
47        }
48
49        let s3_store = match &config.source {
50            ParquetLocation::S3(s3) => Some(build_s3_store(s3)?),
51            _ => None,
52        };
53
54        Ok(Self { config, s3_store })
55    }
56
57    /// Resolve the configured `source` into the concrete list of files to read.
58    ///
59    /// For S3 prefix mode this issues a list-objects call. For glob mode this
60    /// expands the pattern. The result is sorted for deterministic ordering.
61    async fn resolve_files(
62        &self,
63        context: &HashMap<String, Value>,
64    ) -> Result<Vec<FileTarget>, FaucetError> {
65        match &self.config.source {
66            ParquetLocation::LocalPath { path } => {
67                let resolved = substitute(path, context);
68                Ok(vec![FileTarget::Local(PathBuf::from(resolved))])
69            }
70            ParquetLocation::Glob { pattern } => {
71                let resolved = substitute(pattern, context);
72                expand_glob(&resolved)
73            }
74            ParquetLocation::S3(s3) => self.resolve_s3_files(s3, context).await,
75        }
76    }
77
78    async fn resolve_s3_files(
79        &self,
80        s3: &ParquetS3Config,
81        context: &HashMap<String, Value>,
82    ) -> Result<Vec<FileTarget>, FaucetError> {
83        match (&s3.key, &s3.prefix) {
84            (Some(_), Some(_)) => Err(FaucetError::Config(
85                "parquet source: S3 config cannot set both `key` and `prefix`".into(),
86            )),
87            (None, None) => Err(FaucetError::Config(
88                "parquet source: S3 config requires one of `key` or `prefix`".into(),
89            )),
90            (Some(key), None) => {
91                let key = substitute(key, context);
92                Ok(vec![FileTarget::S3(ObjectPath::from(key))])
93            }
94            (None, Some(prefix)) => {
95                let prefix = substitute(prefix, context);
96                let store = self.s3_store.as_ref().ok_or_else(|| {
97                    FaucetError::Source("parquet source: S3 store not initialised".into())
98                })?;
99                list_s3_prefix(store.as_ref(), &prefix).await
100            }
101        }
102    }
103
104    /// Read a single resolved file, returning the rows it yields plus the
105    /// Arrow schema used to decode it (so the caller can detect divergence
106    /// across multiple files).
107    async fn read_file(&self, target: &FileTarget) -> Result<FileOutput, FaucetError> {
108        let display = target.display();
109        match target {
110            FileTarget::Local(path) => {
111                let file = tokio::fs::File::open(path).await.map_err(|e| {
112                    FaucetError::Source(format!("failed to open parquet file '{display}': {e}"))
113                })?;
114                self.decode(file, &display).await
115            }
116            FileTarget::S3(path) => {
117                let store = self.s3_store.as_ref().ok_or_else(|| {
118                    FaucetError::Source("parquet source: S3 store not initialised".into())
119                })?;
120                let reader = ParquetObjectReader::new(store.clone(), path.clone());
121                self.decode(reader, &display).await
122            }
123        }
124    }
125
126    async fn decode<R>(&self, reader: R, display: &str) -> Result<FileOutput, FaucetError>
127    where
128        R: parquet::arrow::async_reader::AsyncFileReader + Send + Unpin + 'static,
129    {
130        let (mut batches, arrow_schema) = self.build_batch_stream(reader, display).await?;
131
132        let mut rows: Vec<Value> = Vec::new();
133        while let Some(batch) = batches.next().await {
134            let batch = batch.map_err(|e| {
135                FaucetError::Source(format!("parquet decode error in '{display}': {e}"))
136            })?;
137            let batch_rows = record_batch_to_json(&batch)?;
138            rows.extend(batch_rows);
139        }
140
141        Ok(FileOutput {
142            path: display.to_string(),
143            rows,
144            arrow_schema,
145        })
146    }
147
148    /// Build a per-file Arrow `RecordBatch` stream from a low-level
149    /// `AsyncFileReader`. Applies the configured projection and `batch_size`
150    /// hint (skipped when `batch_size == 0`, so the file's native row-group
151    /// size governs page cadence).
152    ///
153    /// Used by both [`decode`](Self::decode) (which materialises all rows
154    /// into a `FileOutput`) and [`stream_pages`](
155    /// faucet_core::Source::stream_pages) (which yields one `StreamPage`
156    /// per `RecordBatch`).
157    async fn build_batch_stream<R>(
158        &self,
159        reader: R,
160        display: &str,
161    ) -> Result<(BatchStream, arrow::datatypes::SchemaRef), FaucetError>
162    where
163        R: parquet::arrow::async_reader::AsyncFileReader + Send + Unpin + 'static,
164    {
165        let mut builder = ParquetRecordBatchStreamBuilder::new(reader)
166            .await
167            .map_err(|e| {
168                FaucetError::Source(format!(
169                    "failed to read parquet metadata for '{display}': {e}"
170                ))
171            })?;
172
173        // `batch_size == 0` is the sentinel meaning "use the file's native
174        // row-group size as the batch cadence" — i.e. don't override the
175        // Arrow reader's default, which already yields one batch per
176        // row-group.
177        if self.config.batch_size > 0 {
178            builder = builder.with_batch_size(self.config.batch_size);
179        }
180
181        if let Some(cols) = self.config.columns.as_deref() {
182            let parquet_schema = builder.parquet_schema();
183            validate_projection(cols, parquet_schema, display)?;
184            let mask = ProjectionMask::columns(parquet_schema, cols.iter().map(String::as_str));
185            builder = builder.with_projection(mask);
186        }
187
188        let arrow_schema = builder.schema().clone();
189
190        let stream = builder.build().map_err(|e| {
191            FaucetError::Source(format!(
192                "failed to build parquet stream for '{display}': {e}"
193            ))
194        })?;
195
196        Ok((Box::pin(stream), arrow_schema))
197    }
198
199    /// Open a per-file Arrow `RecordBatch` stream for a resolved target
200    /// (local or S3), returning the boxed stream, the Arrow schema, and a
201    /// display string for error messages.
202    async fn open_target_stream(
203        &self,
204        target: &FileTarget,
205    ) -> Result<(BatchStream, arrow::datatypes::SchemaRef, String), FaucetError> {
206        let display = target.display();
207        match target {
208            FileTarget::Local(path) => {
209                let file = tokio::fs::File::open(path).await.map_err(|e| {
210                    FaucetError::Source(format!("failed to open parquet file '{display}': {e}"))
211                })?;
212                let (stream, schema) = self.build_batch_stream(file, &display).await?;
213                Ok((stream, schema, display))
214            }
215            FileTarget::S3(path) => {
216                let store = self.s3_store.as_ref().ok_or_else(|| {
217                    FaucetError::Source("parquet source: S3 store not initialised".into())
218                })?;
219                let reader = ParquetObjectReader::new(store.clone(), path.clone());
220                let (stream, schema) = self.build_batch_stream(reader, &display).await?;
221                Ok((stream, schema, display))
222            }
223        }
224    }
225}
226
227/// Boxed Arrow `RecordBatch` stream returned by
228/// [`ParquetSource::build_batch_stream`].
229type BatchStream =
230    Pin<Box<dyn futures::Stream<Item = parquet::errors::Result<arrow::array::RecordBatch>> + Send>>;
231
232#[async_trait]
233impl faucet_core::Source for ParquetSource {
234    async fn fetch_with_context(
235        &self,
236        context: &HashMap<String, Value>,
237    ) -> Result<Vec<Value>, FaucetError> {
238        let targets = self.resolve_files(context).await?;
239
240        tracing::info!(files = targets.len(), "Parquet source resolved files");
241
242        if targets.is_empty() {
243            return Ok(Vec::new());
244        }
245
246        let concurrency = self.config.concurrency.max(1);
247
248        let outputs: Vec<FileOutput> = stream::iter(targets)
249            .map(|target| async move {
250                let out = self.read_file(&target).await?;
251                tracing::debug!(file = %out.path, rows = out.rows.len(), "Parquet file decoded");
252                Ok::<FileOutput, FaucetError>(out)
253            })
254            .buffer_unordered(concurrency)
255            .try_collect()
256            .await?;
257
258        if outputs.len() > 1 {
259            let first = &outputs[0];
260            for other in &outputs[1..] {
261                if first.arrow_schema != other.arrow_schema {
262                    return Err(FaucetError::Source(schema_mismatch_message(first, other)));
263                }
264            }
265        }
266
267        let total: usize = outputs.iter().map(|o| o.rows.len()).sum();
268        let mut all = Vec::with_capacity(total);
269        for out in outputs {
270            all.extend(out.rows);
271        }
272
273        tracing::info!(total_records = all.len(), "Parquet source fetch complete");
274        Ok(all)
275    }
276
277    /// Stream RecordBatches from each resolved file, yielding one
278    /// [`StreamPage`] per Arrow `RecordBatch` so client-side memory is
279    /// bounded at `O(batch_size * row_width)` regardless of total file size.
280    ///
281    /// The trait-level `batch_size` argument is ignored in favour of
282    /// [`ParquetSourceConfig::batch_size`] — the config is the user-facing
283    /// knob the README documents, and routing the pipeline-supplied hint
284    /// through it would silently override an explicit config value.
285    ///
286    /// **Cadence:**
287    /// - `batch_size > 0` — passed to
288    ///   [`ParquetRecordBatchStreamBuilder::with_batch_size`]. Arrow may
289    ///   emit a *smaller* batch at row-group boundaries, so an emitted page
290    ///   can be smaller than `batch_size`.
291    /// - `batch_size == 0` — the sentinel skips `with_batch_size`, so the
292    ///   file's native row-group size drives the page cadence (one page per
293    ///   row-group).
294    ///
295    /// **Multi-file scans** (glob / S3 prefix) iterate sequentially in
296    /// sorted order. The first file's Arrow schema is the reference; any
297    /// subsequent file with a different schema surfaces as
298    /// [`FaucetError::Source`] naming both paths and the first diverging
299    /// field — matching the eager `fetch_with_context` behaviour.
300    ///
301    /// Every page carries `bookmark: None` — the Parquet source has no
302    /// incremental-replication mode.
303    fn stream_pages<'a>(
304        &'a self,
305        context: &'a HashMap<String, Value>,
306        _batch_size: usize,
307    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
308        Box::pin(async_stream::try_stream! {
309            let targets = self.resolve_files(context).await?;
310            tracing::info!(files = targets.len(), "Parquet source resolved files");
311
312            if targets.is_empty() {
313                return;
314            }
315
316            // Validate every file's schema UP FRONT, before yielding any rows.
317            // A divergent schema on a *later* file must fail before earlier
318            // files' rows are committed downstream — otherwise the streaming
319            // path performs a partial, non-atomic write and only then aborts
320            // (#146 M11). Opening a target reads just the Parquet footer
321            // metadata (not row data), so this is a cheap probe; we drop each
322            // probe stream immediately. The cost is one extra footer read per
323            // file, paid once before streaming begins.
324            let mut reference: Option<(String, arrow::datatypes::SchemaRef)> = None;
325            for target in &targets {
326                let (_, arrow_schema, display) = self.open_target_stream(target).await?;
327                if let Some((first_path, first_schema)) = &reference {
328                    if first_schema != &arrow_schema {
329                        Err(FaucetError::Source(schema_mismatch_message_pair(
330                            first_path,
331                            first_schema,
332                            &display,
333                            &arrow_schema,
334                        )))?;
335                    }
336                } else {
337                    reference = Some((display, arrow_schema));
338                }
339            }
340
341            // Schemas are consistent across all files; stream rows file by file.
342            let mut total_records = 0usize;
343            let mut total_pages = 0usize;
344            for target in &targets {
345                let (mut batches, _schema, display) = self.open_target_stream(target).await?;
346                while let Some(batch) = batches.next().await {
347                    let batch = batch.map_err(|e| {
348                        FaucetError::Source(format!(
349                            "parquet decode error in '{display}': {e}"
350                        ))
351                    })?;
352                    let rows = record_batch_to_json(&batch)?;
353                    if rows.is_empty() {
354                        continue;
355                    }
356                    total_records += rows.len();
357                    total_pages += 1;
358                    yield StreamPage { records: rows, bookmark: None };
359                }
360            }
361
362            tracing::info!(
363                pages = total_pages,
364                total_records,
365                batch_size = self.config.batch_size,
366                "Parquet source stream complete",
367            );
368        })
369    }
370
371    fn config_schema(&self) -> Value {
372        serde_json::to_value(faucet_core::schema_for!(ParquetSourceConfig))
373            .expect("schema serialization")
374    }
375}
376
377/// Per-file decode output, kept around long enough to validate cross-file
378/// schema consistency.
379struct FileOutput {
380    path: String,
381    rows: Vec<Value>,
382    arrow_schema: arrow::datatypes::SchemaRef,
383}
384
385/// Resolved file location ready for reading.
386#[derive(Debug, Clone)]
387enum FileTarget {
388    Local(PathBuf),
389    S3(ObjectPath),
390}
391
392impl FileTarget {
393    fn display(&self) -> String {
394        match self {
395            FileTarget::Local(p) => p.display().to_string(),
396            FileTarget::S3(p) => format!("s3://{p}"),
397        }
398    }
399}
400
401/// Apply context substitution only when there is something to substitute.
402fn substitute(template: &str, context: &HashMap<String, Value>) -> String {
403    if context.is_empty() {
404        template.to_string()
405    } else {
406        faucet_core::util::substitute_context(template, context)
407    }
408}
409
410/// Expand a glob pattern into a sorted list of local file paths.
411fn expand_glob(pattern: &str) -> Result<Vec<FileTarget>, FaucetError> {
412    let entries = glob::glob(pattern)
413        .map_err(|e| FaucetError::Config(format!("invalid glob '{pattern}': {e}")))?;
414
415    let mut paths = Vec::new();
416    for entry in entries {
417        let p = entry
418            .map_err(|e| FaucetError::Source(format!("glob entry error for '{pattern}': {e}")))?;
419        if p.is_file() {
420            paths.push(p);
421        }
422    }
423    paths.sort();
424    Ok(paths.into_iter().map(FileTarget::Local).collect())
425}
426
427/// List S3 objects under `prefix` and turn them into `FileTarget::S3` entries.
428async fn list_s3_prefix(
429    store: &dyn ObjectStore,
430    prefix: &str,
431) -> Result<Vec<FileTarget>, FaucetError> {
432    let prefix_path = if prefix.is_empty() {
433        None
434    } else {
435        Some(ObjectPath::from(prefix))
436    };
437
438    let mut listing = store.list(prefix_path.as_ref());
439    let mut keys = Vec::new();
440    while let Some(item) = listing.next().await {
441        let meta = item.map_err(|e| {
442            FaucetError::Source(format!("S3 list error for prefix '{prefix}': {e}"))
443        })?;
444        keys.push(meta.location);
445    }
446    keys.sort();
447    Ok(keys.into_iter().map(FileTarget::S3).collect())
448}
449
450/// Build an `AmazonS3` `object_store` client from a `ParquetS3Config`.
451fn build_s3_store(s3: &ParquetS3Config) -> Result<Arc<dyn ObjectStore>, FaucetError> {
452    if s3.bucket.trim().is_empty() {
453        return Err(FaucetError::Config(
454            "parquet source: S3 bucket must not be empty".into(),
455        ));
456    }
457
458    let mut builder = AmazonS3Builder::from_env().with_bucket_name(&s3.bucket);
459    if let Some(region) = &s3.region {
460        builder = builder.with_region(region);
461    }
462    if let Some(endpoint) = &s3.endpoint_url {
463        builder = builder.with_endpoint(endpoint);
464        if endpoint.starts_with("http://") {
465            builder = builder.with_allow_http(true);
466        }
467    }
468
469    let store = builder
470        .build()
471        .map_err(|e| FaucetError::Config(format!("failed to build S3 client: {e}")))?;
472    Ok(Arc::new(store))
473}
474
475/// Verify every requested column exists in the file's Parquet schema. The
476/// `ProjectionMask::columns` API silently ignores unknown names, so we
477/// pre-validate here to surface a clear error to the caller.
478fn validate_projection(
479    requested: &[String],
480    parquet_schema: &parquet::schema::types::SchemaDescriptor,
481    display: &str,
482) -> Result<(), FaucetError> {
483    let root = parquet_schema.root_schema();
484    let parquet::schema::types::Type::GroupType { fields, .. } = root else {
485        return Err(FaucetError::Source(format!(
486            "parquet root schema for '{display}' is not a group"
487        )));
488    };
489
490    let known: std::collections::HashSet<&str> = fields.iter().map(|f| f.name()).collect();
491
492    for name in requested {
493        if !known.contains(name.as_str()) {
494            return Err(FaucetError::Source(format!(
495                "parquet source: projected column '{name}' not found in file '{display}' \
496                 (available: {})",
497                known.iter().copied().collect::<Vec<_>>().join(", ")
498            )));
499        }
500    }
501
502    Ok(())
503}
504
505/// Compose a descriptive cross-file schema mismatch error.
506fn schema_mismatch_message(first: &FileOutput, other: &FileOutput) -> String {
507    schema_mismatch_message_pair(
508        &first.path,
509        &first.arrow_schema,
510        &other.path,
511        &other.arrow_schema,
512    )
513}
514
515/// Same as [`schema_mismatch_message`] but works on raw `(path, schema)`
516/// pairs so it can be called from the streaming path where no `FileOutput`
517/// exists.
518fn schema_mismatch_message_pair(
519    first_path: &str,
520    first_schema: &arrow::datatypes::SchemaRef,
521    other_path: &str,
522    other_schema: &arrow::datatypes::SchemaRef,
523) -> String {
524    let first_fields: Vec<String> = first_schema
525        .fields()
526        .iter()
527        .map(|f| format!("{}:{}", f.name(), f.data_type()))
528        .collect();
529    let other_fields: Vec<String> = other_schema
530        .fields()
531        .iter()
532        .map(|f| format!("{}:{}", f.name(), f.data_type()))
533        .collect();
534
535    // Identify the first diverging field for a focused hint.
536    let max_len = first_fields.len().max(other_fields.len());
537    let mut first_diff = None;
538    for i in 0..max_len {
539        let a = first_fields
540            .get(i)
541            .map(String::as_str)
542            .unwrap_or("<missing>");
543        let b = other_fields
544            .get(i)
545            .map(String::as_str)
546            .unwrap_or("<missing>");
547        if a != b {
548            first_diff = Some((i, a.to_string(), b.to_string()));
549            break;
550        }
551    }
552
553    let detail = match first_diff {
554        Some((i, a, b)) => format!(" (field #{i}: '{a}' vs '{b}')"),
555        None => String::new(),
556    };
557
558    format!("parquet source: schema mismatch between '{first_path}' and '{other_path}'{detail}")
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ParquetSourceConfig;

    /// An empty context must leave the template untouched, braces included.
    #[test]
    fn substitute_passes_through_when_context_empty() {
        let rendered = substitute("/tmp/{x}.parquet", &HashMap::new());
        assert_eq!(rendered, "/tmp/{x}.parquet");
    }

    /// A populated context replaces `{name}` placeholders with its values.
    #[test]
    fn substitute_replaces_placeholders() {
        let ctx: HashMap<String, Value> =
            HashMap::from([("region".to_string(), Value::String("us".into()))]);
        let rendered = substitute("data/{region}/x.parquet", &ctx);
        assert_eq!(rendered, "data/us/x.parquet");
    }

    /// `batch_size = 0` is the "no batching" sentinel — page cadence falls
    /// back to the file's native row-group size — so construction must
    /// succeed and keep the value as-is.
    #[tokio::test]
    async fn accepts_zero_batch_size_as_sentinel() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet").batch_size(0);
        let source = ParquetSource::new(cfg)
            .await
            .expect("batch_size=0 must be accepted as the no-batching sentinel");
        assert_eq!(source.config.batch_size, 0);
    }

    /// Zero concurrency is rejected at construction with a `Config` error
    /// that mentions the offending setting.
    #[tokio::test]
    async fn rejects_zero_concurrency() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet").concurrency(0);
        match ParquetSource::new(cfg).await.err() {
            Some(FaucetError::Config(msg)) => assert!(msg.contains("concurrency")),
            other => panic!("expected Config error, got {:?}", other),
        }
    }

    /// Setting both `key` and `prefix` passes construction but fails file
    /// resolution with a `Config` error.
    #[tokio::test]
    async fn rejects_s3_with_both_key_and_prefix() {
        let s3 = ParquetS3Config {
            prefix: Some("p/".into()),
            ..ParquetS3Config::object("b", "k.parquet")
        };
        let source = ParquetSource::new(ParquetSourceConfig::s3(s3))
            .await
            .unwrap();
        let err = source.resolve_files(&HashMap::new()).await.unwrap_err();
        assert!(matches!(err, FaucetError::Config(_)));
    }

    /// Setting neither `key` nor `prefix` passes construction but fails file
    /// resolution with a `Config` error.
    #[tokio::test]
    async fn rejects_s3_with_neither_key_nor_prefix() {
        let s3 = ParquetS3Config {
            bucket: "b".into(),
            key: None,
            prefix: None,
            region: None,
            endpoint_url: None,
        };
        let source = ParquetSource::new(ParquetSourceConfig::s3(s3))
            .await
            .unwrap();
        let err = source.resolve_files(&HashMap::new()).await.unwrap_err();
        assert!(matches!(err, FaucetError::Config(_)));
    }

    /// An empty bucket name is refused before any client is built.
    #[test]
    fn empty_bucket_rejected() {
        let s3 = ParquetS3Config::object("", "k.parquet");
        let err = build_s3_store(&s3).unwrap_err();
        assert!(matches!(err, FaucetError::Config(_)));
    }
}