use std::borrow::Cow;

use arrow::bitmap::Bitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{POOL, config};
use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
use rayon::prelude::*;

use super::mmap::mmap_columns;
use super::utils::materialize_empty_df;
use super::{ParallelStrategy, mmap};
use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::utils::slice::split_slice_at_file;

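/// Debug-only sanity check on the Arrow dtypes handed to the Parquet reader. Variants that are
/// expected to have been converted earlier in schema handling (plain string/binary types,
/// `Float16`, `List`, `Map`) hit an `unreachable!`; dictionary, extension and nested types are
/// checked recursively.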
#[cfg(debug_assertions)]
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),

        D::Float16 => unreachable!(),

        D::List(_) => unreachable!(),

        D::Map(_, _) => unreachable!(),

        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
        D::Extension(ext) => assert_dtypes(&ext.inner),
        D::LargeList(inner) => assert_dtypes(&inner.dtype),
        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),

        _ => {},
    }
}

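/// Whether the sortedness reported by a Parquet `SortingColumn` may be copied onto a column of
/// this dtype. Conservatively limited to plain integer types.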
fn should_copy_sortedness(dtype: &DataType) -> bool {
    use DataType as D;

    matches!(
        dtype,
        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
    )
}

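/// Sets the sorted flag on `series` if the row group's primary `SortingColumn` refers to
/// `col_idx` and the dtype is eligible (see [`should_copy_sortedness`]).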
pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
        return;
    };
    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
        return;
    }
    if config::verbose() {
        eprintln!(
            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
            series.name()
        );
    }

    series.set_sorted_flag(*is_sorted);
}

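/// Builds a `(column index, sortedness)` mapping from a row group's `SortingColumn` metadata,
/// used by [`try_set_sorted_flag`] to propagate sortedness onto decoded columns.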
pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
    let capacity = md.sorting_columns().map_or(0, |s| s.len());
    let mut sorting_map = Vec::with_capacity(capacity);

    if let Some(sorting_columns) = md.sorting_columns() {
        for sorting in sorting_columns {
            sorting_map.push((
                sorting.column_idx as usize,
                if sorting.descending {
                    IsSorted::Descending
                } else {
                    IsSorted::Ascending
                },
            ))
        }
    }

    sorting_map
}

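/// Deserializes all column chunks belonging to the projected field at `column_i` from the
/// memory-mapped store into a [`Series`], optionally applying a [`Filter`]. Also returns the
/// predicate-true mask reported by the deserializer.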
fn column_idx_to_series(
    column_i: usize,
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, array))?;

    Ok((series, pred_true_mask))
}

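/// Decodes the row groups in `row_group_start..row_group_end` into one [`DataFrame`] per row
/// group, dispatching on `parallel`: parallelism over columns within a row group, or over row
/// groups. The special case of an empty projection with a row index only materializes the
/// row-index column for the requested slice.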
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}

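/// Decodes row groups sequentially, parallelizing over the projected columns of each row group
/// when `parallel == ParallelStrategy::Columns`. Stops early once the requested slice has been
/// fully read.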
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        *previous_row_count = previous_row_count
            .checked_add(current_row_count)
            .ok_or_else(|| {
                polars_err!(
                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                    consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                    or set 'streaming'"
                )
            })?;
        dfs.push(df);

        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}

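/// Decodes row groups in parallel on the rayon thread pool (one task per row group), with the
/// projected columns of each row group decoded sequentially inside the task. Row groups that do
/// not intersect the requested slice are skipped.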
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}

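/// Reads a Parquet file into a [`DataFrame`], honoring the requested slice, projection, row index
/// and hive partition columns. Missing metadata is read from `reader`, and
/// `ParallelStrategy::Auto` is resolved from the number of row groups, the projection width and
/// the available threads.
///
/// A minimal usage sketch, not compiled as a doc-test; the argument values are illustrative and
/// `reader_schema` is assumed to have been derived from the file's metadata beforehand:
///
/// ```ignore
/// let file = std::fs::File::open("data.parquet")?;
/// let df = read_parquet(
///     file,
///     (0, usize::MAX),        // pre_slice: read all rows
///     None,                   // projection: all columns
///     &reader_schema,         // ArrowSchemaRef describing the file
///     None,                   // metadata: read it from `file`
///     ParallelStrategy::Auto,
///     None,                   // row_index
///     None,                   // hive_partition_columns
/// )?;
/// ```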
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    // Keep a hold on the global string cache while more than one row group is decoded, so that
    // Categorical columns decoded across row groups remain compatible.
    let _sc = if n_row_groups > 1 {
        #[cfg(feature = "dtype-categorical")]
        {
            Some(polars_core::StringCacheHolder::hold())
        }
        #[cfg(not(feature = "dtype-categorical"))]
        {
            Some(0u8)
        }
    } else {
        None
    };

    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        row_index.clone(),
        parallel,
        &materialized_projection,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}

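/// Estimates the cost of prefiltering from a predicate mask: the number of set/unset edges in the
/// bitmap divided by its length, clamped to `[0.0, 1.0]`. Low values indicate long runs of kept or
/// skipped rows, which favor applying the mask during decoding.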
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    (num_edges / rg_len).clamp(0.0, 1.0)
}

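/// Controls when a predicate mask is applied during a Parquet scan: while decoding (`Pre`), after
/// decoding (`Post`), or decided per column from the prefilter cost and dtype (`Auto`). Can be
/// overridden via the `POLARS_PQ_PREFILTERED_MASK` environment variable (see
/// [`Self::init_from_env`]).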
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

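    /// Whether the mask should be applied during decoding for a column of the given dtype.
    /// `Auto` prefilters only non-nested dtypes whose estimated cost is at most `0.01`.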
    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                let is_nested = dtype.is_nested();

                !is_nested && prefilter_cost <= 0.01
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}