1use polars_core::prelude::*;
2#[cfg(feature = "serde")]
3use serde::{Deserialize, Serialize};
4
/// An expression that IO code can evaluate against a [`DataFrame`].
///
/// Implementors have to be `Send + Sync`.
pub trait PhysicalIoExpr: Send + Sync {
    /// Evaluates the expression on `df` and returns the resulting [`Series`].
    ///
    /// `apply_predicate` in this file uses the result as a row mask and
    /// therefore expects it to be boolean.
    fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series>;

    /// Adds column names to `live_columns`.
    ///
    /// NOTE(review): presumably these are the columns the expression reads;
    /// confirm against the implementors.
    fn collect_live_columns(&self, live_columns: &mut PlIndexSet<PlSmallStr>);

    /// Returns this expression as a [`StatsEvaluator`], if it provides one.
    ///
    /// The default implementation returns `None`.
    fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
        None
    }
}
21
/// Makes a read/skip decision for a batch based on its [`BatchStats`].
pub trait StatsEvaluator {
    /// Returns whether the batch described by `stats` should be read.
    ///
    /// NOTE(review): presumably `Ok(false)` allows the reader to skip the
    /// batch entirely; confirm against the call sites.
    fn should_read(&self, stats: &BatchStats) -> PolarsResult<bool>;
}
25
/// Filters `df` in place, keeping only the rows selected by `predicate`.
///
/// Nothing happens when `predicate` is `None` or when `df` has no columns.
/// With `parallel` set the mask is applied through `DataFrame::filter`,
/// otherwise through `DataFrame::_filter_seq`.
///
/// # Errors
///
/// Returns an error if evaluating the predicate fails, if the evaluated
/// result is not a boolean [`Series`], or if applying the filter fails.
#[cfg(any(feature = "parquet", feature = "ipc"))]
pub fn apply_predicate(
    df: &mut DataFrame,
    predicate: Option<&dyn PhysicalIoExpr>,
    parallel: bool,
) -> PolarsResult<()> {
    let predicate = match predicate {
        Some(predicate) if !df.get_columns().is_empty() => predicate,
        _ => return Ok(()),
    };

    let evaluated = predicate.evaluate_io(df)?;
    // A non-boolean result is reported to the caller through the `Result`
    // this function already returns, rather than aborting with a panic.
    let mask = evaluated.bool()?;

    let filtered = if parallel {
        df.filter(mask)?
    } else {
        df._filter_seq(mask)?
    };
    *df = filtered;
    Ok(())
}
44
/// Statistics recorded for a single column.
///
/// Serializable and deserializable when the `serde` feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnStats {
    /// Field of the column; provides its name and dtype.
    field: Field,
    /// Null-count state, if recorded. `ColumnStats::null_count` sums it.
    null_count: Option<Series>,
    /// Minimum-value state, if recorded.
    min_value: Option<Series>,
    /// Maximum-value state, if recorded.
    max_value: Option<Series>,
}
60
61impl ColumnStats {
62 pub fn new(
64 field: Field,
65 null_count: Option<Series>,
66 min_value: Option<Series>,
67 max_value: Option<Series>,
68 ) -> Self {
69 Self {
70 field,
71 null_count,
72 min_value,
73 max_value,
74 }
75 }
76
77 pub fn from_field(field: Field) -> Self {
79 Self {
80 field,
81 null_count: None,
82 min_value: None,
83 max_value: None,
84 }
85 }
86
87 pub fn from_column_literal(s: Series) -> Self {
89 debug_assert_eq!(s.len(), 1);
90 Self {
91 field: s.field().into_owned(),
92 null_count: None,
93 min_value: Some(s.clone()),
94 max_value: Some(s),
95 }
96 }
97
98 pub fn field_name(&self) -> &PlSmallStr {
99 self.field.name()
100 }
101
102 pub fn dtype(&self) -> &DataType {
104 self.field.dtype()
105 }
106
107 pub fn get_null_count_state(&self) -> Option<&Series> {
109 self.null_count.as_ref()
110 }
111
112 pub fn get_min_state(&self) -> Option<&Series> {
114 self.min_value.as_ref()
115 }
116
117 pub fn get_max_state(&self) -> Option<&Series> {
119 self.max_value.as_ref()
120 }
121
122 pub fn null_count(&self) -> Option<usize> {
124 match self.dtype() {
125 #[cfg(feature = "dtype-struct")]
126 DataType::Struct(_) => None,
127 _ => {
128 let s = self.get_null_count_state()?;
129 if s.null_count() != s.len() {
131 s.sum().ok()
132 } else {
133 None
134 }
135 },
136 }
137 }
138
139 pub fn to_min_max(&self) -> Option<Series> {
141 let min_val = self.get_min_state()?;
142 let max_val = self.get_max_state()?;
143 let dtype = self.dtype();
144
145 if !use_min_max(dtype) {
146 return None;
147 }
148
149 let mut min_max_values = min_val.clone();
150 min_max_values.append(max_val).unwrap();
151 if min_max_values.null_count() > 0 {
152 None
153 } else {
154 Some(min_max_values)
155 }
156 }
157
158 pub fn to_min(&self) -> Option<&Series> {
162 let min_val = self.min_value.as_ref()?;
164 let dtype = min_val.dtype();
165
166 if !use_min_max(dtype) || min_val.len() != 1 {
167 return None;
168 }
169
170 if min_val.null_count() > 0 {
171 None
172 } else {
173 Some(min_val)
174 }
175 }
176
177 pub fn to_max(&self) -> Option<&Series> {
181 let max_val = self.max_value.as_ref()?;
183 let dtype = max_val.dtype();
184
185 if !use_min_max(dtype) || max_val.len() != 1 {
186 return None;
187 }
188
189 if max_val.null_count() > 0 {
190 None
191 } else {
192 Some(max_val)
193 }
194 }
195}
196
197fn use_min_max(dtype: &DataType) -> bool {
199 dtype.is_primitive_numeric()
200 || dtype.is_temporal()
201 || matches!(
202 dtype,
203 DataType::String | DataType::Binary | DataType::Boolean
204 )
205}
206
/// Statistics for a batch of data: a schema, per-column statistics and an
/// optional row count.
///
/// Serializable and deserializable when the `serde` feature is enabled.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct BatchStats {
    /// Schema used by `get_stats` to translate a column name into an index
    /// into `stats`.
    schema: SchemaRef,
    /// Per-column statistics.
    stats: Vec<ColumnStats>,
    /// Number of rows, if known.
    num_rows: Option<usize>,
}
216
217impl Default for BatchStats {
218 fn default() -> Self {
219 Self {
220 schema: Arc::new(Schema::default()),
221 stats: Vec::new(),
222 num_rows: None,
223 }
224 }
225}
226
227impl BatchStats {
228 pub fn new(schema: SchemaRef, stats: Vec<ColumnStats>, num_rows: Option<usize>) -> Self {
232 Self {
233 schema,
234 stats,
235 num_rows,
236 }
237 }
238
239 pub fn schema(&self) -> &SchemaRef {
241 &self.schema
242 }
243
244 pub fn column_stats(&self) -> &[ColumnStats] {
246 self.stats.as_ref()
247 }
248
249 pub fn get_stats(&self, column: &str) -> PolarsResult<&ColumnStats> {
253 self.schema.try_index_of(column).map(|i| &self.stats[i])
254 }
255
256 pub fn num_rows(&self) -> Option<usize> {
260 self.num_rows
261 }
262
263 pub fn with_schema(&mut self, schema: SchemaRef) {
264 self.schema = schema;
265 }
266
267 pub fn take_indices(&mut self, indices: &[usize]) {
268 self.stats = indices.iter().map(|&i| self.stats[i].clone()).collect();
269 }
270}