polars_io/parquet/read/read_impl.rs

use std::borrow::Cow;

use arrow::bitmap::Bitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{POOL, config};
use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
use rayon::prelude::*;

use super::mmap::mmap_columns;
use super::utils::materialize_empty_df;
use super::{ParallelStrategy, mmap};
use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::utils::slice::split_slice_at_file;

#[cfg(debug_assertions)]
// Ensure we get the proper polars types from schema inference.
// This saves unneeded casts.
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        // These should all be cast to the BinaryView / Utf8View variants
        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),

        // These should be cast to Float32
        D::Float16 => unreachable!(),

        // This should have been converted to a LargeList
        D::List(_) => unreachable!(),

        // This should have been converted to a LargeList(Struct(_))
        D::Map(_, _) => unreachable!(),

        // Recursive checks
        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
        D::Extension(ext) => assert_dtypes(&ext.inner),
        D::LargeList(inner) => assert_dtypes(&inner.dtype),
        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),

        _ => {},
    }
}

fn should_copy_sortedness(dtype: &DataType) -> bool {
    // @NOTE: For now, we are a bit conservative with this.
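    // (Added commentary) Only integer types are trusted here: for e.g. floats,
    // Parquet's SortingColumn ordering of NaNs does not necessarily match
    // Polars' notion of sortedness.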
    use DataType as D;

    matches!(
        dtype,
        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
    )
}

pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
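    // (Added commentary) Only the first sorting column is consulted: secondary
    // sort columns are only ordered within runs of equal primary keys, so
    // their flags cannot be applied to the whole column chunk.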
    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
        return;
    };
    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
        return;
    }
    if config::verbose() {
        eprintln!(
            "Parquet preserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
            series.name()
        );
    }

    series.set_sorted_flag(*is_sorted);
}

pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
    let capacity = md.sorting_columns().map_or(0, |s| s.len());
    let mut sorting_map = Vec::with_capacity(capacity);

    if let Some(sorting_columns) = md.sorting_columns() {
        for sorting in sorting_columns {
            sorting_map.push((
                sorting.column_idx as usize,
                if sorting.descending {
                    IsSorted::Descending
                } else {
                    IsSorted::Ascending
                },
            ))
        }
    }

    sorting_map
}

fn column_idx_to_series(
    column_i: usize,
    // The metadata belonging to this column
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
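    // (Added commentary) The returned bitmap is the predicate-true mask, i.e.
    // which rows of this chunk survived the optional `filter`.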
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, array))?;

    Ok((series, pred_true_mask))
}

#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // If we are only interested in the row_index, we take a little special path here.
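    // (Added commentary) The all-null placeholder column only exists to give
    // the frame the right height so a row index can be attached; the final
    // `select` then drops it again, leaving just the index column.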
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}

#[allow(clippy::too_many_arguments)]
// Might parallelize over columns.
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;
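    // (Added commentary) Worked example: with slice = (100, 50) and 80-row row
    // groups, slice_end = 150. The first group (file rows 0..80) overlaps
    // nothing; the second (rows 80..160) yields rg_slice = (20, 50): start at
    // local offset 20 within the group and read 50 rows.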

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        *previous_row_count = previous_row_count
            .checked_add(current_row_count)
            .ok_or_else(|| {
                polars_err!(
                    ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                    consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                    or set 'streaming'"
                )
            })?;
        dfs.push(df);

        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}

#[allow(clippy::too_many_arguments)]
// Parallelizes over row groups.
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    // Compute the limits per row group and the row count offsets.
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    // rows_scanned is the number of rows that have been scanned so far when checking for overlap with the slice.
    // rows_read is the number of rows found to overlap with the slice, and thus the number of rows that will be
    // read into a dataframe.
    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        // In the case of async reads, we need to account for the fact that row_group_start may be greater than
        // zero due to earlier processing.
        // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                // Test that we don't read the parquet file if this env var is set.
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}

#[allow(clippy::too_many_arguments)]
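/// Reads the requested `pre_slice` of rows from a Parquet source into a single
/// `DataFrame`, dispatching to the row-group- or column-parallel paths above.
///
/// A minimal, hedged call sketch (added illustration); the metadata/schema
/// setup shown here is an assumption, not part of this file:
///
/// ```ignore
/// let mut file = std::fs::File::open("data.parquet")?;
/// let md = polars_parquet::read::read_metadata(&mut file)?;
/// let schema = std::sync::Arc::new(polars_parquet::read::infer_schema(&md)?);
/// let n_rows = md.num_rows;
/// let df = read_parquet(
///     file,
///     (0, n_rows),            // read everything
///     None,                   // no projection: all columns
///     &schema,
///     Some(std::sync::Arc::new(md)),
///     ParallelStrategy::Auto,
///     None,                   // no row index
///     None,                   // no hive partition columns
/// )?;
/// ```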
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();
    // If there are multiple row groups and categorical data, we need a string
    // cache; we keep it alive until the end of the function.
    let _sc = if n_row_groups > 1 {
        #[cfg(feature = "dtype-categorical")]
        {
            Some(polars_core::StringCacheHolder::hold())
        }
        #[cfg(not(feature = "dtype-categorical"))]
        {
            Some(0u8)
        }
    } else {
        None
    };

    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

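    // (Added commentary) Heuristic: parallelize over whichever dimension has
    // more independent work items; with a single projected column, per-column
    // parallelism buys nothing, so fall back to a sequential read.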
    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
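    // SAFETY (added commentary): the transmute only erases the borrow of
    // `reader`; `to_memslice` is expected to either copy the bytes or keep the
    // underlying mapping alive, so the resulting store should not dangle.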
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        row_index.clone(),
        parallel,
        &materialized_projection,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}

pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    // @GB: I did quite some analysis on this.
    //
    // Pre-filtering and post-filtering can each be faster in certain scenarios.
    //
    // - Pre-filtering is faster when there is some amount of clustering or
    //   sorting involved, or when the number of values selected is small.
    // - Post-filtering is faster when the predicate selects somewhat random
    //   elements throughout the row group.
    //
    // The following is a heuristic value to estimate which one is faster.
    // Essentially, it counts how many times the reader needs to switch between
    // skipping items and collecting items, and compares that against the
    // number of values it will collect.
    //
    // Closer to 0: pre-filtering is probably better.
    // Closer to 1: post-filtering is probably better.
    (num_edges / rg_len).clamp(0.0, 1.0)
}
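
// A minimal sanity-check sketch of the heuristic above (added illustration,
// not part of the original file); it relies on `Bitmap` implementing
// `FromIterator<bool>`, as used elsewhere in the crate.
#[cfg(test)]
mod prefilter_cost_tests {
    use super::*;

    #[test]
    fn clustered_masks_prefer_prefiltering() {
        // 50 set bits followed by 50 unset bits: a single 1 -> 0 edge, so the
        // cost is 1 / 100 = 0.01 and pre-filtering looks attractive.
        let clustered: Bitmap = (0..100).map(|i| i < 50).collect();
        assert!(calc_prefilter_cost(&clustered) <= 0.01);

        // Alternating bits: 99 edges over 100 values, cost 0.99, so
        // post-filtering looks better.
        let scattered: Bitmap = (0..100).map(|i| i % 2 == 0).collect();
        assert!(calc_prefilter_cost(&scattered) > 0.9);
    }
}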

#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
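    /// Reads the masking strategy from `POLARS_PQ_PREFILTERED_MASK`; e.g.
    /// `POLARS_PQ_PREFILTERED_MASK=pre` forces pre-filtering regardless of the
    /// estimated cost. When unset, this defaults to `Auto`.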
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                // Prefiltering is expensive for nested types, so we never
                // prefilter those and use a strict cut-off for flat types.
                let is_nested = dtype.is_nested();

                // We empirically selected these numbers.
                !is_nested && prefilter_cost <= 0.01
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}