iceberg_rust/arrow/read.rs

use std::{convert, sync::Arc};
use arrow::record_batch::RecordBatch;
use futures::{stream, Stream, StreamExt};
use iceberg_rust_spec::util;
use object_store::ObjectStore;
use parquet::{
    arrow::{async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder},
    errors::ParquetError,
};

use crate::error::Error;

use iceberg_rust_spec::spec::manifest::{FileFormat, ManifestEntry};

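/// Reads the Parquet data files referenced by the given manifest entries from
/// the object store and merges their contents into a single unordered stream
/// of Arrow record batches. Entries that cannot be opened, including entries
/// with a non-Parquet file format, are silently skipped.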
pub async fn read(
    manifest_files: impl Iterator<Item = ManifestEntry>,
    object_store: Arc<dyn ObjectStore>,
) -> impl Stream<Item = Result<RecordBatch, ParquetError>> {
    stream::iter(manifest_files)
        .then(move |manifest| {
            let object_store = object_store.clone();
            async move {
                let data_file = manifest.data_file();
                match data_file.file_format() {
                    FileFormat::Parquet => {
                        // Open the data file directly from the object store and
                        // pass the file size recorded in the manifest so the
                        // reader does not have to look it up from the store.
                        let object_reader = ParquetObjectReader::new(
                            object_store,
                            util::strip_prefix(data_file.file_path()).into(),
                        )
                        .with_file_size((*data_file.file_size_in_bytes()) as u64);
                        Ok::<_, Error>(
                            ParquetRecordBatchStreamBuilder::new(object_reader)
                                .await?
                                .build()?,
                        )
                    }
                    _ => Err(Error::NotSupported("fileformat".to_string())),
                }
            }
        })
        // Skip data files whose record batch stream could not be created
        // (including unsupported file formats) instead of failing the scan.
        .filter_map(|x| async move { x.ok() })
        // Merge the per-file streams into one unordered stream of record
        // batches, with no limit on the number of files read concurrently.
        .flat_map_unordered(None, convert::identity)
}
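
// Illustrative usage sketch, not part of the original module: `collect_batches`
// is a hypothetical helper that assumes the caller has already gathered the
// `ManifestEntry` values for a scan and has an object store holding the
// referenced data files.
async fn collect_batches(
    entries: Vec<ManifestEntry>,
    store: Arc<dyn ObjectStore>,
) -> Result<Vec<RecordBatch>, ParquetError> {
    use futures::TryStreamExt;

    // `read` yields the record batches of every Parquet data file referenced
    // by the entries; collect them into memory here for simplicity.
    read(entries.into_iter(), store).await.try_collect().await
}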