Skip to main content

parquet/arrow/async_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `async` API for reading Parquet files as [`RecordBatch`]es
19//!
20//! See the [crate-level documentation](crate) for more details.
21//!
22//! See example on [`ParquetRecordBatchStreamBuilder::new`]
23
24use std::fmt::Formatter;
25use std::io::SeekFrom;
26use std::ops::Range;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30
31use bytes::Bytes;
32use futures::future::{BoxFuture, FutureExt};
33use futures::stream::Stream;
34use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
35
36use arrow_array::RecordBatch;
37use arrow_schema::{Schema, SchemaRef};
38
39use crate::arrow::arrow_reader::{
40    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
41};
42
43use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
44use crate::bloom_filter::{
45    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
46};
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
49
50mod metadata;
51pub use metadata::*;
52
53#[cfg(feature = "object_store")]
54mod store;
55
56use crate::DecodeResult;
57use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
58#[cfg(feature = "object_store")]
59pub use store::*;
60
61/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
62///
63/// Notes:
64///
65/// 1. There is a default implementation for types that implement [`AsyncRead`]
66///    and [`AsyncSeek`], for example [`tokio::fs::File`].
67///
68/// 2. [`ParquetObjectReader`], available when the `object_store` crate feature
69///    is enabled, implements this interface for [`ObjectStore`].
70///
71/// [`ObjectStore`]: object_store::ObjectStore
72///
73/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
74pub trait AsyncFileReader: Send {
75    /// Retrieve the bytes in `range`
76    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
77
78    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
79    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
80        async move {
81            let mut result = Vec::with_capacity(ranges.len());
82
83            for range in ranges.into_iter() {
84                let data = self.get_bytes(range).await?;
85                result.push(data);
86            }
87
88            Ok(result)
89        }
90        .boxed()
91    }
92
93    /// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
94    ///
95    /// This is an asynchronous operation as it may involve reading the file
96    /// footer and potentially other metadata from disk or a remote source.
97    ///
98    /// Reading data from Parquet requires the metadata to understand the
99    /// schema, row groups, and location of pages within the file. This metadata
100    /// is stored primarily in the footer of the Parquet file, and can be read using
101    /// [`ParquetMetaDataReader`].
102    ///
103    /// However, implementations can significantly speed up reading Parquet by
104    /// supplying cached metadata or pre-fetched metadata via this API.
105    ///
106    /// # Parameters
107    /// * `options`: Optional [`ArrowReaderOptions`] that may contain decryption
108    ///   and other options that affect how the metadata is read.
109    fn get_metadata<'a>(
110        &'a mut self,
111        options: Option<&'a ArrowReaderOptions>,
112    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
113}
114
115/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
116impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
117    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
118        self.as_mut().get_bytes(range)
119    }
120
121    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
122        self.as_mut().get_byte_ranges(ranges)
123    }
124
125    fn get_metadata<'a>(
126        &'a mut self,
127        options: Option<&'a ArrowReaderOptions>,
128    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
129        self.as_mut().get_metadata(options)
130    }
131}
132
133impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
134    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
135        async move {
136            self.seek(SeekFrom::End(-(suffix as i64))).await?;
137            let mut buf = Vec::with_capacity(suffix);
138            self.take(suffix as _).read_to_end(&mut buf).await?;
139            Ok(buf.into())
140        }
141        .boxed()
142    }
143}
144
145impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
146    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
147        async move {
148            self.seek(SeekFrom::Start(range.start)).await?;
149
150            let to_read = range.end - range.start;
151            let mut buffer = Vec::with_capacity(to_read.try_into()?);
152            let read = self.take(to_read).read_to_end(&mut buffer).await?;
153            if read as u64 != to_read {
154                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
155            }
156
157            Ok(buffer.into())
158        }
159        .boxed()
160    }
161
162    fn get_metadata<'a>(
163        &'a mut self,
164        options: Option<&'a ArrowReaderOptions>,
165    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
166        async move {
167            let metadata_opts = options.map(|o| o.metadata_options().clone());
168            let mut metadata_reader =
169                ParquetMetaDataReader::new().with_metadata_options(metadata_opts);
170
171            if let Some(opts) = options {
172                metadata_reader = metadata_reader
173                    .with_column_index_policy(opts.column_index_policy())
174                    .with_offset_index_policy(opts.offset_index_policy());
175            }
176
177            #[cfg(feature = "encryption")]
178            let metadata_reader = metadata_reader.with_decryption_properties(
179                options.and_then(|o| o.file_decryption_properties.as_ref().map(Arc::clone)),
180            );
181
182            let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
183            Ok(Arc::new(parquet_metadata))
184        }
185        .boxed()
186    }
187}
188
189impl ArrowReaderMetadata {
190    /// Returns a new [`ArrowReaderMetadata`] for this builder
191    ///
192    /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
193    pub async fn load_async<T: AsyncFileReader>(
194        input: &mut T,
195        options: ArrowReaderOptions,
196    ) -> Result<Self> {
197        let metadata = input.get_metadata(Some(&options)).await?;
198        Self::try_new(metadata, options)
199    }
200}
201
#[doc(hidden)]
/// Wrapper type marking the input of an [`ArrowReaderBuilder`] as an async reader
///
/// Sync and async readers can then share one builder implementation while the
/// ParquetRecordBatchStreamBuilder API stays unchanged
pub struct AsyncReader<T>(T);
208
209/// A builder for reading parquet files from an `async` source as  [`ParquetRecordBatchStream`]
210///
211/// This can be used to decode a Parquet file in streaming fashion (without
212/// downloading the whole file at once) from a remote source, such as an object store.
213///
214/// This builder handles reading the parquet file metadata, allowing consumers
215/// to use this information to select what specific columns, row groups, etc.
216/// they wish to be read by the resulting stream.
217///
218/// See examples on [`ParquetRecordBatchStreamBuilder::new`], including how to
219/// issue multiple I/O requests in parallel using multiple streams.
220///
221/// # See also:
222/// * [`ParquetPushDecoderBuilder`] for lower level control over buffering and
223///   decoding.
224/// * [`ParquetRecordBatchStream::next_row_group`] for I/O prefetching
225///
226///
227/// See [`ArrowReaderBuilder`] for additional member functions
228pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
229
230impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
231    /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
232    /// specified source.
233    ///
234    /// # Examples:
235    /// * [Basic example reading from an async source](#example)
236    /// * [Configuring options and reading metadata](#example-configuring-options-and-reading-metadata)
237    /// * [Reading Row Groups in Parallel](#example-reading-row-groups-in-parallel)
238    ///
239    /// # Example
240    /// ```
241    /// # #[tokio::main(flavor="current_thread")]
242    /// # async fn main() {
243    /// #
244    /// # use arrow_array::RecordBatch;
245    /// # use arrow::util::pretty::pretty_format_batches;
246    /// # use futures::TryStreamExt;
247    /// #
248    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
249    /// #
250    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
251    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
252    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
253    /// #     assert_eq!(
254    /// #          &actual_lines, expected_lines,
255    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
256    /// #          expected_lines, actual_lines
257    /// #      );
258    /// #  }
259    /// #
260    /// # let testdata = arrow::util::test_util::parquet_test_data();
261    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
262    /// // Use tokio::fs::File to read data using an async I/O. This can be replaced with
263    /// // another async I/O reader such as a reader from an object store.
264    /// let file = tokio::fs::File::open(path).await.unwrap();
265    ///
266    /// // Configure options for reading from the async source
267    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
268    ///     .await
269    ///     .unwrap();
270    /// // Building the stream opens the parquet file (reads metadata, etc) and returns
271    /// // a stream that can be used to incrementally read the data in batches
272    /// let stream = builder.build().unwrap();
273    /// // In this example, we collect the stream into a Vec<RecordBatch>
274    /// // but real applications would likely process the batches as they are read
275    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
276    /// // Demonstrate the results are as expected
277    /// assert_batches_eq(
278    ///     &results,
279    ///     &[
280    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
281    ///       "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
282    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
283    ///       "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
284    ///       "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
285    ///       "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
286    ///       "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
287    ///       "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
288    ///       "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
289    ///       "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
290    ///       "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
291    ///       "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
292    ///      ],
293    ///  );
294    /// # }
295    /// ```
296    ///
297    /// # Example Configuring Options and Reading Metadata
298    ///
299    /// There are many options that control the behavior of the reader, such as
300    /// `with_batch_size`, `with_projection`, `with_filter`, etc...
301    ///
302    /// ```
303    /// # #[tokio::main(flavor="current_thread")]
304    /// # async fn main() {
305    /// #
306    /// # use arrow_array::RecordBatch;
307    /// # use arrow::util::pretty::pretty_format_batches;
308    /// # use futures::TryStreamExt;
309    /// #
310    /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
311    /// #
312    /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
313    /// #     let formatted = pretty_format_batches(batches).unwrap().to_string();
314    /// #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
315    /// #     assert_eq!(
316    /// #          &actual_lines, expected_lines,
317    /// #          "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
318    /// #          expected_lines, actual_lines
319    /// #      );
320    /// #  }
321    /// #
322    /// # let testdata = arrow::util::test_util::parquet_test_data();
323    /// # let path = format!("{}/alltypes_plain.parquet", testdata);
324    /// // As before, use tokio::fs::File to read data using an async I/O.
325    /// let file = tokio::fs::File::open(path).await.unwrap();
326    ///
327    /// // Configure options for reading from the async source, in this case we set the batch size
328    /// // to 3 which produces 3 rows at a time.
329    /// let builder = ParquetRecordBatchStreamBuilder::new(file)
330    ///     .await
331    ///     .unwrap()
332    ///     .with_batch_size(3);
333    ///
334    /// // We can also read the metadata to inspect the schema and other metadata
335    /// // before actually reading the data
336    /// let file_metadata = builder.metadata().file_metadata();
337    /// // Specify that we only want to read the 1st, 2nd, and 6th columns
338    /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
339    ///
340    /// let stream = builder.with_projection(mask).build().unwrap();
341    /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
342    /// // Print out the results
343    /// assert_batches_eq(
344    ///     &results,
345    ///     &[
346    ///         "+----------+-------------+-----------+",
347    ///         "| bool_col | tinyint_col | float_col |",
348    ///         "+----------+-------------+-----------+",
349    ///         "| true     | 0           | 0.0       |",
350    ///         "| false    | 1           | 1.1       |",
351    ///         "| true     | 0           | 0.0       |",
352    ///         "| false    | 1           | 1.1       |",
353    ///         "| true     | 0           | 0.0       |",
354    ///         "| false    | 1           | 1.1       |",
355    ///         "| true     | 0           | 0.0       |",
356    ///         "| false    | 1           | 1.1       |",
357    ///         "+----------+-------------+-----------+",
358    ///      ],
359    ///  );
360    ///
361    /// // The results has 8 rows, so since we set the batch size to 3, we expect
362    /// // 3 batches, two with 3 rows each and the last batch with 2 rows.
363    /// assert_eq!(results.len(), 3);
364    /// # }
365    /// ```
366    ///
367    /// # Example reading Row Groups in Parallel
368    ///
369    /// Each [`ParquetRecordBatchStream`] is independent and can be used to read
370    /// from the same underlying source in parallel. Use
371    /// [`ParquetRecordBatchStream::next_row_group`] with a single stream to
372    /// begin prefetching the next Row Group. To read a file in parallel, create
373    /// a stream for each subset of the file. For example, you can read each
374    /// row group in parallel by creating a stream for each row group using the
375    /// [`ParquetRecordBatchStreamBuilder::with_row_groups`] API as shown below
376    ///
377    /// ```
378    /// # use std::sync::Arc;
379    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
380    /// # use arrow::util::pretty::pretty_format_batches;
381    /// # use futures::{StreamExt, TryStreamExt};
382    /// # use tempfile::NamedTempFile;
383    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
384    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
385    /// # use parquet::file::metadata::ParquetMetaDataReader;
386    /// # use parquet::file::properties::{WriterProperties};
387    /// # // write to a temporary file with 10 RowGroups and read back with async API
388    /// # fn write_file() -> parquet::errors::Result<NamedTempFile> {
389    /// #   let mut file = NamedTempFile::new().unwrap();
390    /// #   let small_batch = RecordBatch::try_from_iter([
391    /// #      ("id", Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef),
392    /// #   ]).unwrap();
393    /// #   let props = WriterProperties::builder()
394    /// #     .set_max_row_group_row_count(Some(5))
395    /// #     .set_write_batch_size(5)
396    /// #     .build();
397    /// #   let mut writer = ArrowWriter::try_new(&mut file, small_batch.schema(), Some(props))?;
398    /// #   for i in 0..10 {
399    /// #     writer.write(&small_batch)?
400    /// #   };
401    /// #   writer.close()?;
402    /// #   Ok(file)
403    /// # }
404    /// # #[tokio::main(flavor="current_thread")]
405    /// # async fn main() -> parquet::errors::Result<()> {
406    /// # let t = write_file()?;
407    /// # let path = t.path();
408    /// // This example uses a tokio::fs::File as the async source, but it
    /// // could be any async source (such as an object store reader)
