iceberg_rust/arrow/read.rs

/*!
 * Functions to read Arrow record batches from an Iceberg table.
*/

use std::{convert, sync::Arc};

use arrow::record_batch::RecordBatch;
use futures::{stream, Stream, StreamExt};
use iceberg_rust_spec::util;
use object_store::ObjectStore;
use parquet::{
    arrow::{async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder},
    errors::ParquetError,
};

use crate::error::Error;

use iceberg_rust_spec::spec::manifest::{FileFormat, ManifestEntry};

/// Reads the Parquet data files referenced by the given manifest entries into a
/// stream of Arrow [`RecordBatch`]es. The files are read asynchronously and
/// concurrently, so the resulting record batches are unordered. Entries with an
/// unsupported file format are silently dropped from the stream.
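///
/// # Example
///
/// A minimal usage sketch, assuming that `manifest_entries` is an iterator of
/// [`ManifestEntry`] values obtained elsewhere (e.g. from a table scan) and
/// using an in-memory object store for illustration:
///
/// ```ignore
/// use std::sync::Arc;
///
/// use futures::{pin_mut, StreamExt};
/// use object_store::{memory::InMemory, ObjectStore};
///
/// // `manifest_entries` is assumed to exist: impl Iterator<Item = ManifestEntry>.
/// let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let batches = read(manifest_entries, object_store).await;
/// pin_mut!(batches);
/// while let Some(batch) = batches.next().await {
///     println!("Read a batch with {} rows", batch?.num_rows());
/// }
/// ```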
pub async fn read(
    manifest_files: impl Iterator<Item = ManifestEntry>,
    object_store: Arc<dyn ObjectStore>,
) -> impl Stream<Item = Result<RecordBatch, ParquetError>> {
    stream::iter(manifest_files)
        .then(move |manifest| {
            let object_store = object_store.clone();
            async move {
                let data_file = manifest.data_file();
                match data_file.file_format() {
                    FileFormat::Parquet => {
                        // Create an async reader for the file in the object store.
                        // Supplying the file size from the manifest avoids an extra
                        // request to determine it.
                        let object_reader = ParquetObjectReader::new(
                            object_store,
                            util::strip_prefix(data_file.file_path()).into(),
                        )
                        .with_file_size((*data_file.file_size_in_bytes()) as u64);
                        // Open the file as a stream of record batches.
                        Ok::<_, Error>(
                            ParquetRecordBatchStreamBuilder::new(object_reader)
                                .await?
                                .build()?,
                        )
                    }
                    _ => Err(Error::NotSupported("file format".to_string())),
                }
            }
        })
        // Drop entries that could not be opened, e.g. because of an
        // unsupported file format.
        .filter_map(|x| async move { x.ok() })
        // Merge the per-file streams into a single stream, polling them
        // concurrently without an ordering guarantee (`None` places no
        // limit on the number of streams polled at once).
        .flat_map_unordered(None, convert::identity)
}