polars_io/predicates.rs

use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

pub trait PhysicalIoExpr: Send + Sync {
    /// Takes a [`DataFrame`] and produces a boolean [`Series`] that serves
    /// as a predicate mask.
    fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series>;

    /// Collects the columns used in the expression, i.e. the live columns,
    /// into `live_columns`.
    fn collect_live_columns(&self, live_columns: &mut PlIndexSet<PlSmallStr>);

    /// Returns a [`StatsEvaluator`] that can inspect statistics and decide
    /// whether a file should be read (`true`) or skipped (`false`).
    fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
        None
    }
}

pub trait StatsEvaluator {
    fn should_read(&self, stats: &BatchStats) -> PolarsResult<bool>;
}
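
// Illustrative sketch (not part of the original file): a trivial predicate
// that keeps every row and never prunes a file based on statistics. The
// `KeepAll` type is assumed purely for demonstration.
//
//     struct KeepAll;
//
//     impl PhysicalIoExpr for KeepAll {
//         fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series> {
//             // One `true` per row; callers use this as a filter mask.
//             Ok(BooleanChunked::full("keep".into(), true, df.height()).into_series())
//         }
//
//         fn collect_live_columns(&self, _live_columns: &mut PlIndexSet<PlSmallStr>) {
//             // A constant predicate references no columns.
//         }
//
//         fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
//             Some(self)
//         }
//     }
//
//     impl StatsEvaluator for KeepAll {
//         fn should_read(&self, _stats: &BatchStats) -> PolarsResult<bool> {
//             // No statistics can rule out a constant `true` predicate.
//             Ok(true)
//         }
//     }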

/// Applies an optional predicate to `df`, filtering its rows in place.
#[cfg(any(feature = "parquet", feature = "ipc"))]
pub fn apply_predicate(
    df: &mut DataFrame,
    predicate: Option<&dyn PhysicalIoExpr>,
    parallel: bool,
) -> PolarsResult<()> {
    if let (Some(predicate), false) = (&predicate, df.get_columns().is_empty()) {
        let s = predicate.evaluate_io(df)?;
        let mask = s.bool().expect("filter predicate was not of type boolean");

        if parallel {
            *df = df.filter(mask)?;
        } else {
            *df = df._filter_seq(mask)?;
        }
    }
    Ok(())
}
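
// Usage sketch (assumes the hypothetical `KeepAll` predicate above and the
// `parquet` or `ipc` feature): filter a freshly decoded DataFrame in place.
//
//     let mut df = df!("a" => [1i32, 2, 3])?;
//     apply_predicate(&mut df, Some(&KeepAll), true)?;
//     assert_eq!(df.height(), 3);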

/// Statistics of the values in a column.
///
/// The following statistics are tracked for each row group:
/// - Null count
/// - Minimum value
/// - Maximum value
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnStats {
    field: Field,
    // Each Series contains the stats for each row group.
    null_count: Option<Series>,
    min_value: Option<Series>,
    max_value: Option<Series>,
}

impl ColumnStats {
    /// Constructs a new [`ColumnStats`].
    pub fn new(
        field: Field,
        null_count: Option<Series>,
        min_value: Option<Series>,
        max_value: Option<Series>,
    ) -> Self {
        Self {
            field,
            null_count,
            min_value,
            max_value,
        }
    }

    /// Constructs a new [`ColumnStats`] with only the [`Field`] information and no statistics.
    pub fn from_field(field: Field) -> Self {
        Self {
            field,
            null_count: None,
            min_value: None,
            max_value: None,
        }
    }

    /// Constructs a new [`ColumnStats`] from a single-value [`Series`].
    pub fn from_column_literal(s: Series) -> Self {
        debug_assert_eq!(s.len(), 1);
        Self {
            field: s.field().into_owned(),
            null_count: None,
            min_value: Some(s.clone()),
            max_value: Some(s),
        }
    }

    pub fn field_name(&self) -> &PlSmallStr {
        self.field.name()
    }

    /// Returns the [`DataType`] of the column.
    pub fn dtype(&self) -> &DataType {
        self.field.dtype()
    }

    /// Returns the null count of each row group of the column.
    pub fn get_null_count_state(&self) -> Option<&Series> {
        self.null_count.as_ref()
    }

    /// Returns the minimum value of each row group of the column.
    pub fn get_min_state(&self) -> Option<&Series> {
        self.min_value.as_ref()
    }

    /// Returns the maximum value of each row group of the column.
    pub fn get_max_state(&self) -> Option<&Series> {
        self.max_value.as_ref()
    }

    /// Returns the null count of the column.
    pub fn null_count(&self) -> Option<usize> {
        match self.dtype() {
            #[cfg(feature = "dtype-struct")]
            DataType::Struct(_) => None,
            _ => {
                let s = self.get_null_count_state()?;
                // if all null, there are no statistics.
                if s.null_count() != s.len() {
                    s.sum().ok()
                } else {
                    None
                }
            },
        }
    }

    /// Returns the minimum and maximum values of the column as a single [`Series`].
    pub fn to_min_max(&self) -> Option<Series> {
        let min_val = self.get_min_state()?;
        let max_val = self.get_max_state()?;
        let dtype = self.dtype();

        if !use_min_max(dtype) {
            return None;
        }

        let mut min_max_values = min_val.clone();
        min_max_values.append(max_val).unwrap();
        if min_max_values.null_count() > 0 {
            None
        } else {
            Some(min_max_values)
        }
    }

    /// Returns the minimum value of the column as a single-value [`Series`].
    ///
    /// Returns `None` if no minimum value is available.
    pub fn to_min(&self) -> Option<&Series> {
        // @scalar-opt
        let min_val = self.min_value.as_ref()?;
        let dtype = min_val.dtype();

        if !use_min_max(dtype) || min_val.len() != 1 {
            return None;
        }

        if min_val.null_count() > 0 {
            None
        } else {
            Some(min_val)
        }
    }

    /// Returns the maximum value of the column as a single-value [`Series`].
    ///
    /// Returns `None` if no maximum value is available.
    pub fn to_max(&self) -> Option<&Series> {
        // @scalar-opt
        let max_val = self.max_value.as_ref()?;
        let dtype = max_val.dtype();

        if !use_min_max(dtype) || max_val.len() != 1 {
            return None;
        }

        if max_val.null_count() > 0 {
            None
        } else {
            Some(max_val)
        }
    }
}
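
// Illustrative sketch (values assumed): per-row-group statistics for an
// `Int32` column with two row groups, one of which contains a null.
//
//     let stats = ColumnStats::new(
//         Field::new("a".into(), DataType::Int32),
//         Some(Series::new("null_count".into(), [0u32, 1])),
//         Some(Series::new("min".into(), [1i32, 10])),
//         Some(Series::new("max".into(), [5i32, 42])),
//     );
//     assert_eq!(stats.null_count(), Some(1));
//     // `to_min_max` appends the per-row-group maxima to the minima.
//     assert_eq!(stats.to_min_max().map(|s| s.len()), Some(4));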

/// Returns whether the [`DataType`] supports minimum/maximum operations.
fn use_min_max(dtype: &DataType) -> bool {
    dtype.is_primitive_numeric()
        || dtype.is_temporal()
        || matches!(
            dtype,
            DataType::String | DataType::Binary | DataType::Boolean
        )
}

/// A collection of column stats with a known schema.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct BatchStats {
    schema: SchemaRef,
    stats: Vec<ColumnStats>,
    // This might not be available, as when pruning hive partitions.
    num_rows: Option<usize>,
}

impl Default for BatchStats {
    fn default() -> Self {
        Self {
            schema: Arc::new(Schema::default()),
            stats: Vec::new(),
            num_rows: None,
        }
    }
}

impl BatchStats {
    /// Constructs a new [`BatchStats`].
    ///
    /// The `stats` should match the order of the `schema`.
    pub fn new(schema: SchemaRef, stats: Vec<ColumnStats>, num_rows: Option<usize>) -> Self {
        Self {
            schema,
            stats,
            num_rows,
        }
    }

    /// Returns the [`Schema`] of the batch.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns the [`ColumnStats`] of all columns in the batch, if known.
    pub fn column_stats(&self) -> &[ColumnStats] {
        self.stats.as_ref()
    }

    /// Returns the [`ColumnStats`] of a single column in the batch.
    ///
    /// Returns an `Err` if the given column is not present in the schema.
    pub fn get_stats(&self, column: &str) -> PolarsResult<&ColumnStats> {
        self.schema.try_index_of(column).map(|i| &self.stats[i])
    }

    /// Returns the number of rows in the batch.
    ///
    /// Returns `None` if the number of rows is unknown.
    pub fn num_rows(&self) -> Option<usize> {
        self.num_rows
    }

    /// Replaces the schema of the batch.
    pub fn with_schema(&mut self, schema: SchemaRef) {
        self.schema = schema;
    }

    /// Keeps only the [`ColumnStats`] at the given indices, in that order.
    pub fn take_indices(&mut self, indices: &[usize]) {
        self.stats = indices.iter().map(|&i| self.stats[i].clone()).collect();
    }
}
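
// Pruning sketch (assumes a `predicate: &dyn PhysicalIoExpr` and the
// `BatchStats` of a row group in scope): statistics can prove that no row
// matches, letting the reader skip the row group entirely.
//
//     if let Some(evaluator) = predicate.as_stats_evaluator() {
//         if !evaluator.should_read(&batch_stats)? {
//             // No row in this row group can satisfy the predicate; skip it.
//             return Ok(());
//         }
//     }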