datafusion_datasource_parquet/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
//! low level control of parquet file readers

use crate::ParquetFileMetrics;
use crate::metadata::DFParquetMetadata;
use bytes::Bytes;
use datafusion_datasource::PartitionedFile;
use datafusion_execution::cache::cache_manager::FileMetadata;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::FutureExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

/// Interface for reading parquet files.
///
/// The combined implementations of [`ParquetFileReaderFactory`] and
/// [`AsyncFileReader`] can be used to provide custom data access operations
/// such as pre-cached metadata, I/O coalescing, etc.
///
/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
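///
/// # Example
///
/// A minimal sketch of a custom factory that simply delegates to a
/// [`ParquetObjectReader`]; a real implementation might return pre-cached
/// metadata or coalesce I/O instead. `HypotheticalFactory` and its `store`
/// field are illustrative names, not part of this crate:
///
/// ```ignore
/// #[derive(Debug)]
/// struct HypotheticalFactory {
///     store: Arc<dyn ObjectStore>,
/// }
///
/// impl ParquetFileReaderFactory for HypotheticalFactory {
///     fn create_reader(
///         &self,
///         _partition_index: usize,
///         partitioned_file: PartitionedFile,
///         _metadata_size_hint: Option<usize>,
///         _metrics: &ExecutionPlanMetricsSet,
///     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
///         // Build a reader directly over the object store; custom logic
///         // (e.g. handing back cached metadata) would go here instead.
///         let reader = ParquetObjectReader::new(
///             Arc::clone(&self.store),
///             partitioned_file.object_meta.location.clone(),
///         )
///         .with_file_size(partitioned_file.object_meta.size);
///         Ok(Box::new(reader))
///     }
/// }
/// ```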
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
    /// Provides an `AsyncFileReader` for reading data from the specified
    /// parquet file
    ///
    /// # Notes
    ///
    /// If the resulting [`AsyncFileReader`] returns `ParquetMetaData` without
    /// page index information, the reader will load it on demand. Thus it is
    /// important to ensure that the returned `ParquetMetaData` contains the
    /// necessary information if you wish to avoid a subsequent I/O.
    ///
    /// # Arguments
    /// * partition_index - Index of the partition (for reporting metrics)
    /// * partitioned_file - The file to be read
    /// * metadata_size_hint - If specified, the first I/O reads this many bytes from the footer
    /// * metrics - Execution metrics
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
}

/// Default implementation of [`ParquetFileReaderFactory`]
///
/// This implementation:
/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
/// 2. Reads the footer and page metadata on demand.
/// 3. Does not cache metadata or coalesce I/O operations.
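///
/// # Example
///
/// A small usage sketch; the in-memory [`ObjectStore`] is used purely for
/// illustration, any store works:
///
/// ```ignore
/// use std::sync::Arc;
/// use object_store::memory::InMemory;
///
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let factory = DefaultParquetFileReaderFactory::new(store);
/// // `factory` can now be supplied wherever a `ParquetFileReaderFactory`
/// // is accepted, e.g. when configuring a parquet scan.
/// ```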
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
    /// Create a new `DefaultParquetFileReaderFactory`.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { store }
    }
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage.
///
/// This implementation uses the [`ParquetObjectReader`] to read data from the
/// object store on demand, as required, tracking the number of bytes read.
///
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
pub struct ParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    pub inner: ParquetObjectReader,
    pub partitioned_file: PartitionedFile,
}

impl AsyncFileReader for ParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        self.inner.get_metadata(options)
    }
}

impl Drop for ParquetFileReader {
    fn drop(&mut self) {
        self.file_metrics
            .scan_efficiency_ratio
            .add_part(self.file_metrics.bytes_scanned.value());
        // Multiple readers may run against the same file, so use `set_total`
        // (rather than adding) to avoid counting the file size multiple times
        self.file_metrics
            .scan_efficiency_ratio
            .set_total(self.partitioned_file.object_meta.size as usize);
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);
        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        Ok(Box::new(ParquetFileReader {
            inner,
            file_metrics,
            partitioned_file,
        }))
    }
}

/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`].
/// This reader always loads the entire metadata (including the page index, unless the file is
/// encrypted), even if not required by the current query, to ensure it is available for
/// queries that need it.
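///
/// # Example
///
/// A construction sketch; it assumes an `Arc<dyn FileMetadataCache>` is
/// already available (for example, from the runtime environment's cache
/// manager):
///
/// ```ignore
/// use std::sync::Arc;
/// use object_store::memory::InMemory;
///
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// // `metadata_cache: Arc<dyn FileMetadataCache>` is assumed to come
/// // from elsewhere, e.g. the session's cache manager.
/// let factory = CachedParquetFileReaderFactory::new(store, metadata_cache);
/// // Readers created by this factory consult the cache before reading
/// // the parquet footer from object storage.
/// ```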
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
    metadata_cache: Arc<dyn FileMetadataCache>,
}

impl CachedParquetFileReaderFactory {
    /// Create a new `CachedParquetFileReaderFactory` backed by the given
    /// store and metadata cache.
    pub fn new(
        store: Arc<dyn ObjectStore>,
        metadata_cache: Arc<dyn FileMetadataCache>,
    ) -> Self {
        Self {
            store,
            metadata_cache,
        }
    }
}

impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);

        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        Ok(Box::new(CachedParquetFileReader::new(
            file_metrics,
            Arc::clone(&self.store),
            inner,
            partitioned_file,
            Arc::clone(&self.metadata_cache),
            metadata_size_hint,
        )))
    }
}

/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
/// updates the cache.
pub struct CachedParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    store: Arc<dyn ObjectStore>,
    pub inner: ParquetObjectReader,
    partitioned_file: PartitionedFile,
    metadata_cache: Arc<dyn FileMetadataCache>,
    metadata_size_hint: Option<usize>,
}

impl CachedParquetFileReader {
    /// Create a new `CachedParquetFileReader` that reads through `inner` and
    /// caches metadata in `metadata_cache`.
    pub fn new(
        file_metrics: ParquetFileMetrics,
        store: Arc<dyn ObjectStore>,
        inner: ParquetObjectReader,
        partitioned_file: PartitionedFile,
        metadata_cache: Arc<dyn FileMetadataCache>,
        metadata_size_hint: Option<usize>,
    ) -> Self {
        Self {
            file_metrics,
            store,
            inner,
            partitioned_file,
            metadata_cache,
            metadata_size_hint,
        }
    }
}

impl AsyncFileReader for CachedParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        #[cfg_attr(not(feature = "parquet_encryption"), expect(unused_variables))]
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let object_meta = self.partitioned_file.object_meta.clone();
        let metadata_cache = Arc::clone(&self.metadata_cache);

        async move {
            #[cfg(feature = "parquet_encryption")]
            let file_decryption_properties = options
                .and_then(|o| o.file_decryption_properties())
                .map(Arc::clone);

            #[cfg(not(feature = "parquet_encryption"))]
            let file_decryption_properties = None;

            DFParquetMetadata::new(&self.store, &object_meta)
                .with_decryption_properties(file_decryption_properties)
                .with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
                .with_metadata_size_hint(self.metadata_size_hint)
                .fetch_metadata()
                .await
                .map_err(|e| {
                    parquet::errors::ParquetError::General(format!(
                        "Failed to fetch metadata for file {}: {e}",
                        object_meta.location,
                    ))
                })
        }
        .boxed()
    }
}

impl Drop for CachedParquetFileReader {
    fn drop(&mut self) {
        self.file_metrics
            .scan_efficiency_ratio
            .add_part(self.file_metrics.bytes_scanned.value());
        // Multiple readers may run against the same file, so use `set_total`
        // (rather than adding) to avoid counting the file size multiple times
        self.file_metrics
            .scan_efficiency_ratio
            .set_total(self.partitioned_file.object_meta.size as usize);
    }
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
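///
/// # Example
///
/// A sketch of recovering the [`ParquetMetaData`] from a cache entry via
/// [`FileMetadata::as_any`]; the `entry` variable is assumed to come from a
/// [`FileMetadataCache`] lookup:
///
/// ```ignore
/// // entry: Arc<dyn FileMetadata>, retrieved from the cache
/// if let Some(cached) = entry.as_any().downcast_ref::<CachedParquetMetaData>() {
///     let metadata: &Arc<ParquetMetaData> = cached.parquet_metadata();
///     // use `metadata` without re-reading the file footer
/// }
/// ```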
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    /// Wrap the given [`ParquetMetaData`] so it can be stored in a
    /// [`FileMetadataCache`].
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    /// Returns the wrapped [`ParquetMetaData`].
    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}