1use std::collections::HashMap;
8use std::path::PathBuf;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use faucet_core::{FaucetError, Stream, StreamPage};
14use futures::{StreamExt, TryStreamExt, stream};
15use object_store::ObjectStore;
16use object_store::aws::AmazonS3Builder;
17use object_store::path::Path as ObjectPath;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
20use serde_json::Value;
21
22use crate::config::{ParquetLocation, ParquetS3Config, ParquetSourceConfig};
23use crate::convert::record_batch_to_json;
24
25pub struct ParquetSource {
27 config: ParquetSourceConfig,
28 s3_store: Option<Arc<dyn ObjectStore>>,
31}
32
33impl ParquetSource {
34 pub async fn new(config: ParquetSourceConfig) -> Result<Self, FaucetError> {
40 if config.concurrency == 0 {
44 return Err(FaucetError::Config(
45 "parquet source: concurrency must be > 0".into(),
46 ));
47 }
48
49 let s3_store = match &config.source {
50 ParquetLocation::S3(s3) => Some(build_s3_store(s3)?),
51 _ => None,
52 };
53
54 Ok(Self { config, s3_store })
55 }
56
57 async fn resolve_files(
62 &self,
63 context: &HashMap<String, Value>,
64 ) -> Result<Vec<FileTarget>, FaucetError> {
65 match &self.config.source {
66 ParquetLocation::LocalPath { path } => {
67 let resolved = substitute(path, context);
68 Ok(vec![FileTarget::Local(PathBuf::from(resolved))])
69 }
70 ParquetLocation::Glob { pattern } => {
71 let resolved = substitute(pattern, context);
72 expand_glob(&resolved)
73 }
74 ParquetLocation::S3(s3) => self.resolve_s3_files(s3, context).await,
75 }
76 }
77
78 async fn resolve_s3_files(
79 &self,
80 s3: &ParquetS3Config,
81 context: &HashMap<String, Value>,
82 ) -> Result<Vec<FileTarget>, FaucetError> {
83 match (&s3.key, &s3.prefix) {
84 (Some(_), Some(_)) => Err(FaucetError::Config(
85 "parquet source: S3 config cannot set both `key` and `prefix`".into(),
86 )),
87 (None, None) => Err(FaucetError::Config(
88 "parquet source: S3 config requires one of `key` or `prefix`".into(),
89 )),
90 (Some(key), None) => {
91 let key = substitute(key, context);
92 Ok(vec![FileTarget::S3(ObjectPath::from(key))])
93 }
94 (None, Some(prefix)) => {
95 let prefix = substitute(prefix, context);
96 let store = self.s3_store.as_ref().ok_or_else(|| {
97 FaucetError::Source("parquet source: S3 store not initialised".into())
98 })?;
99 list_s3_prefix(store.as_ref(), &prefix).await
100 }
101 }
102 }
103
104 async fn read_file(&self, target: &FileTarget) -> Result<FileOutput, FaucetError> {
108 let display = target.display();
109 match target {
110 FileTarget::Local(path) => {
111 let file = tokio::fs::File::open(path).await.map_err(|e| {
112 FaucetError::Source(format!("failed to open parquet file '{display}': {e}"))
113 })?;
114 self.decode(file, &display).await
115 }
116 FileTarget::S3(path) => {
117 let store = self.s3_store.as_ref().ok_or_else(|| {
118 FaucetError::Source("parquet source: S3 store not initialised".into())
119 })?;
120 let reader = ParquetObjectReader::new(store.clone(), path.clone());
121 self.decode(reader, &display).await
122 }
123 }
124 }
125
126 async fn decode<R>(&self, reader: R, display: &str) -> Result<FileOutput, FaucetError>
127 where
128 R: parquet::arrow::async_reader::AsyncFileReader + Send + Unpin + 'static,
129 {
130 let (mut batches, arrow_schema) = self.build_batch_stream(reader, display).await?;
131
132 let mut rows: Vec<Value> = Vec::new();
133 while let Some(batch) = batches.next().await {
134 let batch = batch.map_err(|e| {
135 FaucetError::Source(format!("parquet decode error in '{display}': {e}"))
136 })?;
137 let batch_rows = record_batch_to_json(&batch)?;
138 rows.extend(batch_rows);
139 }
140
141 Ok(FileOutput {
142 path: display.to_string(),
143 rows,
144 arrow_schema,
145 })
146 }
147
148 async fn build_batch_stream<R>(
158 &self,
159 reader: R,
160 display: &str,
161 ) -> Result<(BatchStream, arrow::datatypes::SchemaRef), FaucetError>
162 where
163 R: parquet::arrow::async_reader::AsyncFileReader + Send + Unpin + 'static,
164 {
165 let mut builder = ParquetRecordBatchStreamBuilder::new(reader)
166 .await
167 .map_err(|e| {
168 FaucetError::Source(format!(
169 "failed to read parquet metadata for '{display}': {e}"
170 ))
171 })?;
172
173 if self.config.batch_size > 0 {
178 builder = builder.with_batch_size(self.config.batch_size);
179 }
180
181 if let Some(cols) = self.config.columns.as_deref() {
182 let parquet_schema = builder.parquet_schema();
183 validate_projection(cols, parquet_schema, display)?;
184 let mask = ProjectionMask::columns(parquet_schema, cols.iter().map(String::as_str));
185 builder = builder.with_projection(mask);
186 }
187
188 let arrow_schema = builder.schema().clone();
189
190 let stream = builder.build().map_err(|e| {
191 FaucetError::Source(format!(
192 "failed to build parquet stream for '{display}': {e}"
193 ))
194 })?;
195
196 Ok((Box::pin(stream), arrow_schema))
197 }
198
199 async fn open_target_stream(
203 &self,
204 target: &FileTarget,
205 ) -> Result<(BatchStream, arrow::datatypes::SchemaRef, String), FaucetError> {
206 let display = target.display();
207 match target {
208 FileTarget::Local(path) => {
209 let file = tokio::fs::File::open(path).await.map_err(|e| {
210 FaucetError::Source(format!("failed to open parquet file '{display}': {e}"))
211 })?;
212 let (stream, schema) = self.build_batch_stream(file, &display).await?;
213 Ok((stream, schema, display))
214 }
215 FileTarget::S3(path) => {
216 let store = self.s3_store.as_ref().ok_or_else(|| {
217 FaucetError::Source("parquet source: S3 store not initialised".into())
218 })?;
219 let reader = ParquetObjectReader::new(store.clone(), path.clone());
220 let (stream, schema) = self.build_batch_stream(reader, &display).await?;
221 Ok((stream, schema, display))
222 }
223 }
224 }
225}
226
227type BatchStream =
230 Pin<Box<dyn futures::Stream<Item = parquet::errors::Result<arrow::array::RecordBatch>> + Send>>;
231
232#[async_trait]
233impl faucet_core::Source for ParquetSource {
234 async fn fetch_with_context(
235 &self,
236 context: &HashMap<String, Value>,
237 ) -> Result<Vec<Value>, FaucetError> {
238 let targets = self.resolve_files(context).await?;
239
240 tracing::info!(files = targets.len(), "Parquet source resolved files");
241
242 if targets.is_empty() {
243 return Ok(Vec::new());
244 }
245
246 let concurrency = self.config.concurrency.max(1);
247
248 let outputs: Vec<FileOutput> = stream::iter(targets)
249 .map(|target| async move {
250 let out = self.read_file(&target).await?;
251 tracing::debug!(file = %out.path, rows = out.rows.len(), "Parquet file decoded");
252 Ok::<FileOutput, FaucetError>(out)
253 })
254 .buffer_unordered(concurrency)
255 .try_collect()
256 .await?;
257
258 if outputs.len() > 1 {
259 let first = &outputs[0];
260 for other in &outputs[1..] {
261 if first.arrow_schema != other.arrow_schema {
262 return Err(FaucetError::Source(schema_mismatch_message(first, other)));
263 }
264 }
265 }
266
267 let total: usize = outputs.iter().map(|o| o.rows.len()).sum();
268 let mut all = Vec::with_capacity(total);
269 for out in outputs {
270 all.extend(out.rows);
271 }
272
273 tracing::info!(total_records = all.len(), "Parquet source fetch complete");
274 Ok(all)
275 }
276
277 fn stream_pages<'a>(
304 &'a self,
305 context: &'a HashMap<String, Value>,
306 _batch_size: usize,
307 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
308 Box::pin(async_stream::try_stream! {
309 let targets = self.resolve_files(context).await?;
310 tracing::info!(files = targets.len(), "Parquet source resolved files");
311
312 if targets.is_empty() {
313 return;
314 }
315
316 let mut reference: Option<(String, arrow::datatypes::SchemaRef)> = None;
325 for target in &targets {
326 let (_, arrow_schema, display) = self.open_target_stream(target).await?;
327 if let Some((first_path, first_schema)) = &reference {
328 if first_schema != &arrow_schema {
329 Err(FaucetError::Source(schema_mismatch_message_pair(
330 first_path,
331 first_schema,
332 &display,
333 &arrow_schema,
334 )))?;
335 }
336 } else {
337 reference = Some((display, arrow_schema));
338 }
339 }
340
341 let mut total_records = 0usize;
343 let mut total_pages = 0usize;
344 for target in &targets {
345 let (mut batches, _schema, display) = self.open_target_stream(target).await?;
346 while let Some(batch) = batches.next().await {
347 let batch = batch.map_err(|e| {
348 FaucetError::Source(format!(
349 "parquet decode error in '{display}': {e}"
350 ))
351 })?;
352 let rows = record_batch_to_json(&batch)?;
353 if rows.is_empty() {
354 continue;
355 }
356 total_records += rows.len();
357 total_pages += 1;
358 yield StreamPage { records: rows, bookmark: None };
359 }
360 }
361
362 tracing::info!(
363 pages = total_pages,
364 total_records,
365 batch_size = self.config.batch_size,
366 "Parquet source stream complete",
367 );
368 })
369 }
370
371 fn config_schema(&self) -> Value {
372 serde_json::to_value(faucet_core::schema_for!(ParquetSourceConfig))
373 .expect("schema serialization")
374 }
375}
376
377struct FileOutput {
380 path: String,
381 rows: Vec<Value>,
382 arrow_schema: arrow::datatypes::SchemaRef,
383}
384
385#[derive(Debug, Clone)]
387enum FileTarget {
388 Local(PathBuf),
389 S3(ObjectPath),
390}
391
392impl FileTarget {
393 fn display(&self) -> String {
394 match self {
395 FileTarget::Local(p) => p.display().to_string(),
396 FileTarget::S3(p) => format!("s3://{p}"),
397 }
398 }
399}
400
401fn substitute(template: &str, context: &HashMap<String, Value>) -> String {
403 if context.is_empty() {
404 template.to_string()
405 } else {
406 faucet_core::util::substitute_context(template, context)
407 }
408}
409
410fn expand_glob(pattern: &str) -> Result<Vec<FileTarget>, FaucetError> {
412 let entries = glob::glob(pattern)
413 .map_err(|e| FaucetError::Config(format!("invalid glob '{pattern}': {e}")))?;
414
415 let mut paths = Vec::new();
416 for entry in entries {
417 let p = entry
418 .map_err(|e| FaucetError::Source(format!("glob entry error for '{pattern}': {e}")))?;
419 if p.is_file() {
420 paths.push(p);
421 }
422 }
423 paths.sort();
424 Ok(paths.into_iter().map(FileTarget::Local).collect())
425}
426
427async fn list_s3_prefix(
429 store: &dyn ObjectStore,
430 prefix: &str,
431) -> Result<Vec<FileTarget>, FaucetError> {
432 let prefix_path = if prefix.is_empty() {
433 None
434 } else {
435 Some(ObjectPath::from(prefix))
436 };
437
438 let mut listing = store.list(prefix_path.as_ref());
439 let mut keys = Vec::new();
440 while let Some(item) = listing.next().await {
441 let meta = item.map_err(|e| {
442 FaucetError::Source(format!("S3 list error for prefix '{prefix}': {e}"))
443 })?;
444 keys.push(meta.location);
445 }
446 keys.sort();
447 Ok(keys.into_iter().map(FileTarget::S3).collect())
448}
449
450fn build_s3_store(s3: &ParquetS3Config) -> Result<Arc<dyn ObjectStore>, FaucetError> {
452 if s3.bucket.trim().is_empty() {
453 return Err(FaucetError::Config(
454 "parquet source: S3 bucket must not be empty".into(),
455 ));
456 }
457
458 let mut builder = AmazonS3Builder::from_env().with_bucket_name(&s3.bucket);
459 if let Some(region) = &s3.region {
460 builder = builder.with_region(region);
461 }
462 if let Some(endpoint) = &s3.endpoint_url {
463 builder = builder.with_endpoint(endpoint);
464 if endpoint.starts_with("http://") {
465 builder = builder.with_allow_http(true);
466 }
467 }
468
469 let store = builder
470 .build()
471 .map_err(|e| FaucetError::Config(format!("failed to build S3 client: {e}")))?;
472 Ok(Arc::new(store))
473}
474
475fn validate_projection(
479 requested: &[String],
480 parquet_schema: &parquet::schema::types::SchemaDescriptor,
481 display: &str,
482) -> Result<(), FaucetError> {
483 let root = parquet_schema.root_schema();
484 let parquet::schema::types::Type::GroupType { fields, .. } = root else {
485 return Err(FaucetError::Source(format!(
486 "parquet root schema for '{display}' is not a group"
487 )));
488 };
489
490 let known: std::collections::HashSet<&str> = fields.iter().map(|f| f.name()).collect();
491
492 for name in requested {
493 if !known.contains(name.as_str()) {
494 return Err(FaucetError::Source(format!(
495 "parquet source: projected column '{name}' not found in file '{display}' \
496 (available: {})",
497 known.iter().copied().collect::<Vec<_>>().join(", ")
498 )));
499 }
500 }
501
502 Ok(())
503}
504
505fn schema_mismatch_message(first: &FileOutput, other: &FileOutput) -> String {
507 schema_mismatch_message_pair(
508 &first.path,
509 &first.arrow_schema,
510 &other.path,
511 &other.arrow_schema,
512 )
513}
514
515fn schema_mismatch_message_pair(
519 first_path: &str,
520 first_schema: &arrow::datatypes::SchemaRef,
521 other_path: &str,
522 other_schema: &arrow::datatypes::SchemaRef,
523) -> String {
524 let first_fields: Vec<String> = first_schema
525 .fields()
526 .iter()
527 .map(|f| format!("{}:{}", f.name(), f.data_type()))
528 .collect();
529 let other_fields: Vec<String> = other_schema
530 .fields()
531 .iter()
532 .map(|f| format!("{}:{}", f.name(), f.data_type()))
533 .collect();
534
535 let max_len = first_fields.len().max(other_fields.len());
537 let mut first_diff = None;
538 for i in 0..max_len {
539 let a = first_fields
540 .get(i)
541 .map(String::as_str)
542 .unwrap_or("<missing>");
543 let b = other_fields
544 .get(i)
545 .map(String::as_str)
546 .unwrap_or("<missing>");
547 if a != b {
548 first_diff = Some((i, a.to_string(), b.to_string()));
549 break;
550 }
551 }
552
553 let detail = match first_diff {
554 Some((i, a, b)) => format!(" (field #{i}: '{a}' vs '{b}')"),
555 None => String::new(),
556 };
557
558 format!("parquet source: schema mismatch between '{first_path}' and '{other_path}'{detail}")
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ParquetSourceConfig;

    /// Builds a source from `cfg` and returns the error that `resolve_files`
    /// reports for an empty context.
    async fn resolve_error(cfg: ParquetSourceConfig) -> FaucetError {
        let source = ParquetSource::new(cfg).await.unwrap();
        source.resolve_files(&HashMap::new()).await.unwrap_err()
    }

    #[test]
    fn substitute_passes_through_when_context_empty() {
        let empty: HashMap<String, Value> = HashMap::new();
        let template = "/tmp/{x}.parquet";
        assert_eq!(substitute(template, &empty), template);
    }

    #[test]
    fn substitute_replaces_placeholders() {
        let ctx: HashMap<String, Value> = [("region".to_string(), Value::String("us".into()))]
            .into_iter()
            .collect();
        assert_eq!(
            substitute("data/{region}/x.parquet", &ctx),
            "data/us/x.parquet"
        );
    }

    #[tokio::test]
    async fn accepts_zero_batch_size_as_sentinel() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet").batch_size(0);
        let source = ParquetSource::new(cfg)
            .await
            .expect("batch_size=0 must be accepted as the no-batching sentinel");
        assert_eq!(source.config.batch_size, 0);
    }

    #[tokio::test]
    async fn rejects_zero_concurrency() {
        let cfg = ParquetSourceConfig::local("/tmp/x.parquet").concurrency(0);
        let outcome = ParquetSource::new(cfg).await;
        match outcome {
            Err(FaucetError::Config(message)) => assert!(message.contains("concurrency")),
            unexpected => panic!("expected Config error, got {:?}", unexpected.err()),
        }
    }

    #[tokio::test]
    async fn rejects_s3_with_both_key_and_prefix() {
        let s3 = ParquetS3Config {
            prefix: Some("p/".into()),
            ..ParquetS3Config::object("b", "k.parquet")
        };
        let err = resolve_error(ParquetSourceConfig::s3(s3)).await;
        assert!(matches!(err, FaucetError::Config(_)));
    }

    #[tokio::test]
    async fn rejects_s3_with_neither_key_nor_prefix() {
        let s3 = ParquetS3Config {
            bucket: "b".into(),
            key: None,
            prefix: None,
            region: None,
            endpoint_url: None,
        };
        let err = resolve_error(ParquetSourceConfig::s3(s3)).await;
        assert!(matches!(err, FaucetError::Config(_)));
    }

    #[test]
    fn empty_bucket_rejected() {
        let outcome = build_s3_store(&ParquetS3Config::object("", "k.parquet"));
        assert!(matches!(outcome, Err(FaucetError::Config(_))));
    }
}