datafusion_common/
stats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides data structures to represent statistics
19
20use std::fmt::{self, Debug, Display};
21
22use crate::{Result, ScalarValue};
23
24use crate::error::_plan_err;
25use arrow::datatypes::{DataType, Schema};
26
/// Represents a value with a degree of certainty. `Precision` is used to
/// propagate information about the precision of statistical values.
#[derive(Clone, PartialEq, Eq, Default, Copy)]
pub enum Precision<T: Debug + Clone + PartialEq + Eq + PartialOrd> {
    /// The exact value is known
    Exact(T),
    /// The value is not known exactly, but is likely close to this value
    Inexact(T),
    /// Nothing is known about the value
    #[default]
    Absent,
}

impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
    /// If we have some value (exact or inexact), it returns that value.
    /// Otherwise, it returns `None`.
    pub fn get_value(&self) -> Option<&T> {
        match self {
            Precision::Exact(value) | Precision::Inexact(value) => Some(value),
            Precision::Absent => None,
        }
    }

    /// Transform the value in this [`Precision`] object, if one exists, using
    /// the given function. Preserves the exactness state.
    ///
    /// The function is invoked at most once (never for
    /// [`Precision::Absent`]), so any `FnOnce` closure is accepted, including
    /// closures that consume values they captured.
    pub fn map<U, F>(self, f: F) -> Precision<U>
    where
        F: FnOnce(T) -> U,
        U: Debug + Clone + PartialEq + Eq + PartialOrd,
    {
        match self {
            Precision::Exact(val) => Precision::Exact(f(val)),
            Precision::Inexact(val) => Precision::Inexact(f(val)),
            Precision::Absent => Precision::Absent,
        }
    }

    /// Returns `Some(true)` if we have an exact value, `Some(false)` if we
    /// have an inexact value, and `None` if there is no value.
    pub fn is_exact(&self) -> Option<bool> {
        match self {
            Precision::Exact(_) => Some(true),
            Precision::Inexact(_) => Some(false),
            Precision::Absent => None,
        }
    }

    /// Returns the maximum of two (possibly inexact) values, conservatively
    /// propagating exactness information. If one of the input values is
    /// [`Precision::Absent`], the result is `Absent` too.
    ///
    /// The result is `Exact` only when both inputs are exact. When `a >= b`
    /// does not hold (including incomparable values), `other`'s value is
    /// returned.
    pub fn max(&self, other: &Precision<T>) -> Precision<T> {
        match (self, other) {
            (Precision::Exact(a), Precision::Exact(b)) => {
                Precision::Exact(if a >= b { a.clone() } else { b.clone() })
            }
            (Precision::Inexact(a), Precision::Exact(b))
            | (Precision::Exact(a), Precision::Inexact(b))
            | (Precision::Inexact(a), Precision::Inexact(b)) => {
                Precision::Inexact(if a >= b { a.clone() } else { b.clone() })
            }
            (Precision::Absent, _) | (_, Precision::Absent) => Precision::Absent,
        }
    }

    /// Returns the minimum of two (possibly inexact) values, conservatively
    /// propagating exactness information. If one of the input values is
    /// [`Precision::Absent`], the result is `Absent` too.
    ///
    /// The result is `Exact` only when both inputs are exact. When `a >= b`
    /// does not hold (including incomparable values), `self`'s value is
    /// returned.
    pub fn min(&self, other: &Precision<T>) -> Precision<T> {
        match (self, other) {
            (Precision::Exact(a), Precision::Exact(b)) => {
                Precision::Exact(if a >= b { b.clone() } else { a.clone() })
            }
            (Precision::Inexact(a), Precision::Exact(b))
            | (Precision::Exact(a), Precision::Inexact(b))
            | (Precision::Inexact(a), Precision::Inexact(b)) => {
                Precision::Inexact(if a >= b { b.clone() } else { a.clone() })
            }
            (Precision::Absent, _) | (_, Precision::Absent) => Precision::Absent,
        }
    }

    /// Demotes the precision state from exact to inexact (if present).
    /// Inexact and absent values are returned unchanged.
    pub fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(value) => Precision::Inexact(value),
            unchanged @ (Precision::Inexact(_) | Precision::Absent) => unchanged,
        }
    }
}
116
117impl Precision<usize> {
118    /// Calculates the sum of two (possibly inexact) [`usize`] values,
119    /// conservatively propagating exactness information. If one of the input
120    /// values is [`Precision::Absent`], the result is `Absent` too.
121    pub fn add(&self, other: &Precision<usize>) -> Precision<usize> {
122        match (self, other) {
123            (Precision::Exact(a), Precision::Exact(b)) => a.checked_add(*b).map_or_else(
124                || Precision::Inexact(a.saturating_add(*b)),
125                Precision::Exact,
126            ),
127            (Precision::Inexact(a), Precision::Exact(b))
128            | (Precision::Exact(a), Precision::Inexact(b))
129            | (Precision::Inexact(a), Precision::Inexact(b)) => {
130                Precision::Inexact(a.saturating_add(*b))
131            }
132            (_, _) => Precision::Absent,
133        }
134    }
135
136    /// Calculates the difference of two (possibly inexact) [`usize`] values,
137    /// conservatively propagating exactness information. If one of the input
138    /// values is [`Precision::Absent`], the result is `Absent` too.
139    pub fn sub(&self, other: &Precision<usize>) -> Precision<usize> {
140        match (self, other) {
141            (Precision::Exact(a), Precision::Exact(b)) => a.checked_sub(*b).map_or_else(
142                || Precision::Inexact(a.saturating_sub(*b)),
143                Precision::Exact,
144            ),
145            (Precision::Inexact(a), Precision::Exact(b))
146            | (Precision::Exact(a), Precision::Inexact(b))
147            | (Precision::Inexact(a), Precision::Inexact(b)) => {
148                Precision::Inexact(a.saturating_sub(*b))
149            }
150            (_, _) => Precision::Absent,
151        }
152    }
153
154    /// Calculates the multiplication of two (possibly inexact) [`usize`] values,
155    /// conservatively propagating exactness information. If one of the input
156    /// values is [`Precision::Absent`], the result is `Absent` too.
157    pub fn multiply(&self, other: &Precision<usize>) -> Precision<usize> {
158        match (self, other) {
159            (Precision::Exact(a), Precision::Exact(b)) => a.checked_mul(*b).map_or_else(
160                || Precision::Inexact(a.saturating_mul(*b)),
161                Precision::Exact,
162            ),
163            (Precision::Inexact(a), Precision::Exact(b))
164            | (Precision::Exact(a), Precision::Inexact(b))
165            | (Precision::Inexact(a), Precision::Inexact(b)) => {
166                Precision::Inexact(a.saturating_mul(*b))
167            }
168            (_, _) => Precision::Absent,
169        }
170    }
171
172    /// Return the estimate of applying a filter with estimated selectivity
173    /// `selectivity` to this Precision. A selectivity of `1.0` means that all
174    /// rows are selected. A selectivity of `0.5` means half the rows are
175    /// selected. Will always return inexact statistics.
176    pub fn with_estimated_selectivity(self, selectivity: f64) -> Self {
177        self.map(|v| ((v as f64 * selectivity).ceil()) as usize)
178            .to_inexact()
179    }
180}
181
182impl Precision<ScalarValue> {
183    /// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
184    /// conservatively propagating exactness information. If one of the input
185    /// values is [`Precision::Absent`], the result is `Absent` too.
186    pub fn add(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
187        match (self, other) {
188            (Precision::Exact(a), Precision::Exact(b)) => {
189                a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent)
190            }
191            (Precision::Inexact(a), Precision::Exact(b))
192            | (Precision::Exact(a), Precision::Inexact(b))
193            | (Precision::Inexact(a), Precision::Inexact(b)) => a
194                .add(b)
195                .map(Precision::Inexact)
196                .unwrap_or(Precision::Absent),
197            (_, _) => Precision::Absent,
198        }
199    }
200
201    /// Calculates the difference of two (possibly inexact) [`ScalarValue`] values,
202    /// conservatively propagating exactness information. If one of the input
203    /// values is [`Precision::Absent`], the result is `Absent` too.
204    pub fn sub(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
205        match (self, other) {
206            (Precision::Exact(a), Precision::Exact(b)) => {
207                a.sub(b).map(Precision::Exact).unwrap_or(Precision::Absent)
208            }
209            (Precision::Inexact(a), Precision::Exact(b))
210            | (Precision::Exact(a), Precision::Inexact(b))
211            | (Precision::Inexact(a), Precision::Inexact(b)) => a
212                .sub(b)
213                .map(Precision::Inexact)
214                .unwrap_or(Precision::Absent),
215            (_, _) => Precision::Absent,
216        }
217    }
218
219    /// Calculates the multiplication of two (possibly inexact) [`ScalarValue`] values,
220    /// conservatively propagating exactness information. If one of the input
221    /// values is [`Precision::Absent`], the result is `Absent` too.
222    pub fn multiply(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
223        match (self, other) {
224            (Precision::Exact(a), Precision::Exact(b)) => a
225                .mul_checked(b)
226                .map(Precision::Exact)
227                .unwrap_or(Precision::Absent),
228            (Precision::Inexact(a), Precision::Exact(b))
229            | (Precision::Exact(a), Precision::Inexact(b))
230            | (Precision::Inexact(a), Precision::Inexact(b)) => a
231                .mul_checked(b)
232                .map(Precision::Inexact)
233                .unwrap_or(Precision::Absent),
234            (_, _) => Precision::Absent,
235        }
236    }
237
238    /// Casts the value to the given data type, propagating exactness information.
239    pub fn cast_to(&self, data_type: &DataType) -> Result<Precision<ScalarValue>> {
240        match self {
241            Precision::Exact(value) => value.cast_to(data_type).map(Precision::Exact),
242            Precision::Inexact(value) => value.cast_to(data_type).map(Precision::Inexact),
243            Precision::Absent => Ok(Precision::Absent),
244        }
245    }
246}
247
248impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Debug for Precision<T> {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        match self {
251            Precision::Exact(inner) => write!(f, "Exact({inner:?})"),
252            Precision::Inexact(inner) => write!(f, "Inexact({inner:?})"),
253            Precision::Absent => write!(f, "Absent"),
254        }
255    }
256}
257
258impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Display for Precision<T> {
259    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260        match self {
261            Precision::Exact(inner) => write!(f, "Exact({inner:?})"),
262            Precision::Inexact(inner) => write!(f, "Inexact({inner:?})"),
263            Precision::Absent => write!(f, "Absent"),
264        }
265    }
266}
267
268impl From<Precision<usize>> for Precision<ScalarValue> {
269    fn from(value: Precision<usize>) -> Self {
270        match value {
271            Precision::Exact(v) => Precision::Exact(ScalarValue::UInt64(Some(v as u64))),
272            Precision::Inexact(v) => {
273                Precision::Inexact(ScalarValue::UInt64(Some(v as u64)))
274            }
275            Precision::Absent => Precision::Absent,
276        }
277    }
278}
279
/// Statistics for a relation.
///
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
/// and the output of transformations is not always predictable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
    /// The number of rows estimated to be scanned.
    pub num_rows: Precision<usize>,
    /// The total bytes of the output data.
    /// Note that this is not the same as the total bytes that may be scanned,
    /// processed, etc.
    /// E.g. we may read 1GB of data from a Parquet file but the Arrow data
    /// the node produces may be 2GB; it's this 2GB that is tracked here.
    pub total_byte_size: Precision<usize>,
    /// Statistics on a column level.
    ///
    /// It must contain a [`ColumnStatistics`] for each field in the schema of
    /// the table to which the [`Statistics`] refer.
    pub column_statistics: Vec<ColumnStatistics>,
}
300
301impl Default for Statistics {
302    /// Returns a new [`Statistics`] instance with all fields set to unknown
303    /// and no columns.
304    fn default() -> Self {
305        Self {
306            num_rows: Precision::Absent,
307            total_byte_size: Precision::Absent,
308            column_statistics: vec![],
309        }
310    }
311}
312
313impl Statistics {
314    /// Returns a [`Statistics`] instance for the given schema by assigning
315    /// unknown statistics to each column in the schema.
316    pub fn new_unknown(schema: &Schema) -> Self {
317        Self {
318            num_rows: Precision::Absent,
319            total_byte_size: Precision::Absent,
320            column_statistics: Statistics::unknown_column(schema),
321        }
322    }
323
324    /// Calculates `total_byte_size` based on the schema and `num_rows`.
325    /// If any of the columns has non-primitive width, `total_byte_size` is set to inexact.
326    pub fn calculate_total_byte_size(&mut self, schema: &Schema) {
327        let mut row_size = Some(0);
328        for field in schema.fields() {
329            match field.data_type().primitive_width() {
330                Some(width) => {
331                    row_size = row_size.map(|s| s + width);
332                }
333                None => {
334                    row_size = None;
335                    break;
336                }
337            }
338        }
339        match row_size {
340            None => {
341                self.total_byte_size = self.total_byte_size.to_inexact();
342            }
343            Some(size) => {
344                self.total_byte_size = self.num_rows.multiply(&Precision::Exact(size));
345            }
346        }
347    }
348
349    /// Returns an unbounded `ColumnStatistics` for each field in the schema.
350    pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
351        schema
352            .fields()
353            .iter()
354            .map(|_| ColumnStatistics::new_unknown())
355            .collect()
356    }
357
358    /// Set the number of rows
359    pub fn with_num_rows(mut self, num_rows: Precision<usize>) -> Self {
360        self.num_rows = num_rows;
361        self
362    }
363
364    /// Set the total size, in bytes
365    pub fn with_total_byte_size(mut self, total_byte_size: Precision<usize>) -> Self {
366        self.total_byte_size = total_byte_size;
367        self
368    }
369
370    /// Add a column to the column statistics
371    pub fn add_column_statistics(mut self, column_stats: ColumnStatistics) -> Self {
372        self.column_statistics.push(column_stats);
373        self
374    }
375
376    /// If the exactness of a [`Statistics`] instance is lost, this function relaxes
377    /// the exactness of all information by converting them [`Precision::Inexact`].
378    pub fn to_inexact(mut self) -> Self {
379        self.num_rows = self.num_rows.to_inexact();
380        self.total_byte_size = self.total_byte_size.to_inexact();
381        self.column_statistics = self
382            .column_statistics
383            .into_iter()
384            .map(|s| s.to_inexact())
385            .collect();
386        self
387    }
388
389    /// Project the statistics to the given column indices.
390    ///
391    /// For example, if we had statistics for columns `{"a", "b", "c"}`,
392    /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393    /// "b"}`.
394    pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
395        let Some(projection) = projection else {
396            return self;
397        };
398
399        #[expect(clippy::large_enum_variant)]
400        enum Slot {
401            /// The column is taken and put into the specified statistics location
402            Taken(usize),
403            /// The original columns is present
404            Present(ColumnStatistics),
405        }
406
407        // Convert to Vec<Slot> so we can avoid copying the statistics
408        let mut columns: Vec<_> = std::mem::take(&mut self.column_statistics)
409            .into_iter()
410            .map(Slot::Present)
411            .collect();
412
413        for idx in projection {
414            let next_idx = self.column_statistics.len();
415            let slot = std::mem::replace(
416                columns.get_mut(*idx).expect("projection out of bounds"),
417                Slot::Taken(next_idx),
418            );
419            match slot {
420                // The column was there, so just move it
421                Slot::Present(col) => self.column_statistics.push(col),
422                // The column was taken, so copy from the previous location
423                Slot::Taken(prev_idx) => self
424                    .column_statistics
425                    .push(self.column_statistics[prev_idx].clone()),
426            }
427        }
428
429        self
430    }
431
432    /// Calculates the statistics after applying `fetch` and `skip` operations.
433    ///
434    /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
435    /// parameter to compute global statistics in a multi-partition setting.
436    pub fn with_fetch(
437        mut self,
438        fetch: Option<usize>,
439        skip: usize,
440        n_partitions: usize,
441    ) -> Result<Self> {
442        let fetch_val = fetch.unwrap_or(usize::MAX);
443
444        // Get the ratio of rows after / rows before on a per-partition basis
445        let num_rows_before = self.num_rows;
446
447        self.num_rows = match self {
448            Statistics {
449                num_rows: Precision::Exact(nr),
450                ..
451            }
452            | Statistics {
453                num_rows: Precision::Inexact(nr),
454                ..
455            } => {
456                // Here, the inexact case gives us an upper bound on the number of rows.
457                if nr <= skip {
458                    // All input data will be skipped:
459                    Precision::Exact(0)
460                } else if nr <= fetch_val && skip == 0 {
461                    // If the input does not reach the `fetch` globally, and `skip`
462                    // is zero (meaning the input and output are identical), return
463                    // input stats as is.
464                    // TODO: Can input stats still be used, but adjusted, when `skip`
465                    //       is non-zero?
466                    return Ok(self);
467                } else if nr - skip <= fetch_val {
468                    // After `skip` input rows are skipped, the remaining rows are
469                    // less than or equal to the `fetch` values, so `num_rows` must
470                    // equal the remaining rows.
471                    check_num_rows(
472                        (nr - skip).checked_mul(n_partitions),
473                        // We know that we have an estimate for the number of rows:
474                        self.num_rows.is_exact().unwrap(),
475                    )
476                } else {
477                    // At this point we know that we were given a `fetch` value
478                    // as the `None` case would go into the branch above. Since
479                    // the input has more rows than `fetch + skip`, the number
480                    // of rows will be the `fetch`, other statistics will have to be downgraded to inexact.
481                    check_num_rows(
482                        fetch_val.checked_mul(n_partitions),
483                        // We know that we have an estimate for the number of rows:
484                        self.num_rows.is_exact().unwrap(),
485                    )
486                }
487            }
488            Statistics {
489                num_rows: Precision::Absent,
490                ..
491            } => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
492        };
493        let ratio: f64 = match (num_rows_before, self.num_rows) {
494            (
495                Precision::Exact(nr_before) | Precision::Inexact(nr_before),
496                Precision::Exact(nr_after) | Precision::Inexact(nr_after),
497            ) => {
498                if nr_before == 0 {
499                    0.0
500                } else {
501                    nr_after as f64 / nr_before as f64
502                }
503            }
504            _ => 0.0,
505        };
506        self.column_statistics = self
507            .column_statistics
508            .into_iter()
509            .map(|cs| {
510                let mut cs = cs.to_inexact();
511                // Scale byte_size by the row ratio
512                cs.byte_size = match cs.byte_size {
513                    Precision::Exact(n) | Precision::Inexact(n) => {
514                        Precision::Inexact((n as f64 * ratio) as usize)
515                    }
516                    Precision::Absent => Precision::Absent,
517                };
518                cs
519            })
520            .collect();
521
522        // Compute total_byte_size as sum of column byte_size values if all are present,
523        // otherwise fall back to scaling the original total_byte_size
524        let sum_scan_bytes: Option<usize> = self
525            .column_statistics
526            .iter()
527            .map(|cs| cs.byte_size.get_value().copied())
528            .try_fold(0usize, |acc, val| val.map(|v| acc + v));
529
530        self.total_byte_size = match sum_scan_bytes {
531            Some(sum) => Precision::Inexact(sum),
532            None => {
533                // Fall back to scaling original total_byte_size if not all columns have byte_size
534                match &self.total_byte_size {
535                    Precision::Exact(n) | Precision::Inexact(n) => {
536                        Precision::Inexact((*n as f64 * ratio) as usize)
537                    }
538                    Precision::Absent => Precision::Absent,
539                }
540            }
541        };
542        Ok(self)
543    }
544
545    /// Summarize zero or more statistics into a single `Statistics` instance.
546    ///
547    /// The method assumes that all statistics are for the same schema.
548    /// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent.
549    ///
550    /// Returns an error if the statistics do not match the specified schemas.
551    pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
552    where
553        I: IntoIterator<Item = &'a Statistics>,
554    {
555        let mut items = items.into_iter();
556
557        let Some(init) = items.next() else {
558            return Ok(Statistics::new_unknown(schema));
559        };
560        items.try_fold(init.clone(), |acc: Statistics, item_stats: &Statistics| {
561            acc.try_merge(item_stats)
562        })
563    }
564
565    /// Merge this Statistics value with another Statistics value.
566    ///
567    /// Returns an error if the statistics do not match (different schemas).
568    ///
569    /// # Example
570    /// ```
571    /// # use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
572    /// # use arrow::datatypes::{Field, Schema, DataType};
573    /// # use datafusion_common::stats::Precision;
574    /// let stats1 = Statistics::default()
575    ///     .with_num_rows(Precision::Exact(1))
576    ///     .with_total_byte_size(Precision::Exact(2))
577    ///     .add_column_statistics(
578    ///         ColumnStatistics::new_unknown()
579    ///             .with_null_count(Precision::Exact(3))
580    ///             .with_min_value(Precision::Exact(ScalarValue::from(4)))
581    ///             .with_max_value(Precision::Exact(ScalarValue::from(5))),
582    ///     );
583    ///
584    /// let stats2 = Statistics::default()
585    ///     .with_num_rows(Precision::Exact(10))
586    ///     .with_total_byte_size(Precision::Inexact(20))
587    ///     .add_column_statistics(
588    ///         ColumnStatistics::new_unknown()
589    ///             // absent null count
590    ///             .with_min_value(Precision::Exact(ScalarValue::from(40)))
591    ///             .with_max_value(Precision::Exact(ScalarValue::from(50))),
592    ///     );
593    ///
594    /// let merged_stats = stats1.try_merge(&stats2).unwrap();
595    /// let expected_stats = Statistics::default()
596    ///     .with_num_rows(Precision::Exact(11))
597    ///     .with_total_byte_size(Precision::Inexact(22)) // inexact in stats2 --> inexact
598    ///     .add_column_statistics(
599    ///         ColumnStatistics::new_unknown()
600    ///             .with_null_count(Precision::Absent) // missing from stats2 --> absent
601    ///             .with_min_value(Precision::Exact(ScalarValue::from(4)))
602    ///             .with_max_value(Precision::Exact(ScalarValue::from(50))),
603    ///     );
604    ///
605    /// assert_eq!(merged_stats, expected_stats)
606    /// ```
607    pub fn try_merge(self, other: &Statistics) -> Result<Self> {
608        let Self {
609            mut num_rows,
610            mut total_byte_size,
611            mut column_statistics,
612        } = self;
613
614        // Accumulate statistics for subsequent items
615        num_rows = num_rows.add(&other.num_rows);
616        total_byte_size = total_byte_size.add(&other.total_byte_size);
617
618        if column_statistics.len() != other.column_statistics.len() {
619            return _plan_err!(
620                "Cannot merge statistics with different number of columns: {} vs {}",
621                column_statistics.len(),
622                other.column_statistics.len()
623            );
624        }
625
626        for (item_col_stats, col_stats) in other
627            .column_statistics
628            .iter()
629            .zip(column_statistics.iter_mut())
630        {
631            col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);
632            col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
633            col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
634            col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value);
635            col_stats.distinct_count = Precision::Absent;
636            col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size);
637        }
638
639        Ok(Statistics {
640            num_rows,
641            total_byte_size,
642            column_statistics,
643        })
644    }
645}
646
647/// Creates an estimate of the number of rows in the output using the given
648/// optional value and exactness flag.
649fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
650    if let Some(value) = value {
651        if is_exact {
652            Precision::Exact(value)
653        } else {
654            // If the input stats are inexact, so are the output stats.
655            Precision::Inexact(value)
656        }
657    } else {
658        // If the estimate is not available (e.g. due to an overflow), we can
659        // not produce a reliable estimate.
660        Precision::Absent
661    }
662}
663
664impl Display for Statistics {
665    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
666        // string of column statistics
667        let column_stats = self
668            .column_statistics
669            .iter()
670            .enumerate()
671            .map(|(i, cs)| {
672                let s = format!("(Col[{i}]:");
673                let s = if cs.min_value != Precision::Absent {
674                    format!("{} Min={}", s, cs.min_value)
675                } else {
676                    s
677                };
678                let s = if cs.max_value != Precision::Absent {
679                    format!("{} Max={}", s, cs.max_value)
680                } else {
681                    s
682                };
683                let s = if cs.sum_value != Precision::Absent {
684                    format!("{} Sum={}", s, cs.sum_value)
685                } else {
686                    s
687                };
688                let s = if cs.null_count != Precision::Absent {
689                    format!("{} Null={}", s, cs.null_count)
690                } else {
691                    s
692                };
693                let s = if cs.distinct_count != Precision::Absent {
694                    format!("{} Distinct={}", s, cs.distinct_count)
695                } else {
696                    s
697                };
698                let s = if cs.byte_size != Precision::Absent {
699                    format!("{} ScanBytes={}", s, cs.byte_size)
700                } else {
701                    s
702                };
703
704                s + ")"
705            })
706            .collect::<Vec<_>>()
707            .join(",");
708
709        write!(
710            f,
711            "Rows={}, Bytes={}, [{}]",
712            self.num_rows, self.total_byte_size, column_stats
713        )?;
714
715        Ok(())
716    }
717}
718
/// Statistics for a column within a relation.
///
/// Every field is a [`Precision`], so each statistic can independently be
/// exact, inexact or absent.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ColumnStatistics {
    /// Number of null values in the column
    pub null_count: Precision<usize>,
    /// Maximum value of the column
    pub max_value: Precision<ScalarValue>,
    /// Minimum value of the column
    pub min_value: Precision<ScalarValue>,
    /// Sum of the values of the column
    pub sum_value: Precision<ScalarValue>,
    /// Number of distinct values
    pub distinct_count: Precision<usize>,
    /// Estimated size of this column's data in bytes for the output.
    ///
    /// Note that this is not the same as the total bytes that may be scanned,
    /// processed, etc.
    ///
    /// E.g. we may read 1GB of data from a Parquet file but the Arrow data
    /// the node produces may be 2GB; it's this 2GB that is tracked here.
    ///
    /// Currently this is accurately calculated for primitive types only.
    /// For complex types (like Utf8, List, Struct, etc), this value may be
    /// absent or inexact (e.g. estimated from the size of the data in the source Parquet files).
    ///
    /// This value is automatically scaled when operations like limits or
    /// filters reduce the number of rows (see [`Statistics::with_fetch`]).
    pub byte_size: Precision<usize>,
}
748
749impl ColumnStatistics {
750    /// Column contains a single non null value (e.g constant).
751    pub fn is_singleton(&self) -> bool {
752        match (&self.min_value, &self.max_value) {
753            // Min and max values are the same and not infinity.
754            (Precision::Exact(min), Precision::Exact(max)) => {
755                !min.is_null() && !max.is_null() && (min == max)
756            }
757            (_, _) => false,
758        }
759    }
760
761    /// Returns a [`ColumnStatistics`] instance having all [`Precision::Absent`] parameters.
762    pub fn new_unknown() -> Self {
763        Self {
764            null_count: Precision::Absent,
765            max_value: Precision::Absent,
766            min_value: Precision::Absent,
767            sum_value: Precision::Absent,
768            distinct_count: Precision::Absent,
769            byte_size: Precision::Absent,
770        }
771    }
772
773    /// Set the null count
774    pub fn with_null_count(mut self, null_count: Precision<usize>) -> Self {
775        self.null_count = null_count;
776        self
777    }
778
779    /// Set the max value
780    pub fn with_max_value(mut self, max_value: Precision<ScalarValue>) -> Self {
781        self.max_value = max_value;
782        self
783    }
784
785    /// Set the min value
786    pub fn with_min_value(mut self, min_value: Precision<ScalarValue>) -> Self {
787        self.min_value = min_value;
788        self
789    }
790
791    /// Set the sum value
792    pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self {
793        self.sum_value = sum_value;
794        self
795    }
796
797    /// Set the distinct count
798    pub fn with_distinct_count(mut self, distinct_count: Precision<usize>) -> Self {
799        self.distinct_count = distinct_count;
800        self
801    }
802
803    /// Set the scan byte size
804    /// This should initially be set to the total size of the column.
805    pub fn with_byte_size(mut self, byte_size: Precision<usize>) -> Self {
806        self.byte_size = byte_size;
807        self
808    }
809
810    /// If the exactness of a [`ColumnStatistics`] instance is lost, this
811    /// function relaxes the exactness of all information by converting them
812    /// [`Precision::Inexact`].
813    pub fn to_inexact(mut self) -> Self {
814        self.null_count = self.null_count.to_inexact();
815        self.max_value = self.max_value.to_inexact();
816        self.min_value = self.min_value.to_inexact();
817        self.sum_value = self.sum_value.to_inexact();
818        self.distinct_count = self.distinct_count.to_inexact();
819        self.byte_size = self.byte_size.to_inexact();
820        self
821    }
822}
823
824#[cfg(test)]
825mod tests {
826    use super::*;
827    use crate::assert_contains;
828    use arrow::datatypes::Field;
829    use std::sync::Arc;
830
831    #[test]
832    fn test_get_value() {
833        let exact_precision = Precision::Exact(42);
834        let inexact_precision = Precision::Inexact(23);
835        let absent_precision = Precision::<i32>::Absent;
836
837        assert_eq!(*exact_precision.get_value().unwrap(), 42);
838        assert_eq!(*inexact_precision.get_value().unwrap(), 23);
839        assert_eq!(absent_precision.get_value(), None);
840    }
841
842    #[test]
843    fn test_map() {
844        let exact_precision = Precision::Exact(42);
845        let inexact_precision = Precision::Inexact(23);
846        let absent_precision = Precision::Absent;
847
848        let squared = |x| x * x;
849
850        assert_eq!(exact_precision.map(squared), Precision::Exact(1764));
851        assert_eq!(inexact_precision.map(squared), Precision::Inexact(529));
852        assert_eq!(absent_precision.map(squared), Precision::Absent);
853    }
854
855    #[test]
856    fn test_is_exact() {
857        let exact_precision = Precision::Exact(42);
858        let inexact_precision = Precision::Inexact(23);
859        let absent_precision = Precision::<i32>::Absent;
860
861        assert_eq!(exact_precision.is_exact(), Some(true));
862        assert_eq!(inexact_precision.is_exact(), Some(false));
863        assert_eq!(absent_precision.is_exact(), None);
864    }
865
866    #[test]
867    fn test_max() {
868        let precision1 = Precision::Exact(42);
869        let precision2 = Precision::Inexact(23);
870        let precision3 = Precision::Exact(30);
871        let absent_precision = Precision::Absent;
872
873        assert_eq!(precision1.max(&precision2), Precision::Inexact(42));
874        assert_eq!(precision1.max(&precision3), Precision::Exact(42));
875        assert_eq!(precision2.max(&precision3), Precision::Inexact(30));
876        assert_eq!(precision1.max(&absent_precision), Precision::Absent);
877    }
878
879    #[test]
880    fn test_min() {
881        let precision1 = Precision::Exact(42);
882        let precision2 = Precision::Inexact(23);
883        let precision3 = Precision::Exact(30);
884        let absent_precision = Precision::Absent;
885
886        assert_eq!(precision1.min(&precision2), Precision::Inexact(23));
887        assert_eq!(precision1.min(&precision3), Precision::Exact(30));
888        assert_eq!(precision2.min(&precision3), Precision::Inexact(23));
889        assert_eq!(precision1.min(&absent_precision), Precision::Absent);
890    }
891
892    #[test]
893    fn test_to_inexact() {
894        let exact_precision = Precision::Exact(42);
895        let inexact_precision = Precision::Inexact(42);
896        let absent_precision = Precision::<i32>::Absent;
897
898        assert_eq!(exact_precision.to_inexact(), inexact_precision);
899        assert_eq!(inexact_precision.to_inexact(), inexact_precision);
900        assert_eq!(absent_precision.to_inexact(), absent_precision);
901    }
902
903    #[test]
904    fn test_add() {
905        let precision1 = Precision::Exact(42);
906        let precision2 = Precision::Inexact(23);
907        let precision3 = Precision::Exact(30);
908        let absent_precision = Precision::Absent;
909        let precision_max_exact = Precision::Exact(usize::MAX);
910        let precision_max_inexact = Precision::Exact(usize::MAX);
911
912        assert_eq!(precision1.add(&precision2), Precision::Inexact(65));
913        assert_eq!(precision1.add(&precision3), Precision::Exact(72));
914        assert_eq!(precision2.add(&precision3), Precision::Inexact(53));
915        assert_eq!(precision1.add(&absent_precision), Precision::Absent);
916        assert_eq!(
917            precision_max_exact.add(&precision1),
918            Precision::Inexact(usize::MAX)
919        );
920        assert_eq!(
921            precision_max_inexact.add(&precision1),
922            Precision::Inexact(usize::MAX)
923        );
924    }
925
926    #[test]
927    fn test_add_scalar() {
928        let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
929
930        assert_eq!(
931            precision.add(&Precision::Exact(ScalarValue::Int32(Some(23)))),
932            Precision::Exact(ScalarValue::Int32(Some(65))),
933        );
934        assert_eq!(
935            precision.add(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
936            Precision::Inexact(ScalarValue::Int32(Some(65))),
937        );
938        assert_eq!(
939            precision.add(&Precision::Exact(ScalarValue::Int32(None))),
940            // As per behavior of ScalarValue::add
941            Precision::Exact(ScalarValue::Int32(None)),
942        );
943        assert_eq!(precision.add(&Precision::Absent), Precision::Absent);
944    }
945
946    #[test]
947    fn test_sub() {
948        let precision1 = Precision::Exact(42);
949        let precision2 = Precision::Inexact(23);
950        let precision3 = Precision::Exact(30);
951        let absent_precision = Precision::Absent;
952
953        assert_eq!(precision1.sub(&precision2), Precision::Inexact(19));
954        assert_eq!(precision1.sub(&precision3), Precision::Exact(12));
955        assert_eq!(precision2.sub(&precision1), Precision::Inexact(0));
956        assert_eq!(precision3.sub(&precision1), Precision::Inexact(0));
957        assert_eq!(precision1.sub(&absent_precision), Precision::Absent);
958    }
959
960    #[test]
961    fn test_sub_scalar() {
962        let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
963
964        assert_eq!(
965            precision.sub(&Precision::Exact(ScalarValue::Int32(Some(23)))),
966            Precision::Exact(ScalarValue::Int32(Some(19))),
967        );
968        assert_eq!(
969            precision.sub(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
970            Precision::Inexact(ScalarValue::Int32(Some(19))),
971        );
972        assert_eq!(
973            precision.sub(&Precision::Exact(ScalarValue::Int32(None))),
974            // As per behavior of ScalarValue::sub
975            Precision::Exact(ScalarValue::Int32(None)),
976        );
977        assert_eq!(precision.sub(&Precision::Absent), Precision::Absent);
978    }
979
980    #[test]
981    fn test_multiply() {
982        let precision1 = Precision::Exact(6);
983        let precision2 = Precision::Inexact(3);
984        let precision3 = Precision::Exact(5);
985        let precision_max_exact = Precision::Exact(usize::MAX);
986        let precision_max_inexact = Precision::Exact(usize::MAX);
987        let absent_precision = Precision::Absent;
988
989        assert_eq!(precision1.multiply(&precision2), Precision::Inexact(18));
990        assert_eq!(precision1.multiply(&precision3), Precision::Exact(30));
991        assert_eq!(precision2.multiply(&precision3), Precision::Inexact(15));
992        assert_eq!(precision1.multiply(&absent_precision), Precision::Absent);
993        assert_eq!(
994            precision_max_exact.multiply(&precision1),
995            Precision::Inexact(usize::MAX)
996        );
997        assert_eq!(
998            precision_max_inexact.multiply(&precision1),
999            Precision::Inexact(usize::MAX)
1000        );
1001    }
1002
1003    #[test]
1004    fn test_multiply_scalar() {
1005        let precision = Precision::Exact(ScalarValue::Int32(Some(6)));
1006
1007        assert_eq!(
1008            precision.multiply(&Precision::Exact(ScalarValue::Int32(Some(5)))),
1009            Precision::Exact(ScalarValue::Int32(Some(30))),
1010        );
1011        assert_eq!(
1012            precision.multiply(&Precision::Inexact(ScalarValue::Int32(Some(5)))),
1013            Precision::Inexact(ScalarValue::Int32(Some(30))),
1014        );
1015        assert_eq!(
1016            precision.multiply(&Precision::Exact(ScalarValue::Int32(None))),
1017            // As per behavior of ScalarValue::mul_checked
1018            Precision::Exact(ScalarValue::Int32(None)),
1019        );
1020        assert_eq!(precision.multiply(&Precision::Absent), Precision::Absent);
1021    }
1022
1023    #[test]
1024    fn test_cast_to() {
1025        // Valid
1026        assert_eq!(
1027            Precision::Exact(ScalarValue::Int32(Some(42)))
1028                .cast_to(&DataType::Int64)
1029                .unwrap(),
1030            Precision::Exact(ScalarValue::Int64(Some(42))),
1031        );
1032        assert_eq!(
1033            Precision::Inexact(ScalarValue::Int32(Some(42)))
1034                .cast_to(&DataType::Int64)
1035                .unwrap(),
1036            Precision::Inexact(ScalarValue::Int64(Some(42))),
1037        );
1038        // Null
1039        assert_eq!(
1040            Precision::Exact(ScalarValue::Int32(None))
1041                .cast_to(&DataType::Int64)
1042                .unwrap(),
1043            Precision::Exact(ScalarValue::Int64(None)),
1044        );
1045        // Overflow returns error
1046        assert!(
1047            Precision::Exact(ScalarValue::Int32(Some(256)))
1048                .cast_to(&DataType::Int8)
1049                .is_err()
1050        );
1051    }
1052
1053    #[test]
1054    fn test_precision_cloning() {
1055        // Precision<usize> is copy
1056        let precision: Precision<usize> = Precision::Exact(42);
1057        let p2 = precision;
1058        assert_eq!(precision, p2);
1059
1060        // Precision<ScalarValue> is not copy (requires .clone())
1061        let precision: Precision<ScalarValue> =
1062            Precision::Exact(ScalarValue::Int64(Some(42)));
1063        let p2 = precision.clone();
1064        assert_eq!(precision, p2);
1065    }
1066
1067    #[test]
1068    fn test_project_none() {
1069        let projection = None;
1070        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1071        assert_eq!(stats, make_stats(vec![10, 20, 30]));
1072    }
1073
1074    #[test]
1075    fn test_project_empty() {
1076        let projection = Some(vec![]);
1077        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1078        assert_eq!(stats, make_stats(vec![]));
1079    }
1080
1081    #[test]
1082    fn test_project_swap() {
1083        let projection = Some(vec![2, 1]);
1084        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1085        assert_eq!(stats, make_stats(vec![30, 20]));
1086    }
1087
1088    #[test]
1089    fn test_project_repeated() {
1090        let projection = Some(vec![1, 2, 1, 1, 0, 2]);
1091        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1092        assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
1093    }
1094
1095    // Make a Statistics structure with the specified null counts for each column
1096    fn make_stats(counts: impl IntoIterator<Item = usize>) -> Statistics {
1097        Statistics {
1098            num_rows: Precision::Exact(42),
1099            total_byte_size: Precision::Exact(500),
1100            column_statistics: counts.into_iter().map(col_stats_i64).collect(),
1101        }
1102    }
1103
1104    fn col_stats_i64(null_count: usize) -> ColumnStatistics {
1105        ColumnStatistics {
1106            null_count: Precision::Exact(null_count),
1107            max_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1108            min_value: Precision::Exact(ScalarValue::Int64(Some(64))),
1109            sum_value: Precision::Exact(ScalarValue::Int64(Some(4600))),
1110            distinct_count: Precision::Exact(100),
1111            byte_size: Precision::Exact(800),
1112        }
1113    }
1114
1115    #[test]
1116    fn test_try_merge_basic() {
1117        // Create a schema with two columns
1118        let schema = Arc::new(Schema::new(vec![
1119            Field::new("col1", DataType::Int32, false),
1120            Field::new("col2", DataType::Int32, false),
1121        ]));
1122
1123        // Create items with statistics
1124        let stats1 = Statistics {
1125            num_rows: Precision::Exact(10),
1126            total_byte_size: Precision::Exact(100),
1127            column_statistics: vec![
1128                ColumnStatistics {
1129                    null_count: Precision::Exact(1),
1130                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1131                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1132                    sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
1133                    distinct_count: Precision::Absent,
1134                    byte_size: Precision::Exact(40),
1135                },
1136                ColumnStatistics {
1137                    null_count: Precision::Exact(2),
1138                    max_value: Precision::Exact(ScalarValue::Int32(Some(200))),
1139                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
1140                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))),
1141                    distinct_count: Precision::Absent,
1142                    byte_size: Precision::Exact(40),
1143                },
1144            ],
1145        };
1146
1147        let stats2 = Statistics {
1148            num_rows: Precision::Exact(15),
1149            total_byte_size: Precision::Exact(150),
1150            column_statistics: vec![
1151                ColumnStatistics {
1152                    null_count: Precision::Exact(2),
1153                    max_value: Precision::Exact(ScalarValue::Int32(Some(120))),
1154                    min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
1155                    sum_value: Precision::Exact(ScalarValue::Int32(Some(600))),
1156                    distinct_count: Precision::Absent,
1157                    byte_size: Precision::Exact(60),
1158                },
1159                ColumnStatistics {
1160                    null_count: Precision::Exact(3),
1161                    max_value: Precision::Exact(ScalarValue::Int32(Some(180))),
1162                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
1163                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))),
1164                    distinct_count: Precision::Absent,
1165                    byte_size: Precision::Exact(60),
1166                },
1167            ],
1168        };
1169
1170        let items = vec![stats1, stats2];
1171
1172        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1173
1174        // Verify the results
1175        assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15
1176        assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150
1177
1178        // Verify column statistics
1179        let col1_stats = &summary_stats.column_statistics[0];
1180        assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2
1181        assert_eq!(
1182            col1_stats.max_value,
1183            Precision::Exact(ScalarValue::Int32(Some(120)))
1184        );
1185        assert_eq!(
1186            col1_stats.min_value,
1187            Precision::Exact(ScalarValue::Int32(Some(-10)))
1188        );
1189        assert_eq!(
1190            col1_stats.sum_value,
1191            Precision::Exact(ScalarValue::Int32(Some(1100)))
1192        ); // 500 + 600
1193
1194        let col2_stats = &summary_stats.column_statistics[1];
1195        assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3
1196        assert_eq!(
1197            col2_stats.max_value,
1198            Precision::Exact(ScalarValue::Int32(Some(200)))
1199        );
1200        assert_eq!(
1201            col2_stats.min_value,
1202            Precision::Exact(ScalarValue::Int32(Some(5)))
1203        );
1204        assert_eq!(
1205            col2_stats.sum_value,
1206            Precision::Exact(ScalarValue::Int32(Some(2200)))
1207        ); // 1000 + 1200
1208    }
1209
1210    #[test]
1211    fn test_try_merge_mixed_precision() {
1212        // Create a schema with one column
1213        let schema = Arc::new(Schema::new(vec![Field::new(
1214            "col1",
1215            DataType::Int32,
1216            false,
1217        )]));
1218
1219        // Create items with different precision levels
1220        let stats1 = Statistics {
1221            num_rows: Precision::Exact(10),
1222            total_byte_size: Precision::Inexact(100),
1223            column_statistics: vec![ColumnStatistics {
1224                null_count: Precision::Exact(1),
1225                max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1226                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1227                sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
1228                distinct_count: Precision::Absent,
1229                byte_size: Precision::Exact(40),
1230            }],
1231        };
1232
1233        let stats2 = Statistics {
1234            num_rows: Precision::Inexact(15),
1235            total_byte_size: Precision::Exact(150),
1236            column_statistics: vec![ColumnStatistics {
1237                null_count: Precision::Inexact(2),
1238                max_value: Precision::Inexact(ScalarValue::Int32(Some(120))),
1239                min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
1240                sum_value: Precision::Absent,
1241                distinct_count: Precision::Absent,
1242                byte_size: Precision::Inexact(60),
1243            }],
1244        };
1245
1246        let items = vec![stats1, stats2];
1247
1248        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1249
1250        assert_eq!(summary_stats.num_rows, Precision::Inexact(25));
1251        assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250));
1252
1253        let col_stats = &summary_stats.column_statistics[0];
1254        assert_eq!(col_stats.null_count, Precision::Inexact(3));
1255        assert_eq!(
1256            col_stats.max_value,
1257            Precision::Inexact(ScalarValue::Int32(Some(120)))
1258        );
1259        assert_eq!(
1260            col_stats.min_value,
1261            Precision::Inexact(ScalarValue::Int32(Some(-10)))
1262        );
1263        assert!(matches!(col_stats.sum_value, Precision::Absent));
1264    }
1265
1266    #[test]
1267    fn test_try_merge_empty() {
1268        let schema = Arc::new(Schema::new(vec![Field::new(
1269            "col1",
1270            DataType::Int32,
1271            false,
1272        )]));
1273
1274        // Empty collection
1275        let items: Vec<Statistics> = vec![];
1276
1277        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1278
1279        // Verify default values for empty collection
1280        assert_eq!(summary_stats.num_rows, Precision::Absent);
1281        assert_eq!(summary_stats.total_byte_size, Precision::Absent);
1282        assert_eq!(summary_stats.column_statistics.len(), 1);
1283        assert_eq!(
1284            summary_stats.column_statistics[0].null_count,
1285            Precision::Absent
1286        );
1287    }
1288
1289    #[test]
1290    fn test_try_merge_mismatched_size() {
1291        // Create a schema with one column
1292        let schema = Arc::new(Schema::new(vec![Field::new(
1293            "col1",
1294            DataType::Int32,
1295            false,
1296        )]));
1297
1298        // No column statistics
1299        let stats1 = Statistics::default();
1300
1301        let stats2 =
1302            Statistics::default().add_column_statistics(ColumnStatistics::new_unknown());
1303
1304        let items = vec![stats1, stats2];
1305
1306        let e = Statistics::try_merge_iter(&items, &schema).unwrap_err();
1307        assert_contains!(
1308            e.to_string(),
1309            "Error during planning: Cannot merge statistics with different number of columns: 0 vs 1"
1310        );
1311    }
1312
1313    #[test]
1314    fn test_try_merge_distinct_count_absent() {
1315        // Create statistics with known distinct counts
1316        let stats1 = Statistics::default()
1317            .with_num_rows(Precision::Exact(10))
1318            .with_total_byte_size(Precision::Exact(100))
1319            .add_column_statistics(
1320                ColumnStatistics::new_unknown()
1321                    .with_null_count(Precision::Exact(0))
1322                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(1))))
1323                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
1324                    .with_distinct_count(Precision::Exact(5)),
1325            );
1326
1327        let stats2 = Statistics::default()
1328            .with_num_rows(Precision::Exact(15))
1329            .with_total_byte_size(Precision::Exact(150))
1330            .add_column_statistics(
1331                ColumnStatistics::new_unknown()
1332                    .with_null_count(Precision::Exact(0))
1333                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
1334                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(20))))
1335                    .with_distinct_count(Precision::Exact(7)),
1336            );
1337
1338        // Merge statistics
1339        let merged_stats = stats1.try_merge(&stats2).unwrap();
1340
1341        // Verify the results
1342        assert_eq!(merged_stats.num_rows, Precision::Exact(25));
1343        assert_eq!(merged_stats.total_byte_size, Precision::Exact(250));
1344
1345        let col_stats = &merged_stats.column_statistics[0];
1346        assert_eq!(col_stats.null_count, Precision::Exact(0));
1347        assert_eq!(
1348            col_stats.min_value,
1349            Precision::Exact(ScalarValue::Int32(Some(1)))
1350        );
1351        assert_eq!(
1352            col_stats.max_value,
1353            Precision::Exact(ScalarValue::Int32(Some(20)))
1354        );
1355        // Distinct count should be Absent after merge
1356        assert_eq!(col_stats.distinct_count, Precision::Absent);
1357    }
1358
1359    #[test]
1360    fn test_with_fetch_basic_preservation() {
1361        // Test that column statistics and byte size are preserved (as inexact) when applying fetch
1362        let original_stats = Statistics {
1363            num_rows: Precision::Exact(1000),
1364            total_byte_size: Precision::Exact(8000),
1365            column_statistics: vec![
1366                ColumnStatistics {
1367                    null_count: Precision::Exact(10),
1368                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1369                    min_value: Precision::Exact(ScalarValue::Int32(Some(0))),
1370                    sum_value: Precision::Exact(ScalarValue::Int32(Some(5050))),
1371                    distinct_count: Precision::Exact(50),
1372                    byte_size: Precision::Exact(4000),
1373                },
1374                ColumnStatistics {
1375                    null_count: Precision::Exact(20),
1376                    max_value: Precision::Exact(ScalarValue::Int64(Some(200))),
1377                    min_value: Precision::Exact(ScalarValue::Int64(Some(10))),
1378                    sum_value: Precision::Exact(ScalarValue::Int64(Some(10100))),
1379                    distinct_count: Precision::Exact(75),
1380                    byte_size: Precision::Exact(8000),
1381                },
1382            ],
1383        };
1384
1385        // Apply fetch of 100 rows (10% of original)
1386        let result = original_stats.clone().with_fetch(Some(100), 0, 1).unwrap();
1387
1388        // Check num_rows
1389        assert_eq!(result.num_rows, Precision::Exact(100));
1390
1391        // Check total_byte_size is computed as sum of scaled column byte_size values
1392        // Column 1: 4000 * 0.1 = 400, Column 2: 8000 * 0.1 = 800, Sum = 1200
1393        assert_eq!(result.total_byte_size, Precision::Inexact(1200));
1394
1395        // Check column statistics are preserved but marked as inexact
1396        assert_eq!(result.column_statistics.len(), 2);
1397
1398        // First column
1399        assert_eq!(
1400            result.column_statistics[0].null_count,
1401            Precision::Inexact(10)
1402        );
1403        assert_eq!(
1404            result.column_statistics[0].max_value,
1405            Precision::Inexact(ScalarValue::Int32(Some(100)))
1406        );
1407        assert_eq!(
1408            result.column_statistics[0].min_value,
1409            Precision::Inexact(ScalarValue::Int32(Some(0)))
1410        );
1411        assert_eq!(
1412            result.column_statistics[0].sum_value,
1413            Precision::Inexact(ScalarValue::Int32(Some(5050)))
1414        );
1415        assert_eq!(
1416            result.column_statistics[0].distinct_count,
1417            Precision::Inexact(50)
1418        );
1419
1420        // Second column
1421        assert_eq!(
1422            result.column_statistics[1].null_count,
1423            Precision::Inexact(20)
1424        );
1425        assert_eq!(
1426            result.column_statistics[1].max_value,
1427            Precision::Inexact(ScalarValue::Int64(Some(200)))
1428        );
1429        assert_eq!(
1430            result.column_statistics[1].min_value,
1431            Precision::Inexact(ScalarValue::Int64(Some(10)))
1432        );
1433        assert_eq!(
1434            result.column_statistics[1].sum_value,
1435            Precision::Inexact(ScalarValue::Int64(Some(10100)))
1436        );
1437        assert_eq!(
1438            result.column_statistics[1].distinct_count,
1439            Precision::Inexact(75)
1440        );
1441    }
1442
1443    #[test]
1444    fn test_with_fetch_inexact_input() {
1445        // Test that inexact input statistics remain inexact
1446        let original_stats = Statistics {
1447            num_rows: Precision::Inexact(1000),
1448            total_byte_size: Precision::Inexact(8000),
1449            column_statistics: vec![ColumnStatistics {
1450                null_count: Precision::Inexact(10),
1451                max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1452                min_value: Precision::Inexact(ScalarValue::Int32(Some(0))),
1453                sum_value: Precision::Inexact(ScalarValue::Int32(Some(5050))),
1454                distinct_count: Precision::Inexact(50),
1455                byte_size: Precision::Inexact(4000),
1456            }],
1457        };
1458
1459        let result = original_stats.clone().with_fetch(Some(500), 0, 1).unwrap();
1460
1461        // Check num_rows is inexact
1462        assert_eq!(result.num_rows, Precision::Inexact(500));
1463
1464        // Check total_byte_size is computed as sum of scaled column byte_size values
1465        // Column 1: 4000 * 0.5 = 2000, Sum = 2000
1466        assert_eq!(result.total_byte_size, Precision::Inexact(2000));
1467
1468        // Column stats remain inexact
1469        assert_eq!(
1470            result.column_statistics[0].null_count,
1471            Precision::Inexact(10)
1472        );
1473    }
1474
1475    #[test]
1476    fn test_with_fetch_skip_all_rows() {
1477        // Test when skip >= num_rows (all rows are skipped)
1478        let original_stats = Statistics {
1479            num_rows: Precision::Exact(100),
1480            total_byte_size: Precision::Exact(800),
1481            column_statistics: vec![col_stats_i64(10)],
1482        };
1483
1484        let result = original_stats.clone().with_fetch(Some(50), 100, 1).unwrap();
1485
1486        assert_eq!(result.num_rows, Precision::Exact(0));
1487        // When ratio is 0/100 = 0, byte size should be 0
1488        assert_eq!(result.total_byte_size, Precision::Inexact(0));
1489    }
1490
1491    #[test]
1492    fn test_with_fetch_no_limit() {
1493        // Test when fetch is None and skip is 0 (no limit applied)
1494        let original_stats = Statistics {
1495            num_rows: Precision::Exact(100),
1496            total_byte_size: Precision::Exact(800),
1497            column_statistics: vec![col_stats_i64(10)],
1498        };
1499
1500        let result = original_stats.clone().with_fetch(None, 0, 1).unwrap();
1501
1502        // Stats should be unchanged when no fetch and no skip
1503        assert_eq!(result.num_rows, Precision::Exact(100));
1504        assert_eq!(result.total_byte_size, Precision::Exact(800));
1505    }
1506
1507    #[test]
1508    fn test_with_fetch_with_skip() {
1509        // Test with both skip and fetch
1510        let original_stats = Statistics {
1511            num_rows: Precision::Exact(1000),
1512            total_byte_size: Precision::Exact(8000),
1513            column_statistics: vec![col_stats_i64(10)],
1514        };
1515
1516        // Skip 200, fetch 300, so we get rows 200-500
1517        let result = original_stats
1518            .clone()
1519            .with_fetch(Some(300), 200, 1)
1520            .unwrap();
1521
1522        assert_eq!(result.num_rows, Precision::Exact(300));
1523        // Column 1: byte_size 800 * (300/500) = 240, Sum = 240
1524        assert_eq!(result.total_byte_size, Precision::Inexact(240));
1525    }
1526
#[test]
fn test_with_fetch_multi_partition() {
    // Test that the fetch limit is multiplied by the number of partitions.
    let original_stats = Statistics {
        num_rows: Precision::Exact(1000), // per partition
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![col_stats_i64(10)],
    };

    // Fetch 100 per partition, 4 partitions = 400 total
    let result = original_stats.with_fetch(Some(100), 0, 4).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(400));
    // Row ratio is 400 / 1000 = 0.4.
    // Column 1: byte_size 800 * 0.4 = 320, Sum = 320
    assert_eq!(result.total_byte_size, Precision::Inexact(320));
}
1543
#[test]
fn test_with_fetch_absent_stats() {
    // Test with absent statistics: nothing is known about the input, so the
    // fetch value is the only information available for the output.
    let original_stats = Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Absent,
            max_value: Precision::Absent,
            min_value: Precision::Absent,
            sum_value: Precision::Absent,
            distinct_count: Precision::Absent,
            byte_size: Precision::Absent,
        }],
    };

    let result = original_stats.with_fetch(Some(100), 0, 1).unwrap();

    // With absent input stats, the row count is an inexact estimate taken
    // from the fetch value, and the byte size stays unknown.
    assert_eq!(result.num_rows, Precision::Inexact(100));
    assert_eq!(result.total_byte_size, Precision::Absent);
    // Column stats should remain absent
    assert_eq!(result.column_statistics[0].null_count, Precision::Absent);
}
1568
#[test]
fn test_with_fetch_fetch_exceeds_rows() {
    // Test when fetch is larger than the rows still available after skip.
    let original_stats = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(800),
        column_statistics: vec![col_stats_i64(10)],
    };

    // Skip 50, fetch 100, but only 50 rows remain
    let result = original_stats.with_fetch(Some(100), 50, 1).unwrap();

    // The output is capped by the remaining rows, not by the fetch value.
    assert_eq!(result.num_rows, Precision::Exact(50));
    // 50/100 = 0.5, so 800 * 0.5 = 400
    assert_eq!(result.total_byte_size, Precision::Inexact(400));
}
1585
#[test]
fn test_with_fetch_preserves_all_column_stats() {
    // Comprehensive test over every column statistic field: value-carrying
    // fields keep their value but are downgraded to inexact, while
    // `byte_size` is scaled by the row ratio.
    let original_col_stats = ColumnStatistics {
        null_count: Precision::Exact(42),
        max_value: Precision::Exact(ScalarValue::Int32(Some(999))),
        min_value: Precision::Exact(ScalarValue::Int32(Some(-100))),
        sum_value: Precision::Exact(ScalarValue::Int32(Some(123456))),
        distinct_count: Precision::Exact(789),
        byte_size: Precision::Exact(4000),
    };

    let original_stats = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![original_col_stats],
    };

    let result = original_stats.with_fetch(Some(250), 0, 1).unwrap();

    let result_col_stats = &result.column_statistics[0];

    // All values should be preserved but marked as inexact
    assert_eq!(result_col_stats.null_count, Precision::Inexact(42));
    assert_eq!(
        result_col_stats.max_value,
        Precision::Inexact(ScalarValue::Int32(Some(999)))
    );
    assert_eq!(
        result_col_stats.min_value,
        Precision::Inexact(ScalarValue::Int32(Some(-100)))
    );
    assert_eq!(
        result_col_stats.sum_value,
        Precision::Inexact(ScalarValue::Int32(Some(123456)))
    );
    assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789));

    // byte_size is the one field that is scaled rather than carried over:
    // 250 of 1000 rows remain, so 4000 * 0.25 = 1000.
    assert_eq!(result_col_stats.byte_size, Precision::Inexact(1000));
}
1624
#[test]
fn test_byte_size_try_merge() {
    // Merging two statistics must add the per-column byte sizes together.
    // The two inputs differ only in `null_count` and `byte_size`, so a small
    // local constructor keeps the fixtures compact.
    let column_with = |null_count, byte_size| ColumnStatistics {
        null_count: Precision::Exact(null_count),
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(byte_size),
    };

    let left = Statistics {
        num_rows: Precision::Exact(50),
        total_byte_size: Precision::Exact(1000),
        column_statistics: vec![column_with(10, 1000)],
    };
    let right = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(2000),
        column_statistics: vec![column_with(20, 2000)],
    };

    let merged = left.try_merge(&right).unwrap();

    // 1000 + 2000
    let expected_byte_size = Precision::Exact(3000);
    assert_eq!(merged.column_statistics[0].byte_size, expected_byte_size);
}
1662
#[test]
fn test_byte_size_to_inexact() {
    // `to_inexact` must downgrade an exact `byte_size` to inexact while
    // keeping the numeric value.
    let exact_stats = ColumnStatistics {
        byte_size: Precision::Exact(5000),
        null_count: Precision::Exact(10),
        distinct_count: Precision::Absent,
        min_value: Precision::Absent,
        max_value: Precision::Absent,
        sum_value: Precision::Absent,
    };

    let downgraded = exact_stats.to_inexact();

    let expected_byte_size = Precision::Inexact(5000);
    assert_eq!(downgraded.byte_size, expected_byte_size);
}
1677
#[test]
fn test_with_byte_size_builder() {
    // The `with_byte_size` builder must store the value it is given.
    let unknown = ColumnStatistics::new_unknown();
    let built = unknown.with_byte_size(Precision::Exact(8192));

    assert_eq!(built.byte_size, Precision::Exact(8192));
}
1684
#[test]
fn test_with_fetch_scales_byte_size() {
    // `with_fetch` must scale each column's byte_size by the row ratio and
    // derive total_byte_size from the scaled column values.
    let column_with = |null_count, byte_size| ColumnStatistics {
        null_count: Precision::Exact(null_count),
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(byte_size),
    };

    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![column_with(10, 4000), column_with(20, 8000)],
    };

    // Fetching 100 of 1000 rows keeps 10% of the input.
    let limited = input.with_fetch(Some(100), 0, 1).unwrap();

    // Per-column byte_size is scaled: 4000 * 0.1 = 400, 8000 * 0.1 = 800
    let first_column_bytes = limited.column_statistics[0].byte_size;
    assert_eq!(first_column_bytes, Precision::Inexact(400));
    let second_column_bytes = limited.column_statistics[1].byte_size;
    assert_eq!(second_column_bytes, Precision::Inexact(800));

    // total_byte_size is the sum of the scaled column values: 400 + 800 = 1200
    assert_eq!(limited.total_byte_size, Precision::Inexact(1200));
}
1727
#[test]
fn test_with_fetch_total_byte_size_fallback() {
    // When at least one column lacks a byte_size, total_byte_size cannot be
    // summed from the columns and must instead be scaled from the original
    // total.
    let column_with = |null_count, byte_size| ColumnStatistics {
        null_count: Precision::Exact(null_count),
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size,
    };

    let columns = vec![
        column_with(10, Precision::Exact(4000)),
        // One column has no byte_size
        column_with(20, Precision::Absent),
    ];

    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: columns,
    };

    // Fetching 100 of 1000 rows keeps 10% of the input.
    let limited = input.with_fetch(Some(100), 0, 1).unwrap();

    // Fallback scaling of the original total: 8000 * 0.1 = 800
    let expected_total = Precision::Inexact(800);
    assert_eq!(limited.total_byte_size, expected_total);
}
1760}