410    /// let mut file = tokio::fs::File::open(path).await?;
411    /// // To read Row Groups in parallel, create a separate stream builder for each Row Group.
412    /// // First get the metadata to find the row group information
413    /// let file_size = file.metadata().await?.len();
414    /// let metadata = ParquetMetaDataReader::new().load_and_finish(&mut file, file_size).await?;
415    /// assert_eq!(metadata.num_row_groups(), 10); // file has 10 row groups with 5 rows each
416    /// // Create a stream reader for each row group
417    /// let reader_metadata = ArrowReaderMetadata::try_new(
418    ///   Arc::new(metadata),
419    ///   ArrowReaderOptions::new()
420    /// )?;
421    /// let mut streams = vec![];
422    ///  for row_group_index in 0..10 {
423    ///   // Each stream needs its own source instance to issue
424    ///   // parallel IO requests, so clone the file for each stream
425    ///   let this_file = file.try_clone().await?;
426    ///   let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
427    ///        this_file,
428    ///        reader_metadata.clone()
429    ///      )
430    ///      .with_row_groups(vec![row_group_index]) // read only this row group
431    ///      .build()?;
432    ///     streams.push(stream);
433    /// }
434    /// // Each reader can now be polled independently and in parallel, for
435    /// // example using StreamExt::buffered to read from 3 at a time
436    /// let results = futures::stream::iter(streams)
437    ///  .map(|stream| async move { stream })
438    ///  .buffered(3)
439    ///  .flatten()
440    ///  .try_collect::<Vec<_>>().await?;
441    /// // read all 50 rows (10 row groups x 5 rows per group)
442    /// assert_eq!(50, results.iter().map(|s| s.num_rows()).sum::<usize>());
443    /// # Ok(())
444    /// # }
445    /// ```
446    pub async fn new(input: T) -> Result<Self> {
447        Self::new_with_options(input, Default::default()).await
448    }
449
450    /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
451    /// and [`ArrowReaderOptions`].
452    pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
453        let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
454        Ok(Self::new_with_metadata(input, metadata))
455    }
456
457    /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
458    ///
459    /// This allows loading metadata once and using it to create multiple builders with
460    /// potentially different settings, that can be read in parallel.
461    ///
462    /// # Example of reading from multiple streams in parallel
463    ///
464    /// ```
465    /// # use std::fs::metadata;
466    /// # use std::sync::Arc;
467    /// # use bytes::Bytes;
468    /// # use arrow_array::{Int32Array, RecordBatch};
469    /// # use arrow_schema::{DataType, Field, Schema};
470    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
471    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
472    /// # use tempfile::tempfile;
473    /// # use futures::StreamExt;
474    /// # #[tokio::main(flavor="current_thread")]
475    /// # async fn main() {
476    /// #
477    /// # let mut file = tempfile().unwrap();
478    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
479    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
480    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
481    /// # writer.write(&batch).unwrap();
482    /// # writer.close().unwrap();
483    /// // open file with parquet data
484    /// let mut file = tokio::fs::File::from_std(file);
485    /// // load metadata once
486    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
487    /// // create two readers, a and b, from the same underlying file
488    /// // without reading the metadata again
489    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
490    ///     file.try_clone().await.unwrap(),
491    ///     meta.clone()
492    /// ).build().unwrap();
493    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
494    ///
495    /// // Can read batches from both readers in parallel
496    /// assert_eq!(
497    ///   a.next().await.unwrap().unwrap(),
498    ///   b.next().await.unwrap().unwrap(),
499    /// );
500    /// # }
501    /// ```
502    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
503        Self::new_builder(AsyncReader(input), metadata)
504    }
505
506    /// Read bloom filter for a column in a row group
507    ///
508    /// Returns `None` if the column does not have a bloom filter
509    ///
510    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
511    pub async fn get_row_group_column_bloom_filter(
512        &mut self,
513        row_group_idx: usize,
514        column_idx: usize,
515    ) -> Result<Option<Sbbf>> {
516        let metadata = self.metadata.row_group(row_group_idx);
517        let column_metadata = metadata.column(column_idx);
518
519        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
520            offset
521                .try_into()
522                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
523        } else {
524            return Ok(None);
525        };
526
527        let buffer = match column_metadata.bloom_filter_length() {
528            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
529            None => self
530                .input
531                .0
532                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
533        }
534        .await?;
535
536        let (header, bitset_offset) =
537            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
538
539        match header.algorithm {
540            BloomFilterAlgorithm::BLOCK => {
541                // this match exists to future proof the singleton algorithm enum
542            }
543        }
544        match header.compression {
545            BloomFilterCompression::UNCOMPRESSED => {
546                // this match exists to future proof the singleton compression enum
547            }
548        }
549        match header.hash {
550            BloomFilterHash::XXHASH => {
551                // this match exists to future proof the singleton hash enum
552            }
553        }
554
555        let bitset = match column_metadata.bloom_filter_length() {
556            Some(_) => buffer.slice(
557                (TryInto::<usize>::try_into(bitset_offset).unwrap()
558                    - TryInto::<usize>::try_into(offset).unwrap())..,
559            ),
560            None => {
561                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
562                    ParquetError::General("Bloom filter length is invalid".to_string())
563                })?;
564                self.input
565                    .0
566                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
567                    .await?
568            }
569        };
570        Ok(Some(Sbbf::new(&bitset)))
571    }
572
573    /// Build a new [`ParquetRecordBatchStream`]
574    ///
575    /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
576    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
577        let Self {
578            input,
579            metadata,
580            schema,
581            fields,
582            batch_size,
583            row_groups,
584            projection,
585            filter,
586            selection,
587            row_selection_policy: selection_strategy,
588            limit,
589            offset,
590            metrics,
591            max_predicate_cache_size,
592        } = self;
593
594        // Ensure schema of ParquetRecordBatchStream respects projection, and does
595        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
596        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
597        let projected_fields = schema
598            .fields
599            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
600        let projected_schema = Arc::new(Schema::new(projected_fields));
601
602        let decoder = ParquetPushDecoderBuilder {
603            input: NoInput,
604            metadata,
605            schema,
606            fields,
607            projection,
608            filter,
609            selection,
610            row_selection_policy: selection_strategy,
611            batch_size,
612            row_groups,
613            limit,
614            offset,
615            metrics,
616            max_predicate_cache_size,
617        }
618        .build()?;
619
620        let request_state = RequestState::None { input: input.0 };
621
622        Ok(ParquetRecordBatchStream {
623            schema: projected_schema,
624            decoder,
625            request_state,
626        })
627    }
628}
629
630/// State machine that tracks outstanding requests to fetch data
631///
632/// The parameter `T` is the input, typically an `AsyncFileReader`
633enum RequestState<T> {
634    /// No outstanding requests
635    None {
636        input: T,
637    },
638    /// There is an outstanding request for data
639    Outstanding {
640        /// Ranges that have been requested
641        ranges: Vec<Range<u64>>,
642        /// Future that will resolve (input, requested_ranges)
643        ///
644        /// Note the future owns the reader while the request is outstanding
645        /// and returns it upon completion
646        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
647    },
648    Done,
649}
650
651impl<T> RequestState<T>
652where
653    T: AsyncFileReader + Unpin + Send + 'static,
654{
655    /// Issue a request to fetch `ranges`, returning the Outstanding state
656    fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
657        let ranges_captured = ranges.clone();
658
659        // Note this must move the input *into* the future
660        // because the get_byte_ranges future has a lifetime
661        // (aka can have references internally) and thus must
662        // own the input while the request is outstanding.
663        let future = async move {
664            let data = input.get_byte_ranges(ranges_captured).await?;
665            Ok((input, data))
666        }
667        .boxed();
668        RequestState::Outstanding { ranges, future }
669    }
670}
671
672impl<T> std::fmt::Debug for RequestState<T> {
673    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
674        match self {
675            RequestState::None { input: _ } => f
676                .debug_struct("RequestState::None")
677                .field("input", &"...")
678                .finish(),
679            RequestState::Outstanding { ranges, .. } => f
680                .debug_struct("RequestState::Outstanding")
681                .field("ranges", &ranges)
682                .finish(),
683            RequestState::Done => {
684                write!(f, "RequestState::Done")
685            }
686        }
687    }
688}
689
/// An asynchronous [`Stream`] of [`RecordBatch`] constructed using [`ParquetRecordBatchStreamBuilder`] to read parquet files.
///
/// `ParquetRecordBatchStream` also provides [`ParquetRecordBatchStream::next_row_group`] for fetching row groups,
/// allowing users to decode record batches separately from I/O.
///
/// # I/O Buffering
///
/// `ParquetRecordBatchStream` buffers *all* data pages selected after predicates
/// (projection + filtering, etc) and decodes the rows from those buffered pages.
///
/// For example, if all rows and columns are selected, the entire row group is
/// buffered in memory during decode. This minimizes the number of IO operations
/// required, which is especially important for object stores, where IO operations
/// have latencies in the hundreds of milliseconds.
///
/// See [`ParquetPushDecoderBuilder`] for an API with lower level control over
/// buffering.
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
pub struct ParquetRecordBatchStream<T> {
    /// Output schema of the stream, as returned by [`Self::schema`]
    schema: SchemaRef,
    /// Input and outstanding IO request, if any.
    ///
    /// Holds the input while idle, the in-flight fetch while a request is
    /// outstanding, and `RequestState::Done` once the stream has finished or
    /// returned an error.
    request_state: RequestState<T>,
    /// Decoding state machine (no IO); fetched byte ranges are pushed into it
    decoder: ParquetPushDecoder,
}
717
718impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
719    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
720        f.debug_struct("ParquetRecordBatchStream")
721            .field("request_state", &self.request_state)
722            .finish()
723    }
724}
725
726impl<T> ParquetRecordBatchStream<T> {
727    /// Returns the projected [`SchemaRef`] for reading the parquet file.
728    ///
729    /// Note that the schema metadata will be stripped here. See
730    /// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
731    pub fn schema(&self) -> &SchemaRef {
732        &self.schema
733    }
734}
735
736impl<T> ParquetRecordBatchStream<T>
737where
738    T: AsyncFileReader + Unpin + Send + 'static,
739{
740    /// Fetches the next row group from the stream.
741    ///
742    /// Users can continue to call this function to get row groups and decode them concurrently.
743    ///
744    /// ## Notes
745    ///
746    /// ParquetRecordBatchStream should be used either as a `Stream` or with `next_row_group`; they should not be used simultaneously.
747    ///
748    /// ## Returns
749    ///
750    /// - `Ok(None)` if the stream has ended.
751    /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`.
752    /// - `Ok(Some(reader))` which holds all the data for the row group.
753    pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
754        loop {
755            // Take ownership of request state to process, leaving self in a
756            // valid state
757            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
758            match request_state {
759                // No outstanding requests, proceed to setup next row group
760                RequestState::None { input } => {
761                    match self.decoder.try_next_reader()? {
762                        DecodeResult::NeedsData(ranges) => {
763                            self.request_state = RequestState::begin_request(input, ranges);
764                            continue; // poll again (as the input might be ready immediately)
765                        }
766                        DecodeResult::Data(reader) => {
767                            self.request_state = RequestState::None { input };
768                            return Ok(Some(reader));
769                        }
770                        DecodeResult::Finished => return Ok(None),
771                    }
772                }
773                RequestState::Outstanding { ranges, future } => {
774                    let (input, data) = future.await?;
775                    // Push the requested data to the decoder and try again
776                    self.decoder.push_ranges(ranges, data)?;
777                    self.request_state = RequestState::None { input };
778                    continue; // try and decode on next iteration
779                }
780                RequestState::Done => {
781                    self.request_state = RequestState::Done;
782                    return Ok(None);
783                }
784            }
785        }
786    }
787}
788
789impl<T> Stream for ParquetRecordBatchStream<T>
790where
791    T: AsyncFileReader + Unpin + Send + 'static,
792{
793    type Item = Result<RecordBatch>;
794    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
795        match self.poll_next_inner(cx) {
796            Ok(res) => {
797                // Successfully decoded a batch, or reached end of stream.
798                // convert Option<RecordBatch> to Option<Result<RecordBatch>>
799                res.map(|res| Ok(res).transpose())
800            }
801            Err(e) => {
802                self.request_state = RequestState::Done;
803                Poll::Ready(Some(Err(e)))
804            }
805        }
806    }
807}
808
809impl<T> ParquetRecordBatchStream<T>
810where
811    T: AsyncFileReader + Unpin + Send + 'static,
812{
813    /// Inner state machine
814    ///
815    /// Note this is separate from poll_next so we can use ? operator to check for errors
816    /// as it returns `Result<Poll<Option<RecordBatch>>>`
817    fn poll_next_inner(&mut self, cx: &mut Context<'_>) -> Result<Poll<Option<RecordBatch>>> {
818        loop {
819            let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
820            match request_state {
821                RequestState::None { input } => {
822                    // No outstanding requests, proceed to decode the next batch
823                    match self.decoder.try_decode()? {
824                        DecodeResult::NeedsData(ranges) => {
825                            self.request_state = RequestState::begin_request(input, ranges);
826                            continue; // poll again (as the input might be ready immediately)
827                        }
828                        DecodeResult::Data(batch) => {
829                            self.request_state = RequestState::None { input };
830                            return Ok(Poll::Ready(Some(batch)));
831                        }
832                        DecodeResult::Finished => {
833                            self.request_state = RequestState::Done;
834                            return Ok(Poll::Ready(None));
835                        }
836                    }
837                }
838                RequestState::Outstanding { ranges, mut future } => match future.poll_unpin(cx) {
839                    // Data was ready, push it to the decoder and continue
840                    Poll::Ready(result) => {
841                        let (input, data) = result?;
842                        // Push the requested data to the decoder
843                        self.decoder.push_ranges(ranges, data)?;
844                        self.request_state = RequestState::None { input };
845                        continue; // next iteration will try to decode the next batch
846                    }
847                    Poll::Pending => {
848                        self.request_state = RequestState::Outstanding { ranges, future };
849                        return Ok(Poll::Pending);
850                    }
851                },
852                RequestState::Done => {
853                    // Stream is done (error or end), return None
854                    self.request_state = RequestState::Done;
855                    return Ok(Poll::Ready(None));
856                }
857            }
858        }
859    }
860}
861
862#[cfg(test)]
863mod tests {
864    use super::*;
865    use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
866    use crate::arrow::arrow_reader::{
867        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
868    };
869    use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
870    use crate::arrow::schema::virtual_type::RowNumber;
871    use crate::arrow::{ArrowWriter, AsyncArrowWriter, ProjectionMask};
872    use crate::file::metadata::PageIndexPolicy;
873    use crate::file::metadata::ParquetMetaDataReader;
874    use crate::file::properties::WriterProperties;
875    use arrow::compute::kernels::cmp::eq;
876    use arrow::error::Result as ArrowResult;
877    use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
878    use arrow_array::cast::AsArray;
879    use arrow_array::types::Int32Type;
880    use arrow_array::{
881        Array, ArrayRef, BooleanArray, Int32Array, RecordBatchReader, Scalar, StringArray,
882        StructArray, UInt64Array,
883    };
884    use arrow_schema::{DataType, Field, Schema};
885    use futures::{StreamExt, TryStreamExt};
886    use rand::{Rng, rng};
887    use std::collections::HashMap;
888    use std::sync::{Arc, Mutex};
889    use tempfile::tempfile;
890
    /// In-memory [`AsyncFileReader`] used by the tests in this module.
    ///
    /// Clones share the same `requests` log, so a test can keep a handle to
    /// the log after handing the reader to a builder.
    #[derive(Clone)]
    struct TestReader {
        /// Complete contents of the parquet file being read
        data: Bytes,
        /// Metadata parsed by the most recent `get_metadata` call, if any
        metadata: Option<Arc<ParquetMetaData>>,
        /// Byte ranges passed to `get_bytes`, in call order
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }
897
898    impl TestReader {
899        fn new(data: Bytes) -> Self {
900            Self {
901                data,
902                metadata: Default::default(),
903                requests: Default::default(),
904            }
905        }
906    }
907
908    impl AsyncFileReader for TestReader {
909        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
910            let range = range.clone();
911            self.requests
912                .lock()
913                .unwrap()
914                .push(range.start as usize..range.end as usize);
915            futures::future::ready(Ok(self
916                .data
917                .slice(range.start as usize..range.end as usize)))
918            .boxed()
919        }
920
921        fn get_metadata<'a>(
922            &'a mut self,
923            options: Option<&'a ArrowReaderOptions>,
924        ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
925            let mut metadata_reader = ParquetMetaDataReader::new();
926            if let Some(opts) = options {
927                metadata_reader = metadata_reader
928                    .with_column_index_policy(opts.column_index_policy())
929                    .with_offset_index_policy(opts.offset_index_policy());
930            }
931            self.metadata = Some(Arc::new(
932                metadata_reader.parse_and_finish(&self.data).unwrap(),
933            ));
934            futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
935        }
936    }
937
938    #[tokio::test]
939    async fn test_async_reader() {
940        let testdata = arrow::util::test_util::parquet_test_data();
941        let path = format!("{testdata}/alltypes_plain.parquet");
942        let data = Bytes::from(std::fs::read(path).unwrap());
943
944        let async_reader = TestReader::new(data.clone());
945
946        let requests = async_reader.requests.clone();
947        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
948            .await
949            .unwrap();
950
951        let metadata = builder.metadata().clone();
952        assert_eq!(metadata.num_row_groups(), 1);
953
954        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
955        let stream = builder
956            .with_projection(mask.clone())
957            .with_batch_size(1024)
958            .build()
959            .unwrap();
960
961        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
962
963        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
964            .unwrap()
965            .with_projection(mask)
966            .with_batch_size(104)
967            .build()
968            .unwrap()
969            .collect::<ArrowResult<Vec<_>>>()
970            .unwrap();
971
972        assert_eq!(async_batches, sync_batches);
973
974        let requests = requests.lock().unwrap();
975        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
976        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
977
978        assert_eq!(
979            &requests[..],
980            &[
981                offset_1 as usize..(offset_1 + length_1) as usize,
982                offset_2 as usize..(offset_2 + length_2) as usize
983            ]
984        );
985    }
986
987    #[tokio::test]
988    async fn test_async_reader_with_next_row_group() {
989        let testdata = arrow::util::test_util::parquet_test_data();
990        let path = format!("{testdata}/alltypes_plain.parquet");
991        let data = Bytes::from(std::fs::read(path).unwrap());
992
993        let async_reader = TestReader::new(data.clone());
994
995        let requests = async_reader.requests.clone();
996        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
997            .await
998            .unwrap();
999
1000        let metadata = builder.metadata().clone();
1001        assert_eq!(metadata.num_row_groups(), 1);
1002
1003        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1004        let mut stream = builder
1005            .with_projection(mask.clone())
1006            .with_batch_size(1024)
1007            .build()
1008            .unwrap();
1009
1010        let mut readers = vec![];
1011        while let Some(reader) = stream.next_row_group().await.unwrap() {
1012            readers.push(reader);
1013        }
1014
1015        let async_batches: Vec<_> = readers
1016            .into_iter()
1017            .flat_map(|r| r.map(|v| v.unwrap()).collect::<Vec<_>>())
1018            .collect();
1019
1020        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1021            .unwrap()
1022            .with_projection(mask)
1023            .with_batch_size(104)
1024            .build()
1025            .unwrap()
1026            .collect::<ArrowResult<Vec<_>>>()
1027            .unwrap();
1028
1029        assert_eq!(async_batches, sync_batches);
1030
1031        let requests = requests.lock().unwrap();
1032        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
1033        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
1034
1035        assert_eq!(
1036            &requests[..],
1037            &[
1038                offset_1 as usize..(offset_1 + length_1) as usize,
1039                offset_2 as usize..(offset_2 + length_2) as usize
1040            ]
1041        );
1042    }
1043
1044    #[tokio::test]
1045    async fn test_async_reader_with_index() {
1046        let testdata = arrow::util::test_util::parquet_test_data();
1047        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1048        let data = Bytes::from(std::fs::read(path).unwrap());
1049
1050        let async_reader = TestReader::new(data.clone());
1051
1052        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1053        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1054            .await
1055            .unwrap();
1056
1057        // The builder should have page and offset indexes loaded now
1058        let metadata_with_index = builder.metadata();
1059        assert_eq!(metadata_with_index.num_row_groups(), 1);
1060
1061        // Check offset indexes are present for all columns
1062        let offset_index = metadata_with_index.offset_index().unwrap();
1063        let column_index = metadata_with_index.column_index().unwrap();
1064
1065        assert_eq!(offset_index.len(), metadata_with_index.num_row_groups());
1066        assert_eq!(column_index.len(), metadata_with_index.num_row_groups());
1067
1068        let num_columns = metadata_with_index
1069            .file_metadata()
1070            .schema_descr()
1071            .num_columns();
1072
1073        // Check page indexes are present for all columns
1074        offset_index
1075            .iter()
1076            .for_each(|x| assert_eq!(x.len(), num_columns));
1077        column_index
1078            .iter()
1079            .for_each(|x| assert_eq!(x.len(), num_columns));
1080
1081        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1082        let stream = builder
1083            .with_projection(mask.clone())
1084            .with_batch_size(1024)
1085            .build()
1086            .unwrap();
1087
1088        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1089
1090        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1091            .unwrap()
1092            .with_projection(mask)
1093            .with_batch_size(1024)
1094            .build()
1095            .unwrap()
1096            .collect::<ArrowResult<Vec<_>>>()
1097            .unwrap();
1098
1099        assert_eq!(async_batches, sync_batches);
1100    }
1101
1102    #[tokio::test]
1103    async fn test_async_reader_with_limit() {
1104        let testdata = arrow::util::test_util::parquet_test_data();
1105        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1106        let data = Bytes::from(std::fs::read(path).unwrap());
1107
1108        let metadata = ParquetMetaDataReader::new()
1109            .parse_and_finish(&data)
1110            .unwrap();
1111        let metadata = Arc::new(metadata);
1112
1113        assert_eq!(metadata.num_row_groups(), 1);
1114
1115        let async_reader = TestReader::new(data.clone());
1116
1117        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1118            .await
1119            .unwrap();
1120
1121        assert_eq!(builder.metadata().num_row_groups(), 1);
1122
1123        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
1124        let stream = builder
1125            .with_projection(mask.clone())
1126            .with_batch_size(1024)
1127            .with_limit(1)
1128            .build()
1129            .unwrap();
1130
1131        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1132
1133        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1134            .unwrap()
1135            .with_projection(mask)
1136            .with_batch_size(1024)
1137            .with_limit(1)
1138            .build()
1139            .unwrap()
1140            .collect::<ArrowResult<Vec<_>>>()
1141            .unwrap();
1142
1143        assert_eq!(async_batches, sync_batches);
1144    }
1145
1146    #[tokio::test]
1147    async fn test_async_reader_skip_pages() {
1148        let testdata = arrow::util::test_util::parquet_test_data();
1149        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1150        let data = Bytes::from(std::fs::read(path).unwrap());
1151
1152        let async_reader = TestReader::new(data.clone());
1153
1154        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1155        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1156            .await
1157            .unwrap();
1158
1159        assert_eq!(builder.metadata().num_row_groups(), 1);
1160
1161        let selection = RowSelection::from(vec![
1162            RowSelector::skip(21),   // Skip first page
1163            RowSelector::select(21), // Select page to boundary
1164            RowSelector::skip(41),   // Skip multiple pages
1165            RowSelector::select(41), // Select multiple pages
1166            RowSelector::skip(25),   // Skip page across boundary
1167            RowSelector::select(25), // Select across page boundary
1168            RowSelector::skip(7116), // Skip to final page boundary
1169            RowSelector::select(10), // Select final page
1170        ]);
1171
1172        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![9]);
1173
1174        let stream = builder
1175            .with_projection(mask.clone())
1176            .with_row_selection(selection.clone())
1177            .build()
1178            .expect("building stream");
1179
1180        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1181
1182        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
1183            .unwrap()
1184            .with_projection(mask)
1185            .with_batch_size(1024)
1186            .with_row_selection(selection)
1187            .build()
1188            .unwrap()
1189            .collect::<ArrowResult<Vec<_>>>()
1190            .unwrap();
1191
1192        assert_eq!(async_batches, sync_batches);
1193    }
1194
1195    #[tokio::test]
1196    async fn test_fuzz_async_reader_selection() {
1197        let testdata = arrow::util::test_util::parquet_test_data();
1198        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1199        let data = Bytes::from(std::fs::read(path).unwrap());
1200
1201        let mut rand = rng();
1202
1203        for _ in 0..100 {
1204            let mut expected_rows = 0;
1205            let mut total_rows = 0;
1206            let mut skip = false;
1207            let mut selectors = vec![];
1208
1209            while total_rows < 7300 {
1210                let row_count: usize = rand.random_range(1..100);
1211
1212                let row_count = row_count.min(7300 - total_rows);
1213
1214                selectors.push(RowSelector { row_count, skip });
1215
1216                total_rows += row_count;
1217                if !skip {
1218                    expected_rows += row_count;
1219                }
1220
1221                skip = !skip;
1222            }
1223
1224            let selection = RowSelection::from(selectors);
1225
1226            let async_reader = TestReader::new(data.clone());
1227
1228            let options =
1229                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1230            let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1231                .await
1232                .unwrap();
1233
1234            assert_eq!(builder.metadata().num_row_groups(), 1);
1235
1236            let col_idx: usize = rand.random_range(0..13);
1237            let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1238
1239            let stream = builder
1240                .with_projection(mask.clone())
1241                .with_row_selection(selection.clone())
1242                .build()
1243                .expect("building stream");
1244
1245            let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1246
1247            let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1248
1249            assert_eq!(actual_rows, expected_rows);
1250        }
1251    }
1252
1253    #[tokio::test]
1254    async fn test_async_reader_zero_row_selector() {
1255        //See https://github.com/apache/arrow-rs/issues/2669
1256        let testdata = arrow::util::test_util::parquet_test_data();
1257        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1258        let data = Bytes::from(std::fs::read(path).unwrap());
1259
1260        let mut rand = rng();
1261
1262        let mut expected_rows = 0;
1263        let mut total_rows = 0;
1264        let mut skip = false;
1265        let mut selectors = vec![];
1266
1267        selectors.push(RowSelector {
1268            row_count: 0,
1269            skip: false,
1270        });
1271
1272        while total_rows < 7300 {
1273            let row_count: usize = rand.random_range(1..100);
1274
1275            let row_count = row_count.min(7300 - total_rows);
1276
1277            selectors.push(RowSelector { row_count, skip });
1278
1279            total_rows += row_count;
1280            if !skip {
1281                expected_rows += row_count;
1282            }
1283
1284            skip = !skip;
1285        }
1286
1287        let selection = RowSelection::from(selectors);
1288
1289        let async_reader = TestReader::new(data.clone());
1290
1291        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1292        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1293            .await
1294            .unwrap();
1295
1296        assert_eq!(builder.metadata().num_row_groups(), 1);
1297
1298        let col_idx: usize = rand.random_range(0..13);
1299        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
1300
1301        let stream = builder
1302            .with_projection(mask.clone())
1303            .with_row_selection(selection.clone())
1304            .build()
1305            .expect("building stream");
1306
1307        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
1308
1309        let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum();
1310
1311        assert_eq!(actual_rows, expected_rows);
1312    }
1313
1314    #[tokio::test]
1315    async fn test_limit_multiple_row_groups() {
1316        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
1317        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
1318        let c = Int32Array::from_iter(0..6);
1319        let data = RecordBatch::try_from_iter([
1320            ("a", Arc::new(a) as ArrayRef),
1321            ("b", Arc::new(b) as ArrayRef),
1322            ("c", Arc::new(c) as ArrayRef),
1323        ])
1324        .unwrap();
1325
1326        let mut buf = Vec::with_capacity(1024);
1327        let props = WriterProperties::builder()
1328            .set_max_row_group_row_count(Some(3))
1329            .build();
1330        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1331        writer.write(&data).unwrap();
1332        writer.close().unwrap();
1333
1334        let data: Bytes = buf.into();
1335        let metadata = ParquetMetaDataReader::new()
1336            .parse_and_finish(&data)
1337            .unwrap();
1338
1339        assert_eq!(metadata.num_row_groups(), 2);
1340
1341        let test = TestReader::new(data);
1342
1343        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1344            .await
1345            .unwrap()
1346            .with_batch_size(1024)
1347            .with_limit(4)
1348            .build()
1349            .unwrap();
1350
1351        let batches: Vec<_> = stream.try_collect().await.unwrap();
1352        // Expect one batch for each row group
1353        assert_eq!(batches.len(), 2);
1354
1355        let batch = &batches[0];
1356        // First batch should contain all rows
1357        assert_eq!(batch.num_rows(), 3);
1358        assert_eq!(batch.num_columns(), 3);
1359        let col2 = batch.column(2).as_primitive::<Int32Type>();
1360        assert_eq!(col2.values(), &[0, 1, 2]);
1361
1362        let batch = &batches[1];
1363        // Second batch should trigger the limit and only have one row
1364        assert_eq!(batch.num_rows(), 1);
1365        assert_eq!(batch.num_columns(), 3);
1366        let col2 = batch.column(2).as_primitive::<Int32Type>();
1367        assert_eq!(col2.values(), &[3]);
1368
1369        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1370            .await
1371            .unwrap()
1372            .with_offset(2)
1373            .with_limit(3)
1374            .build()
1375            .unwrap();
1376
1377        let batches: Vec<_> = stream.try_collect().await.unwrap();
1378        // Expect one batch for each row group
1379        assert_eq!(batches.len(), 2);
1380
1381        let batch = &batches[0];
1382        // First batch should contain one row
1383        assert_eq!(batch.num_rows(), 1);
1384        assert_eq!(batch.num_columns(), 3);
1385        let col2 = batch.column(2).as_primitive::<Int32Type>();
1386        assert_eq!(col2.values(), &[2]);
1387
1388        let batch = &batches[1];
1389        // Second batch should contain two rows
1390        assert_eq!(batch.num_rows(), 2);
1391        assert_eq!(batch.num_columns(), 3);
1392        let col2 = batch.column(2).as_primitive::<Int32Type>();
1393        assert_eq!(col2.values(), &[3, 4]);
1394
1395        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
1396            .await
1397            .unwrap()
1398            .with_offset(4)
1399            .with_limit(20)
1400            .build()
1401            .unwrap();
1402
1403        let batches: Vec<_> = stream.try_collect().await.unwrap();
1404        // Should skip first row group
1405        assert_eq!(batches.len(), 1);
1406
1407        let batch = &batches[0];
1408        // First batch should contain two rows
1409        assert_eq!(batch.num_rows(), 2);
1410        assert_eq!(batch.num_columns(), 3);
1411        let col2 = batch.column(2).as_primitive::<Int32Type>();
1412        assert_eq!(col2.values(), &[4, 5]);
1413    }
1414
1415    #[tokio::test]
1416    async fn test_batch_size_overallocate() {
1417        let testdata = arrow::util::test_util::parquet_test_data();
1418        // `alltypes_plain.parquet` only have 8 rows
1419        let path = format!("{testdata}/alltypes_plain.parquet");
1420        let data = Bytes::from(std::fs::read(path).unwrap());
1421
1422        let async_reader = TestReader::new(data.clone());
1423
1424        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
1425            .await
1426            .unwrap();
1427
1428        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
1429
1430        let builder = builder
1431            .with_projection(ProjectionMask::all())
1432            .with_batch_size(1024);
1433
1434        // even though the batch size is set to 1024, it should adjust to the max
1435        // number of rows in the file (8)
1436        assert_ne!(1024, file_rows);
1437        assert_eq!(builder.batch_size, file_rows);
1438
1439        let _stream = builder.build().unwrap();
1440    }
1441
1442    #[tokio::test]
1443    async fn test_parquet_record_batch_stream_schema() {
1444        fn get_all_field_names(schema: &Schema) -> Vec<&String> {
1445            schema.flattened_fields().iter().map(|f| f.name()).collect()
1446        }
1447
1448        // ParquetRecordBatchReaderBuilder::schema differs from
1449        // ParquetRecordBatchReader::schema and RecordBatch::schema in the returned
1450        // schema contents (in terms of custom metadata attached to schema, and fields
1451        // returned). Test to ensure this remains consistent behaviour.
1452        //
1453        // Ensure same for asynchronous versions of the above.
1454
1455        // Prep data, for a schema with nested fields, with custom metadata
1456        let mut metadata = HashMap::with_capacity(1);
1457        metadata.insert("key".to_string(), "value".to_string());
1458
1459        let nested_struct_array = StructArray::from(vec![
1460            (
1461                Arc::new(Field::new("d", DataType::Utf8, true)),
1462                Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
1463            ),
1464            (
1465                Arc::new(Field::new("e", DataType::Utf8, true)),
1466                Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef,
1467            ),
1468        ]);
1469        let struct_array = StructArray::from(vec![
1470            (
1471                Arc::new(Field::new("a", DataType::Int32, true)),
1472                Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
1473            ),
1474            (
1475                Arc::new(Field::new("b", DataType::UInt64, true)),
1476                Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
1477            ),
1478            (
1479                Arc::new(Field::new(
1480                    "c",
1481                    nested_struct_array.data_type().clone(),
1482                    true,
1483                )),
1484                Arc::new(nested_struct_array) as ArrayRef,
1485            ),
1486        ]);
1487
1488        let schema =
1489            Arc::new(Schema::new(struct_array.fields().clone()).with_metadata(metadata.clone()));
1490        let record_batch = RecordBatch::from(struct_array)
1491            .with_schema(schema.clone())
1492            .unwrap();
1493
1494        // Write parquet with custom metadata in schema
1495        let mut file = tempfile().unwrap();
1496        let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
1497        writer.write(&record_batch).unwrap();
1498        writer.close().unwrap();
1499
1500        let all_fields = ["a", "b", "c", "d", "e"];
1501        // (leaf indices in mask, expected names in output schema all fields)
1502        let projections = [
1503            (vec![], vec![]),
1504            (vec![0], vec!["a"]),
1505            (vec![0, 1], vec!["a", "b"]),
1506            (vec![0, 1, 2], vec!["a", "b", "c", "d"]),
1507            (vec![0, 1, 2, 3], vec!["a", "b", "c", "d", "e"]),
1508        ];
1509
1510        // Ensure we're consistent for each of these projections
1511        for (indices, expected_projected_names) in projections {
1512            let assert_schemas = |builder: SchemaRef, reader: SchemaRef, batch: SchemaRef| {
1513                // Builder schema should preserve all fields and metadata
1514                assert_eq!(get_all_field_names(&builder), all_fields);
1515                assert_eq!(builder.metadata, metadata);
1516                // Reader & batch schema should show only projected fields, and no metadata
1517                assert_eq!(get_all_field_names(&reader), expected_projected_names);
1518                assert_eq!(reader.metadata, HashMap::default());
1519                assert_eq!(get_all_field_names(&batch), expected_projected_names);
1520                assert_eq!(batch.metadata, HashMap::default());
1521            };
1522
1523            let builder =
1524                ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1525            let sync_builder_schema = builder.schema().clone();
1526            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices.clone());
1527            let mut reader = builder.with_projection(mask).build().unwrap();
1528            let sync_reader_schema = reader.schema();
1529            let batch = reader.next().unwrap().unwrap();
1530            let sync_batch_schema = batch.schema();
1531            assert_schemas(sync_builder_schema, sync_reader_schema, sync_batch_schema);
1532
1533            // asynchronous should be same
1534            let file = tokio::fs::File::from(file.try_clone().unwrap());
1535            let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
1536            let async_builder_schema = builder.schema().clone();
1537            let mask = ProjectionMask::leaves(builder.parquet_schema(), indices);
1538            let mut reader = builder.with_projection(mask).build().unwrap();
1539            let async_reader_schema = reader.schema().clone();
1540            let batch = reader.next().await.unwrap().unwrap();
1541            let async_batch_schema = batch.schema();
1542            assert_schemas(
1543                async_builder_schema,
1544                async_reader_schema,
1545                async_batch_schema,
1546            );
1547        }
1548    }
1549
1550    #[tokio::test]
1551    async fn test_nested_skip() {
1552        let schema = Arc::new(Schema::new(vec![
1553            Field::new("col_1", DataType::UInt64, false),
1554            Field::new_list("col_2", Field::new_list_field(DataType::Utf8, true), true),
1555        ]));
1556
1557        // Default writer properties
1558        let props = WriterProperties::builder()
1559            .set_data_page_row_count_limit(256)
1560            .set_write_batch_size(256)
1561            .set_max_row_group_row_count(Some(1024));
1562
1563        // Write data
1564        let mut file = tempfile().unwrap();
1565        let mut writer =
1566            ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();
1567
1568        let mut builder = ListBuilder::new(StringBuilder::new());
1569        for id in 0..1024 {
1570            match id % 3 {
1571                0 => builder.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1572                1 => builder.append_value([Some(format!("id_{id}"))]),
1573                _ => builder.append_null(),
1574            }
1575        }
1576        let refs = vec![
1577            Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
1578            Arc::new(builder.finish()) as ArrayRef,
1579        ];
1580
1581        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
1582        writer.write(&batch).unwrap();
1583        writer.close().unwrap();
1584
1585        let selections = [
1586            RowSelection::from(vec![
1587                RowSelector::skip(313),
1588                RowSelector::select(1),
1589                RowSelector::skip(709),
1590                RowSelector::select(1),
1591            ]),
1592            RowSelection::from(vec![
1593                RowSelector::skip(255),
1594                RowSelector::select(1),
1595                RowSelector::skip(767),
1596                RowSelector::select(1),
1597            ]),
1598            RowSelection::from(vec![
1599                RowSelector::select(255),
1600                RowSelector::skip(1),
1601                RowSelector::select(767),
1602                RowSelector::skip(1),
1603            ]),
1604            RowSelection::from(vec![
1605                RowSelector::skip(254),
1606                RowSelector::select(1),
1607                RowSelector::select(1),
1608                RowSelector::skip(767),
1609                RowSelector::select(1),
1610            ]),
1611        ];
1612
1613        for selection in selections {
1614            let expected = selection.row_count();
1615            // Read data
1616            let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
1617                tokio::fs::File::from_std(file.try_clone().unwrap()),
1618                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
1619            )
1620            .await
1621            .unwrap();
1622
1623            reader = reader.with_row_selection(selection);
1624
1625            let mut stream = reader.build().unwrap();
1626
1627            let mut total_rows = 0;
1628            while let Some(rb) = stream.next().await {
1629                let rb = rb.unwrap();
1630                total_rows += rb.num_rows();
1631            }
1632            assert_eq!(total_rows, expected);
1633        }
1634    }
1635
1636    #[tokio::test]
1637    #[allow(deprecated)]
1638    async fn empty_offset_index_doesnt_panic_in_read_row_group() {
1639        use tokio::fs::File;
1640        let testdata = arrow::util::test_util::parquet_test_data();
1641        let path = format!("{testdata}/alltypes_plain.parquet");
1642        let mut file = File::open(&path).await.unwrap();
1643        let file_size = file.metadata().await.unwrap().len();
1644        let mut metadata = ParquetMetaDataReader::new()
1645            .with_page_indexes(true)
1646            .load_and_finish(&mut file, file_size)
1647            .await
1648            .unwrap();
1649
1650        metadata.set_offset_index(Some(vec![]));
1651        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1652        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1653        let reader =
1654            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1655                .build()
1656                .unwrap();
1657
1658        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1659        assert_eq!(result.len(), 1);
1660    }
1661
1662    #[tokio::test]
1663    #[allow(deprecated)]
1664    async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
1665        use tokio::fs::File;
1666        let testdata = arrow::util::test_util::parquet_test_data();
1667        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1668        let mut file = File::open(&path).await.unwrap();
1669        let file_size = file.metadata().await.unwrap().len();
1670        let metadata = ParquetMetaDataReader::new()
1671            .with_page_indexes(true)
1672            .load_and_finish(&mut file, file_size)
1673            .await
1674            .unwrap();
1675
1676        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1677        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1678        let reader =
1679            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1680                .build()
1681                .unwrap();
1682
1683        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1684        assert_eq!(result.len(), 8);
1685    }
1686
1687    #[tokio::test]
1688    #[allow(deprecated)]
1689    async fn empty_offset_index_doesnt_panic_in_column_chunks() {
1690        use tempfile::TempDir;
1691        use tokio::fs::File;
1692        fn write_metadata_to_local_file(
1693            metadata: ParquetMetaData,
1694            file: impl AsRef<std::path::Path>,
1695        ) {
1696            use crate::file::metadata::ParquetMetaDataWriter;
1697            use std::fs::File;
1698            let file = File::create(file).unwrap();
1699            ParquetMetaDataWriter::new(file, &metadata)
1700                .finish()
1701                .unwrap()
1702        }
1703
1704        fn read_metadata_from_local_file(file: impl AsRef<std::path::Path>) -> ParquetMetaData {
1705            use std::fs::File;
1706            let file = File::open(file).unwrap();
1707            ParquetMetaDataReader::new()
1708                .with_page_indexes(true)
1709                .parse_and_finish(&file)
1710                .unwrap()
1711        }
1712
1713        let testdata = arrow::util::test_util::parquet_test_data();
1714        let path = format!("{testdata}/alltypes_plain.parquet");
1715        let mut file = File::open(&path).await.unwrap();
1716        let file_size = file.metadata().await.unwrap().len();
1717        let metadata = ParquetMetaDataReader::new()
1718            .with_page_indexes(true)
1719            .load_and_finish(&mut file, file_size)
1720            .await
1721            .unwrap();
1722
1723        let tempdir = TempDir::new().unwrap();
1724        let metadata_path = tempdir.path().join("thrift_metadata.dat");
1725        write_metadata_to_local_file(metadata, &metadata_path);
1726        let metadata = read_metadata_from_local_file(&metadata_path);
1727
1728        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1729        let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1730        let reader =
1731            ParquetRecordBatchStreamBuilder::new_with_metadata(file, arrow_reader_metadata)
1732                .build()
1733                .unwrap();
1734
1735        // Panics here
1736        let result = reader.try_collect::<Vec<_>>().await.unwrap();
1737        assert_eq!(result.len(), 1);
1738    }
1739
1740    #[tokio::test]
1741    async fn test_cached_array_reader_sparse_offset_error() {
1742        use futures::TryStreamExt;
1743
1744        use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
1745        use arrow_array::{BooleanArray, RecordBatch};
1746
1747        let testdata = arrow::util::test_util::parquet_test_data();
1748        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1749        let data = Bytes::from(std::fs::read(path).unwrap());
1750
1751        let async_reader = TestReader::new(data);
1752
1753        // Enable page index so the fetch logic loads only required pages
1754        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1755        let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
1756            .await
1757            .unwrap();
1758
1759        // Skip the first 22 rows (entire first Parquet page) and then select the
1760        // next 3 rows (22, 23, 24). This means the fetch step will not include
1761        // the first page starting at file offset 0.
1762        let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
1763
1764        // Trivial predicate on column 0 that always returns `true`. Using the
1765        // same column in both predicate and projection activates the caching
1766        // layer (Producer/Consumer pattern).
1767        let parquet_schema = builder.parquet_schema();
1768        let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
1769        let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
1770            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1771        });
1772        let filter = RowFilter::new(vec![Box::new(always_true)]);
1773
1774        // Build the stream with batch size 8 so the cache reads whole batches
1775        // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
1776        let stream = builder
1777            .with_batch_size(8)
1778            .with_projection(proj)
1779            .with_row_selection(selection)
1780            .with_row_filter(filter)
1781            .build()
1782            .unwrap();
1783
1784        // Collecting the stream should fail with the sparse column chunk offset
1785        // error we want to reproduce.
1786        let _result: Vec<_> = stream.try_collect().await.unwrap();
1787    }
1788
1789    #[tokio::test]
1790    async fn test_predicate_cache_disabled() {
1791        let k = Int32Array::from_iter_values(0..10);
1792        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
1793
1794        let mut buf = Vec::new();
1795        // both the page row limit and batch size are set to 1 to create one page per row
1796        let props = WriterProperties::builder()
1797            .set_data_page_row_count_limit(1)
1798            .set_write_batch_size(1)
1799            .set_max_row_group_row_count(Some(10))
1800            .set_write_page_header_statistics(true)
1801            .build();
1802        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
1803        writer.write(&data).unwrap();
1804        writer.close().unwrap();
1805
1806        let data = Bytes::from(buf);
1807        let metadata = ParquetMetaDataReader::new()
1808            .with_page_index_policy(PageIndexPolicy::Required)
1809            .parse_and_finish(&data)
1810            .unwrap();
1811        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
1812
1813        // the filter is not clone-able, so we use a lambda to simplify
1814        let build_filter = || {
1815            let scalar = Int32Array::from_iter_values([5]);
1816            let predicate = ArrowPredicateFn::new(
1817                ProjectionMask::leaves(&parquet_schema, vec![0]),
1818                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
1819            );
1820            RowFilter::new(vec![Box::new(predicate)])
1821        };
1822
1823        // select only one of the pages
1824        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
1825
1826        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
1827        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
1828
1829        // using the predicate cache (default)
1830        let reader_with_cache = TestReader::new(data.clone());
1831        let requests_with_cache = reader_with_cache.requests.clone();
1832        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1833            reader_with_cache,
1834            reader_metadata.clone(),
1835        )
1836        .with_batch_size(1000)
1837        .with_row_selection(selection.clone())
1838        .with_row_filter(build_filter())
1839        .build()
1840        .unwrap();
1841        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
1842
1843        // disabling the predicate cache
1844        let reader_without_cache = TestReader::new(data);
1845        let requests_without_cache = reader_without_cache.requests.clone();
1846        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
1847            reader_without_cache,
1848            reader_metadata,
1849        )
1850        .with_batch_size(1000)
1851        .with_row_selection(selection)
1852        .with_row_filter(build_filter())
1853        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
1854        .build()
1855        .unwrap();
1856        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
1857
1858        assert_eq!(batches_with_cache, batches_without_cache);
1859
1860        let requests_with_cache = requests_with_cache.lock().unwrap();
1861        let requests_without_cache = requests_without_cache.lock().unwrap();
1862
1863        // less requests will be made without the predicate cache
1864        assert_eq!(requests_with_cache.len(), 11);
1865        assert_eq!(requests_without_cache.len(), 2);
1866
1867        // less bytes will be retrieved without the predicate cache
1868        assert_eq!(
1869            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
1870            433
1871        );
1872        assert_eq!(
1873            requests_without_cache
1874                .iter()
1875                .map(|r| r.len())
1876                .sum::<usize>(),
1877            92
1878        );
1879    }
1880
1881    #[test]
1882    fn test_row_numbers_with_multiple_row_groups() {
1883        test_row_numbers_with_multiple_row_groups_helper(
1884            false,
1885            |path, selection, _row_filter, batch_size| {
1886                let runtime = tokio::runtime::Builder::new_current_thread()
1887                    .enable_all()
1888                    .build()
1889                    .expect("Could not create runtime");
1890                runtime.block_on(async move {
1891                    let file = tokio::fs::File::open(path).await.unwrap();
1892                    let row_number_field = Arc::new(
1893                        Field::new("row_number", DataType::Int64, false)
1894                            .with_extension_type(RowNumber),
1895                    );
1896                    let options = ArrowReaderOptions::new()
1897                        .with_virtual_columns(vec![row_number_field])
1898                        .unwrap();
1899                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
1900                        .await
1901                        .unwrap()
1902                        .with_row_selection(selection)
1903                        .with_batch_size(batch_size)
1904                        .build()
1905                        .expect("Could not create reader");
1906                    reader.try_collect::<Vec<_>>().await.unwrap()
1907                })
1908            },
1909        );
1910    }
1911
1912    #[test]
1913    fn test_row_numbers_with_multiple_row_groups_and_filter() {
1914        test_row_numbers_with_multiple_row_groups_helper(
1915            true,
1916            |path, selection, row_filter, batch_size| {
1917                let runtime = tokio::runtime::Builder::new_current_thread()
1918                    .enable_all()
1919                    .build()
1920                    .expect("Could not create runtime");
1921                runtime.block_on(async move {
1922                    let file = tokio::fs::File::open(path).await.unwrap();
1923                    let row_number_field = Arc::new(
1924                        Field::new("row_number", DataType::Int64, false)
1925                            .with_extension_type(RowNumber),
1926                    );
1927                    let options = ArrowReaderOptions::new()
1928                        .with_virtual_columns(vec![row_number_field])
1929                        .unwrap();
1930                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
1931                        .await
1932                        .unwrap()
1933                        .with_row_selection(selection)
1934                        .with_row_filter(row_filter.expect("No row filter"))
1935                        .with_batch_size(batch_size)
1936                        .build()
1937                        .expect("Could not create reader");
1938                    reader.try_collect::<Vec<_>>().await.unwrap()
1939                })
1940            },
1941        );
1942    }
1943
1944    #[tokio::test]
1945    async fn test_nested_lists() -> Result<()> {
1946        // Test case for https://github.com/apache/arrow-rs/issues/8657
1947        let list_inner_field = Arc::new(Field::new("item", DataType::Float32, true));
1948        let table_schema = Arc::new(Schema::new(vec![
1949            Field::new("id", DataType::Int32, false),
1950            Field::new("vector", DataType::List(list_inner_field.clone()), true),
1951        ]));
1952
1953        let mut list_builder =
1954            ListBuilder::new(Float32Builder::new()).with_field(list_inner_field.clone());
1955        list_builder.values().append_slice(&[10.0, 10.0, 10.0]);
1956        list_builder.append(true);
1957        list_builder.values().append_slice(&[20.0, 20.0, 20.0]);
1958        list_builder.append(true);
1959        list_builder.values().append_slice(&[30.0, 30.0, 30.0]);
1960        list_builder.append(true);
1961        list_builder.values().append_slice(&[40.0, 40.0, 40.0]);
1962        list_builder.append(true);
1963        let list_array = list_builder.finish();
1964
1965        let data = vec![RecordBatch::try_new(
1966            table_schema.clone(),
1967            vec![
1968                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
1969                Arc::new(list_array),
1970            ],
1971        )?];
1972
1973        let mut buffer = Vec::new();
1974        let mut writer = AsyncArrowWriter::try_new(&mut buffer, table_schema, None)?;
1975
1976        for batch in data {
1977            writer.write(&batch).await?;
1978        }
1979
1980        writer.close().await?;
1981
1982        let reader = TestReader::new(Bytes::from(buffer));
1983        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
1984
1985        let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
1986            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1987        });
1988
1989        let projection_mask = ProjectionMask::all();
1990
1991        let mut stream = builder
1992            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
1993            .with_projection(projection_mask)
1994            .build()?;
1995
1996        while let Some(batch) = stream.next().await {
1997            let _ = batch.unwrap(); // ensure there is no panic
1998        }
1999
2000        Ok(())
2001    }
2002}