datafusion_common/
stats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides data structures to represent statistics
19
20use std::fmt::{self, Debug, Display};
21
22use crate::{Result, ScalarValue};
23
24use crate::error::_plan_err;
25use arrow::datatypes::{DataType, Schema, SchemaRef};
26
/// A statistical value together with how much that value can be trusted.
///
/// `Precision` is used to propagate information about the precision of
/// statistical values.
#[derive(Clone, Copy, Default, Eq, PartialEq)]
pub enum Precision<T>
where
    T: Debug + Clone + PartialEq + Eq + PartialOrd,
{
    /// The value is known exactly.
    Exact(T),
    /// The exact value is not known, but it is likely close to this one.
    Inexact(T),
    /// Nothing is known about the value.
    #[default]
    Absent,
}
39
40impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
41    /// If we have some value (exact or inexact), it returns that value.
42    /// Otherwise, it returns `None`.
43    pub fn get_value(&self) -> Option<&T> {
44        match self {
45            Precision::Exact(value) | Precision::Inexact(value) => Some(value),
46            Precision::Absent => None,
47        }
48    }
49
50    /// Transform the value in this [`Precision`] object, if one exists, using
51    /// the given function. Preserves the exactness state.
52    pub fn map<U, F>(self, f: F) -> Precision<U>
53    where
54        F: Fn(T) -> U,
55        U: Debug + Clone + PartialEq + Eq + PartialOrd,
56    {
57        match self {
58            Precision::Exact(val) => Precision::Exact(f(val)),
59            Precision::Inexact(val) => Precision::Inexact(f(val)),
60            _ => Precision::<U>::Absent,
61        }
62    }
63
64    /// Returns `Some(true)` if we have an exact value, `Some(false)` if we
65    /// have an inexact value, and `None` if there is no value.
66    pub fn is_exact(&self) -> Option<bool> {
67        match self {
68            Precision::Exact(_) => Some(true),
69            Precision::Inexact(_) => Some(false),
70            _ => None,
71        }
72    }
73
74    /// Returns the maximum of two (possibly inexact) values, conservatively
75    /// propagating exactness information. If one of the input values is
76    /// [`Precision::Absent`], the result is `Absent` too.
77    pub fn max(&self, other: &Precision<T>) -> Precision<T> {
78        match (self, other) {
79            (Precision::Exact(a), Precision::Exact(b)) => {
80                Precision::Exact(if a >= b { a.clone() } else { b.clone() })
81            }
82            (Precision::Inexact(a), Precision::Exact(b))
83            | (Precision::Exact(a), Precision::Inexact(b))
84            | (Precision::Inexact(a), Precision::Inexact(b)) => {
85                Precision::Inexact(if a >= b { a.clone() } else { b.clone() })
86            }
87            (_, _) => Precision::Absent,
88        }
89    }
90
91    /// Returns the minimum of two (possibly inexact) values, conservatively
92    /// propagating exactness information. If one of the input values is
93    /// [`Precision::Absent`], the result is `Absent` too.
94    pub fn min(&self, other: &Precision<T>) -> Precision<T> {
95        match (self, other) {
96            (Precision::Exact(a), Precision::Exact(b)) => {
97                Precision::Exact(if a >= b { b.clone() } else { a.clone() })
98            }
99            (Precision::Inexact(a), Precision::Exact(b))
100            | (Precision::Exact(a), Precision::Inexact(b))
101            | (Precision::Inexact(a), Precision::Inexact(b)) => {
102                Precision::Inexact(if a >= b { b.clone() } else { a.clone() })
103            }
104            (_, _) => Precision::Absent,
105        }
106    }
107
108    /// Demotes the precision state from exact to inexact (if present).
109    pub fn to_inexact(self) -> Self {
110        match self {
111            Precision::Exact(value) => Precision::Inexact(value),
112            _ => self,
113        }
114    }
115}
116
117impl Precision<usize> {
118    /// Calculates the sum of two (possibly inexact) [`usize`] values,
119    /// conservatively propagating exactness information. If one of the input
120    /// values is [`Precision::Absent`], the result is `Absent` too.
121    pub fn add(&self, other: &Precision<usize>) -> Precision<usize> {
122        match (self, other) {
123            (Precision::Exact(a), Precision::Exact(b)) => Precision::Exact(a + b),
124            (Precision::Inexact(a), Precision::Exact(b))
125            | (Precision::Exact(a), Precision::Inexact(b))
126            | (Precision::Inexact(a), Precision::Inexact(b)) => Precision::Inexact(a + b),
127            (_, _) => Precision::Absent,
128        }
129    }
130
131    /// Calculates the difference of two (possibly inexact) [`usize`] values,
132    /// conservatively propagating exactness information. If one of the input
133    /// values is [`Precision::Absent`], the result is `Absent` too.
134    pub fn sub(&self, other: &Precision<usize>) -> Precision<usize> {
135        match (self, other) {
136            (Precision::Exact(a), Precision::Exact(b)) => Precision::Exact(a - b),
137            (Precision::Inexact(a), Precision::Exact(b))
138            | (Precision::Exact(a), Precision::Inexact(b))
139            | (Precision::Inexact(a), Precision::Inexact(b)) => Precision::Inexact(a - b),
140            (_, _) => Precision::Absent,
141        }
142    }
143
144    /// Calculates the multiplication of two (possibly inexact) [`usize`] values,
145    /// conservatively propagating exactness information. If one of the input
146    /// values is [`Precision::Absent`], the result is `Absent` too.
147    pub fn multiply(&self, other: &Precision<usize>) -> Precision<usize> {
148        match (self, other) {
149            (Precision::Exact(a), Precision::Exact(b)) => Precision::Exact(a * b),
150            (Precision::Inexact(a), Precision::Exact(b))
151            | (Precision::Exact(a), Precision::Inexact(b))
152            | (Precision::Inexact(a), Precision::Inexact(b)) => Precision::Inexact(a * b),
153            (_, _) => Precision::Absent,
154        }
155    }
156
157    /// Return the estimate of applying a filter with estimated selectivity
158    /// `selectivity` to this Precision. A selectivity of `1.0` means that all
159    /// rows are selected. A selectivity of `0.5` means half the rows are
160    /// selected. Will always return inexact statistics.
161    pub fn with_estimated_selectivity(self, selectivity: f64) -> Self {
162        self.map(|v| ((v as f64 * selectivity).ceil()) as usize)
163            .to_inexact()
164    }
165}
166
167impl Precision<ScalarValue> {
168    /// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
169    /// conservatively propagating exactness information. If one of the input
170    /// values is [`Precision::Absent`], the result is `Absent` too.
171    pub fn add(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
172        match (self, other) {
173            (Precision::Exact(a), Precision::Exact(b)) => {
174                a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent)
175            }
176            (Precision::Inexact(a), Precision::Exact(b))
177            | (Precision::Exact(a), Precision::Inexact(b))
178            | (Precision::Inexact(a), Precision::Inexact(b)) => a
179                .add(b)
180                .map(Precision::Inexact)
181                .unwrap_or(Precision::Absent),
182            (_, _) => Precision::Absent,
183        }
184    }
185
186    /// Calculates the difference of two (possibly inexact) [`ScalarValue`] values,
187    /// conservatively propagating exactness information. If one of the input
188    /// values is [`Precision::Absent`], the result is `Absent` too.
189    pub fn sub(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
190        match (self, other) {
191            (Precision::Exact(a), Precision::Exact(b)) => {
192                a.sub(b).map(Precision::Exact).unwrap_or(Precision::Absent)
193            }
194            (Precision::Inexact(a), Precision::Exact(b))
195            | (Precision::Exact(a), Precision::Inexact(b))
196            | (Precision::Inexact(a), Precision::Inexact(b)) => a
197                .sub(b)
198                .map(Precision::Inexact)
199                .unwrap_or(Precision::Absent),
200            (_, _) => Precision::Absent,
201        }
202    }
203
204    /// Calculates the multiplication of two (possibly inexact) [`ScalarValue`] values,
205    /// conservatively propagating exactness information. If one of the input
206    /// values is [`Precision::Absent`], the result is `Absent` too.
207    pub fn multiply(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
208        match (self, other) {
209            (Precision::Exact(a), Precision::Exact(b)) => a
210                .mul_checked(b)
211                .map(Precision::Exact)
212                .unwrap_or(Precision::Absent),
213            (Precision::Inexact(a), Precision::Exact(b))
214            | (Precision::Exact(a), Precision::Inexact(b))
215            | (Precision::Inexact(a), Precision::Inexact(b)) => a
216                .mul_checked(b)
217                .map(Precision::Inexact)
218                .unwrap_or(Precision::Absent),
219            (_, _) => Precision::Absent,
220        }
221    }
222
223    /// Casts the value to the given data type, propagating exactness information.
224    pub fn cast_to(&self, data_type: &DataType) -> Result<Precision<ScalarValue>> {
225        match self {
226            Precision::Exact(value) => value.cast_to(data_type).map(Precision::Exact),
227            Precision::Inexact(value) => value.cast_to(data_type).map(Precision::Inexact),
228            Precision::Absent => Ok(Precision::Absent),
229        }
230    }
231}
232
233impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Debug for Precision<T> {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        match self {
236            Precision::Exact(inner) => write!(f, "Exact({:?})", inner),
237            Precision::Inexact(inner) => write!(f, "Inexact({:?})", inner),
238            Precision::Absent => write!(f, "Absent"),
239        }
240    }
241}
242
243impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Display for Precision<T> {
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        match self {
246            Precision::Exact(inner) => write!(f, "Exact({:?})", inner),
247            Precision::Inexact(inner) => write!(f, "Inexact({:?})", inner),
248            Precision::Absent => write!(f, "Absent"),
249        }
250    }
251}
252
253impl From<Precision<usize>> for Precision<ScalarValue> {
254    fn from(value: Precision<usize>) -> Self {
255        match value {
256            Precision::Exact(v) => Precision::Exact(ScalarValue::UInt64(Some(v as u64))),
257            Precision::Inexact(v) => {
258                Precision::Inexact(ScalarValue::UInt64(Some(v as u64)))
259            }
260            Precision::Absent => Precision::Absent,
261        }
262    }
263}
264
265/// Statistics for a relation
266/// Fields are optional and can be inexact because the sources
267/// sometimes provide approximate estimates for performance reasons
268/// and the transformations output are not always predictable.
269#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct Statistics {
271    /// The number of table rows.
272    pub num_rows: Precision<usize>,
273    /// Total bytes of the table rows.
274    pub total_byte_size: Precision<usize>,
275    /// Statistics on a column level.
276    ///
277    /// It must contains a [`ColumnStatistics`] for each field in the schema of
278    /// the table to which the [`Statistics`] refer.
279    pub column_statistics: Vec<ColumnStatistics>,
280}
281
282impl Default for Statistics {
283    /// Returns a new [`Statistics`] instance with all fields set to unknown
284    /// and no columns.
285    fn default() -> Self {
286        Self {
287            num_rows: Precision::Absent,
288            total_byte_size: Precision::Absent,
289            column_statistics: vec![],
290        }
291    }
292}
293
294impl Statistics {
295    /// Returns a [`Statistics`] instance for the given schema by assigning
296    /// unknown statistics to each column in the schema.
297    pub fn new_unknown(schema: &Schema) -> Self {
298        Self {
299            num_rows: Precision::Absent,
300            total_byte_size: Precision::Absent,
301            column_statistics: Statistics::unknown_column(schema),
302        }
303    }
304
305    /// Returns an unbounded `ColumnStatistics` for each field in the schema.
306    pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
307        schema
308            .fields()
309            .iter()
310            .map(|_| ColumnStatistics::new_unknown())
311            .collect()
312    }
313
314    /// Set the number of rows
315    pub fn with_num_rows(mut self, num_rows: Precision<usize>) -> Self {
316        self.num_rows = num_rows;
317        self
318    }
319
320    /// Set the total size, in bytes
321    pub fn with_total_byte_size(mut self, total_byte_size: Precision<usize>) -> Self {
322        self.total_byte_size = total_byte_size;
323        self
324    }
325
326    /// Add a column to the column statistics
327    pub fn add_column_statistics(mut self, column_stats: ColumnStatistics) -> Self {
328        self.column_statistics.push(column_stats);
329        self
330    }
331
332    /// If the exactness of a [`Statistics`] instance is lost, this function relaxes
333    /// the exactness of all information by converting them [`Precision::Inexact`].
334    pub fn to_inexact(mut self) -> Self {
335        self.num_rows = self.num_rows.to_inexact();
336        self.total_byte_size = self.total_byte_size.to_inexact();
337        self.column_statistics = self
338            .column_statistics
339            .into_iter()
340            .map(|s| s.to_inexact())
341            .collect();
342        self
343    }
344
345    /// Project the statistics to the given column indices.
346    ///
347    /// For example, if we had statistics for columns `{"a", "b", "c"}`,
348    /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
349    /// "b"}`.
350    pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
351        let Some(projection) = projection else {
352            return self;
353        };
354
355        enum Slot {
356            /// The column is taken and put into the specified statistics location
357            Taken(usize),
358            /// The original columns is present
359            Present(ColumnStatistics),
360        }
361
362        // Convert to Vec<Slot> so we can avoid copying the statistics
363        let mut columns: Vec<_> = std::mem::take(&mut self.column_statistics)
364            .into_iter()
365            .map(Slot::Present)
366            .collect();
367
368        for idx in projection {
369            let next_idx = self.column_statistics.len();
370            let slot = std::mem::replace(
371                columns.get_mut(*idx).expect("projection out of bounds"),
372                Slot::Taken(next_idx),
373            );
374            match slot {
375                // The column was there, so just move it
376                Slot::Present(col) => self.column_statistics.push(col),
377                // The column was taken, so copy from the previous location
378                Slot::Taken(prev_idx) => self
379                    .column_statistics
380                    .push(self.column_statistics[prev_idx].clone()),
381            }
382        }
383
384        self
385    }
386
387    /// Calculates the statistics after applying `fetch` and `skip` operations.
388    ///
389    /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
390    /// parameter to compute global statistics in a multi-partition setting.
391    pub fn with_fetch(
392        mut self,
393        schema: SchemaRef,
394        fetch: Option<usize>,
395        skip: usize,
396        n_partitions: usize,
397    ) -> Result<Self> {
398        let fetch_val = fetch.unwrap_or(usize::MAX);
399
400        self.num_rows = match self {
401            Statistics {
402                num_rows: Precision::Exact(nr),
403                ..
404            }
405            | Statistics {
406                num_rows: Precision::Inexact(nr),
407                ..
408            } => {
409                // Here, the inexact case gives us an upper bound on the number of rows.
410                if nr <= skip {
411                    // All input data will be skipped:
412                    Precision::Exact(0)
413                } else if nr <= fetch_val && skip == 0 {
414                    // If the input does not reach the `fetch` globally, and `skip`
415                    // is zero (meaning the input and output are identical), return
416                    // input stats as is.
417                    // TODO: Can input stats still be used, but adjusted, when `skip`
418                    //       is non-zero?
419                    return Ok(self);
420                } else if nr - skip <= fetch_val {
421                    // After `skip` input rows are skipped, the remaining rows are
422                    // less than or equal to the `fetch` values, so `num_rows` must
423                    // equal the remaining rows.
424                    check_num_rows(
425                        (nr - skip).checked_mul(n_partitions),
426                        // We know that we have an estimate for the number of rows:
427                        self.num_rows.is_exact().unwrap(),
428                    )
429                } else {
430                    // At this point we know that we were given a `fetch` value
431                    // as the `None` case would go into the branch above. Since
432                    // the input has more rows than `fetch + skip`, the number
433                    // of rows will be the `fetch`, but we won't be able to
434                    // predict the other statistics.
435                    check_num_rows(
436                        fetch_val.checked_mul(n_partitions),
437                        // We know that we have an estimate for the number of rows:
438                        self.num_rows.is_exact().unwrap(),
439                    )
440                }
441            }
442            Statistics {
443                num_rows: Precision::Absent,
444                ..
445            } => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
446        };
447        self.column_statistics = Statistics::unknown_column(&schema);
448        self.total_byte_size = Precision::Absent;
449        Ok(self)
450    }
451
452    /// Summarize zero or more statistics into a single `Statistics` instance.
453    ///
454    /// Returns an error if the statistics do not match the specified schemas.
455    pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
456    where
457        I: IntoIterator<Item = &'a Statistics>,
458    {
459        let mut items = items.into_iter();
460
461        let Some(init) = items.next() else {
462            return Ok(Statistics::new_unknown(schema));
463        };
464        items.try_fold(init.clone(), |acc: Statistics, item_stats: &Statistics| {
465            acc.try_merge(item_stats)
466        })
467    }
468
469    /// Merge this Statistics value with another Statistics value.
470    ///
471    /// Returns an error if the statistics do not match (different schemas).
472    ///
473    /// # Example
474    /// ```
475    /// # use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
476    /// # use arrow::datatypes::{Field, Schema, DataType};
477    /// # use datafusion_common::stats::Precision;
478    /// let stats1 = Statistics::default()
479    ///   .with_num_rows(Precision::Exact(1))
480    ///   .with_total_byte_size(Precision::Exact(2))
481    ///   .add_column_statistics(ColumnStatistics::new_unknown()
482    ///      .with_null_count(Precision::Exact(3))
483    ///      .with_min_value(Precision::Exact(ScalarValue::from(4)))
484    ///      .with_max_value(Precision::Exact(ScalarValue::from(5)))
485    ///   );
486    ///
487    /// let stats2 = Statistics::default()
488    ///   .with_num_rows(Precision::Exact(10))
489    ///   .with_total_byte_size(Precision::Inexact(20))
490    ///   .add_column_statistics(ColumnStatistics::new_unknown()
491    ///       // absent null count
492    ///      .with_min_value(Precision::Exact(ScalarValue::from(40)))
493    ///      .with_max_value(Precision::Exact(ScalarValue::from(50)))
494    ///   );
495    ///
496    /// let merged_stats = stats1.try_merge(&stats2).unwrap();
497    /// let expected_stats = Statistics::default()
498    ///   .with_num_rows(Precision::Exact(11))
499    ///   .with_total_byte_size(Precision::Inexact(22)) // inexact in stats2 --> inexact
500    ///   .add_column_statistics(
501    ///     ColumnStatistics::new_unknown()
502    ///       .with_null_count(Precision::Absent) // missing from stats2 --> absent
503    ///       .with_min_value(Precision::Exact(ScalarValue::from(4)))
504    ///       .with_max_value(Precision::Exact(ScalarValue::from(50)))
505    ///   );
506    ///
507    /// assert_eq!(merged_stats, expected_stats)
508    /// ```
509    pub fn try_merge(self, other: &Statistics) -> Result<Self> {
510        let Self {
511            mut num_rows,
512            mut total_byte_size,
513            mut column_statistics,
514        } = self;
515
516        // Accumulate statistics for subsequent items
517        num_rows = num_rows.add(&other.num_rows);
518        total_byte_size = total_byte_size.add(&other.total_byte_size);
519
520        if column_statistics.len() != other.column_statistics.len() {
521            return _plan_err!(
522                "Cannot merge statistics with different number of columns: {} vs {}",
523                column_statistics.len(),
524                other.column_statistics.len()
525            );
526        }
527
528        for (item_col_stats, col_stats) in other
529            .column_statistics
530            .iter()
531            .zip(column_statistics.iter_mut())
532        {
533            col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);
534            col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
535            col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
536            col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value);
537        }
538
539        Ok(Statistics {
540            num_rows,
541            total_byte_size,
542            column_statistics,
543        })
544    }
545}
546
547/// Creates an estimate of the number of rows in the output using the given
548/// optional value and exactness flag.
549fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
550    if let Some(value) = value {
551        if is_exact {
552            Precision::Exact(value)
553        } else {
554            // If the input stats are inexact, so are the output stats.
555            Precision::Inexact(value)
556        }
557    } else {
558        // If the estimate is not available (e.g. due to an overflow), we can
559        // not produce a reliable estimate.
560        Precision::Absent
561    }
562}
563
564impl Display for Statistics {
565    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
566        // string of column statistics
567        let column_stats = self
568            .column_statistics
569            .iter()
570            .enumerate()
571            .map(|(i, cs)| {
572                let s = format!("(Col[{}]:", i);
573                let s = if cs.min_value != Precision::Absent {
574                    format!("{} Min={}", s, cs.min_value)
575                } else {
576                    s
577                };
578                let s = if cs.max_value != Precision::Absent {
579                    format!("{} Max={}", s, cs.max_value)
580                } else {
581                    s
582                };
583                let s = if cs.sum_value != Precision::Absent {
584                    format!("{} Sum={}", s, cs.sum_value)
585                } else {
586                    s
587                };
588                let s = if cs.null_count != Precision::Absent {
589                    format!("{} Null={}", s, cs.null_count)
590                } else {
591                    s
592                };
593                let s = if cs.distinct_count != Precision::Absent {
594                    format!("{} Distinct={}", s, cs.distinct_count)
595                } else {
596                    s
597                };
598
599                s + ")"
600            })
601            .collect::<Vec<_>>()
602            .join(",");
603
604        write!(
605            f,
606            "Rows={}, Bytes={}, [{}]",
607            self.num_rows, self.total_byte_size, column_stats
608        )?;
609
610        Ok(())
611    }
612}
613
614/// Statistics for a column within a relation
615#[derive(Clone, Debug, PartialEq, Eq, Default)]
616pub struct ColumnStatistics {
617    /// Number of null values on column
618    pub null_count: Precision<usize>,
619    /// Maximum value of column
620    pub max_value: Precision<ScalarValue>,
621    /// Minimum value of column
622    pub min_value: Precision<ScalarValue>,
623    /// Sum value of a column
624    pub sum_value: Precision<ScalarValue>,
625    /// Number of distinct values
626    pub distinct_count: Precision<usize>,
627}
628
629impl ColumnStatistics {
630    /// Column contains a single non null value (e.g constant).
631    pub fn is_singleton(&self) -> bool {
632        match (&self.min_value, &self.max_value) {
633            // Min and max values are the same and not infinity.
634            (Precision::Exact(min), Precision::Exact(max)) => {
635                !min.is_null() && !max.is_null() && (min == max)
636            }
637            (_, _) => false,
638        }
639    }
640
641    /// Returns a [`ColumnStatistics`] instance having all [`Precision::Absent`] parameters.
642    pub fn new_unknown() -> Self {
643        Self {
644            null_count: Precision::Absent,
645            max_value: Precision::Absent,
646            min_value: Precision::Absent,
647            sum_value: Precision::Absent,
648            distinct_count: Precision::Absent,
649        }
650    }
651
652    /// Set the null count
653    pub fn with_null_count(mut self, null_count: Precision<usize>) -> Self {
654        self.null_count = null_count;
655        self
656    }
657
658    /// Set the max value
659    pub fn with_max_value(mut self, max_value: Precision<ScalarValue>) -> Self {
660        self.max_value = max_value;
661        self
662    }
663
664    /// Set the min value
665    pub fn with_min_value(mut self, min_value: Precision<ScalarValue>) -> Self {
666        self.min_value = min_value;
667        self
668    }
669
670    /// Set the sum value
671    pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self {
672        self.sum_value = sum_value;
673        self
674    }
675
676    /// Set the distinct count
677    pub fn with_distinct_count(mut self, distinct_count: Precision<usize>) -> Self {
678        self.distinct_count = distinct_count;
679        self
680    }
681
682    /// If the exactness of a [`ColumnStatistics`] instance is lost, this
683    /// function relaxes the exactness of all information by converting them
684    /// [`Precision::Inexact`].
685    pub fn to_inexact(mut self) -> Self {
686        self.null_count = self.null_count.to_inexact();
687        self.max_value = self.max_value.to_inexact();
688        self.min_value = self.min_value.to_inexact();
689        self.sum_value = self.sum_value.to_inexact();
690        self.distinct_count = self.distinct_count.to_inexact();
691        self
692    }
693}
694
695#[cfg(test)]
696mod tests {
697    use super::*;
698    use crate::assert_contains;
699    use arrow::datatypes::Field;
700    use std::sync::Arc;
701
702    #[test]
703    fn test_get_value() {
704        let exact_precision = Precision::Exact(42);
705        let inexact_precision = Precision::Inexact(23);
706        let absent_precision = Precision::<i32>::Absent;
707
708        assert_eq!(*exact_precision.get_value().unwrap(), 42);
709        assert_eq!(*inexact_precision.get_value().unwrap(), 23);
710        assert_eq!(absent_precision.get_value(), None);
711    }
712
713    #[test]
714    fn test_map() {
715        let exact_precision = Precision::Exact(42);
716        let inexact_precision = Precision::Inexact(23);
717        let absent_precision = Precision::Absent;
718
719        let squared = |x| x * x;
720
721        assert_eq!(exact_precision.map(squared), Precision::Exact(1764));
722        assert_eq!(inexact_precision.map(squared), Precision::Inexact(529));
723        assert_eq!(absent_precision.map(squared), Precision::Absent);
724    }
725
726    #[test]
727    fn test_is_exact() {
728        let exact_precision = Precision::Exact(42);
729        let inexact_precision = Precision::Inexact(23);
730        let absent_precision = Precision::<i32>::Absent;
731
732        assert_eq!(exact_precision.is_exact(), Some(true));
733        assert_eq!(inexact_precision.is_exact(), Some(false));
734        assert_eq!(absent_precision.is_exact(), None);
735    }
736
737    #[test]
738    fn test_max() {
739        let precision1 = Precision::Exact(42);
740        let precision2 = Precision::Inexact(23);
741        let precision3 = Precision::Exact(30);
742        let absent_precision = Precision::Absent;
743
744        assert_eq!(precision1.max(&precision2), Precision::Inexact(42));
745        assert_eq!(precision1.max(&precision3), Precision::Exact(42));
746        assert_eq!(precision2.max(&precision3), Precision::Inexact(30));
747        assert_eq!(precision1.max(&absent_precision), Precision::Absent);
748    }
749
750    #[test]
751    fn test_min() {
752        let precision1 = Precision::Exact(42);
753        let precision2 = Precision::Inexact(23);
754        let precision3 = Precision::Exact(30);
755        let absent_precision = Precision::Absent;
756
757        assert_eq!(precision1.min(&precision2), Precision::Inexact(23));
758        assert_eq!(precision1.min(&precision3), Precision::Exact(30));
759        assert_eq!(precision2.min(&precision3), Precision::Inexact(23));
760        assert_eq!(precision1.min(&absent_precision), Precision::Absent);
761    }
762
763    #[test]
764    fn test_to_inexact() {
765        let exact_precision = Precision::Exact(42);
766        let inexact_precision = Precision::Inexact(42);
767        let absent_precision = Precision::<i32>::Absent;
768
769        assert_eq!(exact_precision.to_inexact(), inexact_precision);
770        assert_eq!(inexact_precision.to_inexact(), inexact_precision);
771        assert_eq!(absent_precision.to_inexact(), absent_precision);
772    }
773
774    #[test]
775    fn test_add() {
776        let precision1 = Precision::Exact(42);
777        let precision2 = Precision::Inexact(23);
778        let precision3 = Precision::Exact(30);
779        let absent_precision = Precision::Absent;
780
781        assert_eq!(precision1.add(&precision2), Precision::Inexact(65));
782        assert_eq!(precision1.add(&precision3), Precision::Exact(72));
783        assert_eq!(precision2.add(&precision3), Precision::Inexact(53));
784        assert_eq!(precision1.add(&absent_precision), Precision::Absent);
785    }
786
787    #[test]
788    fn test_add_scalar() {
789        let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
790
791        assert_eq!(
792            precision.add(&Precision::Exact(ScalarValue::Int32(Some(23)))),
793            Precision::Exact(ScalarValue::Int32(Some(65))),
794        );
795        assert_eq!(
796            precision.add(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
797            Precision::Inexact(ScalarValue::Int32(Some(65))),
798        );
799        assert_eq!(
800            precision.add(&Precision::Exact(ScalarValue::Int32(None))),
801            // As per behavior of ScalarValue::add
802            Precision::Exact(ScalarValue::Int32(None)),
803        );
804        assert_eq!(precision.add(&Precision::Absent), Precision::Absent);
805    }
806
807    #[test]
808    fn test_sub() {
809        let precision1 = Precision::Exact(42);
810        let precision2 = Precision::Inexact(23);
811        let precision3 = Precision::Exact(30);
812        let absent_precision = Precision::Absent;
813
814        assert_eq!(precision1.sub(&precision2), Precision::Inexact(19));
815        assert_eq!(precision1.sub(&precision3), Precision::Exact(12));
816        assert_eq!(precision1.sub(&absent_precision), Precision::Absent);
817    }
818
819    #[test]
820    fn test_sub_scalar() {
821        let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
822
823        assert_eq!(
824            precision.sub(&Precision::Exact(ScalarValue::Int32(Some(23)))),
825            Precision::Exact(ScalarValue::Int32(Some(19))),
826        );
827        assert_eq!(
828            precision.sub(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
829            Precision::Inexact(ScalarValue::Int32(Some(19))),
830        );
831        assert_eq!(
832            precision.sub(&Precision::Exact(ScalarValue::Int32(None))),
833            // As per behavior of ScalarValue::sub
834            Precision::Exact(ScalarValue::Int32(None)),
835        );
836        assert_eq!(precision.sub(&Precision::Absent), Precision::Absent);
837    }
838
839    #[test]
840    fn test_multiply() {
841        let precision1 = Precision::Exact(6);
842        let precision2 = Precision::Inexact(3);
843        let precision3 = Precision::Exact(5);
844        let absent_precision = Precision::Absent;
845
846        assert_eq!(precision1.multiply(&precision2), Precision::Inexact(18));
847        assert_eq!(precision1.multiply(&precision3), Precision::Exact(30));
848        assert_eq!(precision2.multiply(&precision3), Precision::Inexact(15));
849        assert_eq!(precision1.multiply(&absent_precision), Precision::Absent);
850    }
851
852    #[test]
853    fn test_multiply_scalar() {
854        let precision = Precision::Exact(ScalarValue::Int32(Some(6)));
855
856        assert_eq!(
857            precision.multiply(&Precision::Exact(ScalarValue::Int32(Some(5)))),
858            Precision::Exact(ScalarValue::Int32(Some(30))),
859        );
860        assert_eq!(
861            precision.multiply(&Precision::Inexact(ScalarValue::Int32(Some(5)))),
862            Precision::Inexact(ScalarValue::Int32(Some(30))),
863        );
864        assert_eq!(
865            precision.multiply(&Precision::Exact(ScalarValue::Int32(None))),
866            // As per behavior of ScalarValue::mul_checked
867            Precision::Exact(ScalarValue::Int32(None)),
868        );
869        assert_eq!(precision.multiply(&Precision::Absent), Precision::Absent);
870    }
871
872    #[test]
873    fn test_cast_to() {
874        // Valid
875        assert_eq!(
876            Precision::Exact(ScalarValue::Int32(Some(42)))
877                .cast_to(&DataType::Int64)
878                .unwrap(),
879            Precision::Exact(ScalarValue::Int64(Some(42))),
880        );
881        assert_eq!(
882            Precision::Inexact(ScalarValue::Int32(Some(42)))
883                .cast_to(&DataType::Int64)
884                .unwrap(),
885            Precision::Inexact(ScalarValue::Int64(Some(42))),
886        );
887        // Null
888        assert_eq!(
889            Precision::Exact(ScalarValue::Int32(None))
890                .cast_to(&DataType::Int64)
891                .unwrap(),
892            Precision::Exact(ScalarValue::Int64(None)),
893        );
894        // Overflow returns error
895        assert!(Precision::Exact(ScalarValue::Int32(Some(256)))
896            .cast_to(&DataType::Int8)
897            .is_err());
898    }
899
900    #[test]
901    fn test_precision_cloning() {
902        // Precision<usize> is copy
903        let precision: Precision<usize> = Precision::Exact(42);
904        let p2 = precision;
905        assert_eq!(precision, p2);
906
907        // Precision<ScalarValue> is not copy (requires .clone())
908        let precision: Precision<ScalarValue> =
909            Precision::Exact(ScalarValue::Int64(Some(42)));
910        // Clippy would complain about this if it were Copy
911        #[allow(clippy::redundant_clone)]
912        let p2 = precision.clone();
913        assert_eq!(precision, p2);
914    }
915
916    #[test]
917    fn test_project_none() {
918        let projection = None;
919        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
920        assert_eq!(stats, make_stats(vec![10, 20, 30]));
921    }
922
923    #[test]
924    fn test_project_empty() {
925        let projection = Some(vec![]);
926        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
927        assert_eq!(stats, make_stats(vec![]));
928    }
929
930    #[test]
931    fn test_project_swap() {
932        let projection = Some(vec![2, 1]);
933        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
934        assert_eq!(stats, make_stats(vec![30, 20]));
935    }
936
937    #[test]
938    fn test_project_repeated() {
939        let projection = Some(vec![1, 2, 1, 1, 0, 2]);
940        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
941        assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
942    }
943
944    // Make a Statistics structure with the specified null counts for each column
945    fn make_stats(counts: impl IntoIterator<Item = usize>) -> Statistics {
946        Statistics {
947            num_rows: Precision::Exact(42),
948            total_byte_size: Precision::Exact(500),
949            column_statistics: counts.into_iter().map(col_stats_i64).collect(),
950        }
951    }
952
953    fn col_stats_i64(null_count: usize) -> ColumnStatistics {
954        ColumnStatistics {
955            null_count: Precision::Exact(null_count),
956            max_value: Precision::Exact(ScalarValue::Int64(Some(42))),
957            min_value: Precision::Exact(ScalarValue::Int64(Some(64))),
958            sum_value: Precision::Exact(ScalarValue::Int64(Some(4600))),
959            distinct_count: Precision::Exact(100),
960        }
961    }
962
963    #[test]
964    fn test_try_merge_basic() {
965        // Create a schema with two columns
966        let schema = Arc::new(Schema::new(vec![
967            Field::new("col1", DataType::Int32, false),
968            Field::new("col2", DataType::Int32, false),
969        ]));
970
971        // Create items with statistics
972        let stats1 = Statistics {
973            num_rows: Precision::Exact(10),
974            total_byte_size: Precision::Exact(100),
975            column_statistics: vec![
976                ColumnStatistics {
977                    null_count: Precision::Exact(1),
978                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
979                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
980                    sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
981                    distinct_count: Precision::Absent,
982                },
983                ColumnStatistics {
984                    null_count: Precision::Exact(2),
985                    max_value: Precision::Exact(ScalarValue::Int32(Some(200))),
986                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
987                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))),
988                    distinct_count: Precision::Absent,
989                },
990            ],
991        };
992
993        let stats2 = Statistics {
994            num_rows: Precision::Exact(15),
995            total_byte_size: Precision::Exact(150),
996            column_statistics: vec![
997                ColumnStatistics {
998                    null_count: Precision::Exact(2),
999                    max_value: Precision::Exact(ScalarValue::Int32(Some(120))),
1000                    min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
1001                    sum_value: Precision::Exact(ScalarValue::Int32(Some(600))),
1002                    distinct_count: Precision::Absent,
1003                },
1004                ColumnStatistics {
1005                    null_count: Precision::Exact(3),
1006                    max_value: Precision::Exact(ScalarValue::Int32(Some(180))),
1007                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
1008                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))),
1009                    distinct_count: Precision::Absent,
1010                },
1011            ],
1012        };
1013
1014        let items = vec![stats1, stats2];
1015
1016        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1017
1018        // Verify the results
1019        assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15
1020        assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150
1021
1022        // Verify column statistics
1023        let col1_stats = &summary_stats.column_statistics[0];
1024        assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2
1025        assert_eq!(
1026            col1_stats.max_value,
1027            Precision::Exact(ScalarValue::Int32(Some(120)))
1028        );
1029        assert_eq!(
1030            col1_stats.min_value,
1031            Precision::Exact(ScalarValue::Int32(Some(-10)))
1032        );
1033        assert_eq!(
1034            col1_stats.sum_value,
1035            Precision::Exact(ScalarValue::Int32(Some(1100)))
1036        ); // 500 + 600
1037
1038        let col2_stats = &summary_stats.column_statistics[1];
1039        assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3
1040        assert_eq!(
1041            col2_stats.max_value,
1042            Precision::Exact(ScalarValue::Int32(Some(200)))
1043        );
1044        assert_eq!(
1045            col2_stats.min_value,
1046            Precision::Exact(ScalarValue::Int32(Some(5)))
1047        );
1048        assert_eq!(
1049            col2_stats.sum_value,
1050            Precision::Exact(ScalarValue::Int32(Some(2200)))
1051        ); // 1000 + 1200
1052    }
1053
1054    #[test]
1055    fn test_try_merge_mixed_precision() {
1056        // Create a schema with one column
1057        let schema = Arc::new(Schema::new(vec![Field::new(
1058            "col1",
1059            DataType::Int32,
1060            false,
1061        )]));
1062
1063        // Create items with different precision levels
1064        let stats1 = Statistics {
1065            num_rows: Precision::Exact(10),
1066            total_byte_size: Precision::Inexact(100),
1067            column_statistics: vec![ColumnStatistics {
1068                null_count: Precision::Exact(1),
1069                max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1070                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1071                sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
1072                distinct_count: Precision::Absent,
1073            }],
1074        };
1075
1076        let stats2 = Statistics {
1077            num_rows: Precision::Inexact(15),
1078            total_byte_size: Precision::Exact(150),
1079            column_statistics: vec![ColumnStatistics {
1080                null_count: Precision::Inexact(2),
1081                max_value: Precision::Inexact(ScalarValue::Int32(Some(120))),
1082                min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
1083                sum_value: Precision::Absent,
1084                distinct_count: Precision::Absent,
1085            }],
1086        };
1087
1088        let items = vec![stats1, stats2];
1089
1090        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1091
1092        assert_eq!(summary_stats.num_rows, Precision::Inexact(25));
1093        assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250));
1094
1095        let col_stats = &summary_stats.column_statistics[0];
1096        assert_eq!(col_stats.null_count, Precision::Inexact(3));
1097        assert_eq!(
1098            col_stats.max_value,
1099            Precision::Inexact(ScalarValue::Int32(Some(120)))
1100        );
1101        assert_eq!(
1102            col_stats.min_value,
1103            Precision::Inexact(ScalarValue::Int32(Some(-10)))
1104        );
1105        assert!(matches!(col_stats.sum_value, Precision::Absent));
1106    }
1107
1108    #[test]
1109    fn test_try_merge_empty() {
1110        let schema = Arc::new(Schema::new(vec![Field::new(
1111            "col1",
1112            DataType::Int32,
1113            false,
1114        )]));
1115
1116        // Empty collection
1117        let items: Vec<Statistics> = vec![];
1118
1119        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1120
1121        // Verify default values for empty collection
1122        assert_eq!(summary_stats.num_rows, Precision::Absent);
1123        assert_eq!(summary_stats.total_byte_size, Precision::Absent);
1124        assert_eq!(summary_stats.column_statistics.len(), 1);
1125        assert_eq!(
1126            summary_stats.column_statistics[0].null_count,
1127            Precision::Absent
1128        );
1129    }
1130
1131    #[test]
1132    fn test_try_merge_mismatched_size() {
1133        // Create a schema with one column
1134        let schema = Arc::new(Schema::new(vec![Field::new(
1135            "col1",
1136            DataType::Int32,
1137            false,
1138        )]));
1139
1140        // No column statistics
1141        let stats1 = Statistics::default();
1142
1143        let stats2 =
1144            Statistics::default().add_column_statistics(ColumnStatistics::new_unknown());
1145
1146        let items = vec![stats1, stats2];
1147
1148        let e = Statistics::try_merge_iter(&items, &schema).unwrap_err();
1149        assert_contains!(e.to_string(), "Error during planning: Cannot merge statistics with different number of columns: 0 vs 1");
1150    }
1151}