// datafusion_datasource_parquet/reader.rs

use crate::ParquetFileMetrics;
use crate::metadata::DFParquetMetadata;
use bytes::Bytes;
use datafusion_datasource::PartitionedFile;
use datafusion_execution::cache::cache_manager::FileMetadata;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::FutureExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

/// Interface for creating the [`AsyncFileReader`]s used to read parquet files.
///
/// Custom implementations can, for example, prefetch data or cache decoded
/// parquet metadata across queries.
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
    /// Creates a reader for the parquet file described by `partitioned_file`.
    ///
    /// `metadata_size_hint` suggests how many trailing bytes to fetch when
    /// reading the footer; `metrics` receives per-file execution metrics.
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
}
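
// A hedged sketch (not part of the upstream module): custom implementations
// of `ParquetFileReaderFactory` can wrap another factory, e.g. to observe
// every file a scan opens. `LoggingReaderFactory` is illustrative only.
#[cfg(test)]
mod custom_factory_example {
    use super::*;

    #[derive(Debug)]
    #[allow(dead_code)]
    struct LoggingReaderFactory {
        inner: Arc<dyn ParquetFileReaderFactory>,
    }

    impl ParquetFileReaderFactory for LoggingReaderFactory {
        fn create_reader(
            &self,
            partition_index: usize,
            partitioned_file: PartitionedFile,
            metadata_size_hint: Option<usize>,
            metrics: &ExecutionPlanMetricsSet,
        ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
            // Log the file location, then delegate to the wrapped factory.
            println!("opening {}", partitioned_file.object_meta.location);
            self.inner.create_reader(
                partition_index,
                partitioned_file,
                metadata_size_hint,
                metrics,
            )
        }
    }
}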

/// Default [`ParquetFileReaderFactory`]: reads directly from an
/// [`ObjectStore`] with no additional caching.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
    /// Creates a new factory backed by `store`.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { store }
    }
}

/// An [`AsyncFileReader`] wrapping a [`ParquetObjectReader`] that records
/// bytes-scanned metrics for every read.
pub struct ParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    pub inner: ParquetObjectReader,
    pub partitioned_file: PartitionedFile,
}

impl AsyncFileReader for ParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // Record the number of bytes fetched before delegating to the inner
        // reader.
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        self.inner.get_metadata(options)
    }
}

impl Drop for ParquetFileReader {
    fn drop(&mut self) {
        // Finalize the scan efficiency ratio: bytes actually scanned relative
        // to the total size of the file.
        self.file_metrics
            .scan_efficiency_ratio
            .add_part(self.file_metrics.bytes_scanned.value());
        self.file_metrics
            .scan_efficiency_ratio
            .set_total(self.partitioned_file.object_meta.size as usize);
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);
        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        Ok(Box::new(ParquetFileReader {
            inner,
            file_metrics,
            partitioned_file,
        }))
    }
}
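
// A minimal usage sketch, assuming only the APIs visible in this module:
// constructing the default factory over an in-memory object store and
// creating a reader for one (hypothetical) file. `create_reader` performs no
// I/O, so the file does not need to exist in the store.
#[cfg(test)]
mod default_factory_example {
    use super::*;
    use object_store::memory::InMemory;

    #[test]
    fn creates_reader_without_io() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let factory = DefaultParquetFileReaderFactory::new(store);

        // Hypothetical file path and size, for illustration only.
        let file = PartitionedFile::new("data/example.parquet", 1024);
        let metrics = ExecutionPlanMetricsSet::new();

        let reader = factory
            .create_reader(0, file, Some(64 * 1024), &metrics)
            .expect("constructing a reader should not fail");

        // Dropping the reader records the scan efficiency ratio metric.
        drop(reader);
    }
}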

/// A [`ParquetFileReaderFactory`] that stores decoded parquet metadata in a
/// [`FileMetadataCache`], so repeated reads of the same file can skip
/// re-fetching and re-decoding the footer.
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
    metadata_cache: Arc<dyn FileMetadataCache>,
}

impl CachedParquetFileReaderFactory {
    /// Creates a new factory backed by `store` that caches metadata in
    /// `metadata_cache`.
    pub fn new(
        store: Arc<dyn ObjectStore>,
        metadata_cache: Arc<dyn FileMetadataCache>,
    ) -> Self {
        Self {
            store,
            metadata_cache,
        }
    }
}

impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);

        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        Ok(Box::new(CachedParquetFileReader::new(
            file_metrics,
            Arc::clone(&self.store),
            inner,
            partitioned_file,
            Arc::clone(&self.metadata_cache),
            metadata_size_hint,
        )))
    }
}
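
// A hedged sketch of wiring the cached factory. A concrete
// `FileMetadataCache` is normally supplied by DataFusion's runtime cache
// manager; here it is taken as a parameter so the example makes no
// assumptions about any specific cache implementation.
#[cfg(test)]
mod cached_factory_example {
    use super::*;
    use object_store::memory::InMemory;

    #[allow(dead_code)]
    fn build_cached_factory(
        metadata_cache: Arc<dyn FileMetadataCache>,
    ) -> CachedParquetFileReaderFactory {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        CachedParquetFileReaderFactory::new(store, metadata_cache)
    }
}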

/// An [`AsyncFileReader`] that, in addition to recording bytes-scanned
/// metrics, serves parquet metadata from a [`FileMetadataCache`].
pub struct CachedParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    store: Arc<dyn ObjectStore>,
    pub inner: ParquetObjectReader,
    partitioned_file: PartitionedFile,
    metadata_cache: Arc<dyn FileMetadataCache>,
    metadata_size_hint: Option<usize>,
}

impl CachedParquetFileReader {
    pub fn new(
        file_metrics: ParquetFileMetrics,
        store: Arc<dyn ObjectStore>,
        inner: ParquetObjectReader,
        partitioned_file: PartitionedFile,
        metadata_cache: Arc<dyn FileMetadataCache>,
        metadata_size_hint: Option<usize>,
    ) -> Self {
        Self {
            file_metrics,
            store,
            inner,
            partitioned_file,
            metadata_cache,
            metadata_size_hint,
        }
    }
}

impl AsyncFileReader for CachedParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        #[cfg_attr(not(feature = "parquet_encryption"), expect(unused_variables))]
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let object_meta = self.partitioned_file.object_meta.clone();
        let metadata_cache = Arc::clone(&self.metadata_cache);

        async move {
            #[cfg(feature = "parquet_encryption")]
            let file_decryption_properties = options
                .and_then(|o| o.file_decryption_properties())
                .map(Arc::clone);

            #[cfg(not(feature = "parquet_encryption"))]
            let file_decryption_properties = None;

            // Consult the metadata cache (populating it on a miss) rather
            // than always decoding the footer from the object store.
            DFParquetMetadata::new(&self.store, &object_meta)
                .with_decryption_properties(file_decryption_properties)
                .with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
                .with_metadata_size_hint(self.metadata_size_hint)
                .fetch_metadata()
                .await
                .map_err(|e| {
                    parquet::errors::ParquetError::General(format!(
                        "Failed to fetch metadata for file {}: {e}",
                        object_meta.location,
                    ))
                })
        }
        .boxed()
    }
}

impl Drop for CachedParquetFileReader {
    fn drop(&mut self) {
        // Finalize the scan efficiency ratio, as for `ParquetFileReader`.
        self.file_metrics
            .scan_efficiency_ratio
            .add_part(self.file_metrics.bytes_scanned.value());
        self.file_metrics
            .scan_efficiency_ratio
            .set_total(self.partitioned_file.object_meta.size as usize);
    }
}

/// Newtype wrapper so [`ParquetMetaData`] can be stored in a
/// [`FileMetadataCache`] as a [`FileMetadata`] trait object.
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    /// Returns the wrapped parquet metadata.
    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        // Report whether the page index (column and offset indexes) was
        // decoded along with the footer.
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}
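
// A small sketch of how a cache entry is inspected, assuming an
// `Arc<ParquetMetaData>` obtained elsewhere (e.g. from a previous footer
// read); constructing one from scratch is out of scope here.
#[cfg(test)]
mod metadata_entry_example {
    use super::*;

    #[allow(dead_code)]
    fn describe(metadata: Arc<ParquetMetaData>) -> (usize, HashMap<String, String>) {
        let entry = CachedParquetMetaData::new(metadata);
        // `memory_size` drives cache accounting; `extra_info` reports whether
        // the page index was loaded.
        (entry.memory_size(), entry.extra_info())
    }
}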