Skip to main content

datafusion_common/
stats.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides data structures to represent statistics
19
20use std::fmt::{self, Debug, Display};
21
22use crate::{Result, ScalarValue};
23
24use crate::error::_plan_err;
25use arrow::datatypes::{DataType, Schema};
26
/// Represents a value with a degree of certainty. `Precision` is used to
/// propagate information about the precision of statistical values.
#[derive(Clone, PartialEq, Eq, Default, Copy)]
pub enum Precision<T: Debug + Clone + PartialEq + Eq + PartialOrd> {
    /// The exact value is known
    Exact(T),
    /// The value is not known exactly, but is likely close to this value
    Inexact(T),
    /// Nothing is known about the value
    #[default]
    Absent,
}

impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
    /// If we have some value (exact or inexact), it returns that value.
    /// Otherwise, it returns `None`.
    pub fn get_value(&self) -> Option<&T> {
        match self {
            Precision::Exact(value) | Precision::Inexact(value) => Some(value),
            Precision::Absent => None,
        }
    }

    /// Transform the value in this [`Precision`] object, if one exists, using
    /// the given function. Preserves the exactness state.
    ///
    /// The function is invoked at most once, so any `FnOnce` is accepted
    /// (every `Fn`/`FnMut` closure still works).
    pub fn map<U, F>(self, f: F) -> Precision<U>
    where
        F: FnOnce(T) -> U,
        U: Debug + Clone + PartialEq + Eq + PartialOrd,
    {
        match self {
            Precision::Exact(val) => Precision::Exact(f(val)),
            Precision::Inexact(val) => Precision::Inexact(f(val)),
            Precision::Absent => Precision::Absent,
        }
    }

    /// Returns `Some(true)` if we have an exact value, `Some(false)` if we
    /// have an inexact value, and `None` if there is no value.
    pub fn is_exact(&self) -> Option<bool> {
        match self {
            Precision::Exact(_) => Some(true),
            Precision::Inexact(_) => Some(false),
            Precision::Absent => None,
        }
    }

    /// Returns the maximum of two (possibly inexact) values, conservatively
    /// propagating exactness information. If one of the input values is
    /// [`Precision::Absent`], the result is `Absent` too.
    ///
    /// `T` is only required to be [`PartialOrd`]; when the two values cannot
    /// be ordered (`partial_cmp` returns `None`) no meaningful maximum
    /// exists, so the result is `Absent` rather than an arbitrary operand.
    /// When both values compare equal, the value of `self` is returned.
    pub fn max(&self, other: &Precision<T>) -> Precision<T> {
        let Some((lhs, rhs, exact)) = self.comparable_operands(other) else {
            return Precision::Absent;
        };
        match lhs.partial_cmp(rhs) {
            Some(ordering) => {
                let winner = if ordering.is_ge() { lhs } else { rhs };
                Self::from_parts(winner.clone(), exact)
            }
            // Incomparable values: claiming either one would be wrong.
            None => Precision::Absent,
        }
    }

    /// Returns the minimum of two (possibly inexact) values, conservatively
    /// propagating exactness information. If one of the input values is
    /// [`Precision::Absent`], the result is `Absent` too.
    ///
    /// `T` is only required to be [`PartialOrd`]; when the two values cannot
    /// be ordered (`partial_cmp` returns `None`) no meaningful minimum
    /// exists, so the result is `Absent` rather than an arbitrary operand.
    /// When both values compare equal, the value of `other` is returned.
    pub fn min(&self, other: &Precision<T>) -> Precision<T> {
        let Some((lhs, rhs, exact)) = self.comparable_operands(other) else {
            return Precision::Absent;
        };
        match lhs.partial_cmp(rhs) {
            Some(ordering) => {
                let winner = if ordering.is_ge() { rhs } else { lhs };
                Self::from_parts(winner.clone(), exact)
            }
            // Incomparable values: claiming either one would be wrong.
            None => Precision::Absent,
        }
    }

    /// Demotes the precision state from exact to inexact (if present).
    pub fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(value) => Precision::Inexact(value),
            other => other,
        }
    }

    /// Borrows the values of `self` and `other` together with a flag telling
    /// whether a result derived from them may be reported as exact (only
    /// when both inputs are exact). Returns `None` if either side is absent.
    fn comparable_operands<'a>(
        &'a self,
        other: &'a Self,
    ) -> Option<(&'a T, &'a T, bool)> {
        match (self, other) {
            (Precision::Exact(a), Precision::Exact(b)) => Some((a, b, true)),
            (
                Precision::Exact(a) | Precision::Inexact(a),
                Precision::Exact(b) | Precision::Inexact(b),
            ) => Some((a, b, false)),
            (Precision::Absent, _) | (_, Precision::Absent) => None,
        }
    }

    /// Wraps `value` as `Exact` when `exact` is true, `Inexact` otherwise.
    fn from_parts(value: T, exact: bool) -> Self {
        if exact {
            Precision::Exact(value)
        } else {
            Precision::Inexact(value)
        }
    }
}
116
117impl Precision<usize> {
118    /// Calculates the sum of two (possibly inexact) [`usize`] values,
119    /// conservatively propagating exactness information. If one of the input
120    /// values is [`Precision::Absent`], the result is `Absent` too.
121    pub fn add(&self, other: &Precision<usize>) -> Precision<usize> {
122        match (self, other) {
123            (Precision::Exact(a), Precision::Exact(b)) => a.checked_add(*b).map_or_else(
124                || Precision::Inexact(a.saturating_add(*b)),
125                Precision::Exact,
126            ),
127            (Precision::Inexact(a), Precision::Exact(b))
128            | (Precision::Exact(a), Precision::Inexact(b))
129            | (Precision::Inexact(a), Precision::Inexact(b)) => {
130                Precision::Inexact(a.saturating_add(*b))
131            }
132            (_, _) => Precision::Absent,
133        }
134    }
135
136    /// Calculates the difference of two (possibly inexact) [`usize`] values,
137    /// conservatively propagating exactness information. If one of the input
138    /// values is [`Precision::Absent`], the result is `Absent` too.
139    pub fn sub(&self, other: &Precision<usize>) -> Precision<usize> {
140        match (self, other) {
141            (Precision::Exact(a), Precision::Exact(b)) => a.checked_sub(*b).map_or_else(
142                || Precision::Inexact(a.saturating_sub(*b)),
143                Precision::Exact,
144            ),
145            (Precision::Inexact(a), Precision::Exact(b))
146            | (Precision::Exact(a), Precision::Inexact(b))
147            | (Precision::Inexact(a), Precision::Inexact(b)) => {
148                Precision::Inexact(a.saturating_sub(*b))
149            }
150            (_, _) => Precision::Absent,
151        }
152    }
153
154    /// Calculates the multiplication of two (possibly inexact) [`usize`] values,
155    /// conservatively propagating exactness information. If one of the input
156    /// values is [`Precision::Absent`], the result is `Absent` too.
157    pub fn multiply(&self, other: &Precision<usize>) -> Precision<usize> {
158        match (self, other) {
159            (Precision::Exact(a), Precision::Exact(b)) => a.checked_mul(*b).map_or_else(
160                || Precision::Inexact(a.saturating_mul(*b)),
161                Precision::Exact,
162            ),
163            (Precision::Inexact(a), Precision::Exact(b))
164            | (Precision::Exact(a), Precision::Inexact(b))
165            | (Precision::Inexact(a), Precision::Inexact(b)) => {
166                Precision::Inexact(a.saturating_mul(*b))
167            }
168            (_, _) => Precision::Absent,
169        }
170    }
171
172    /// Return the estimate of applying a filter with estimated selectivity
173    /// `selectivity` to this Precision. A selectivity of `1.0` means that all
174    /// rows are selected. A selectivity of `0.5` means half the rows are
175    /// selected. Will always return inexact statistics.
176    pub fn with_estimated_selectivity(self, selectivity: f64) -> Self {
177        self.map(|v| ((v as f64 * selectivity).ceil()) as usize)
178            .to_inexact()
179    }
180}
181
182impl Precision<ScalarValue> {
183    /// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
184    /// conservatively propagating exactness information. If one of the input
185    /// values is [`Precision::Absent`], the result is `Absent` too.
186    pub fn add(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
187        match (self, other) {
188            (Precision::Exact(a), Precision::Exact(b)) => {
189                a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent)
190            }
191            (Precision::Inexact(a), Precision::Exact(b))
192            | (Precision::Exact(a), Precision::Inexact(b))
193            | (Precision::Inexact(a), Precision::Inexact(b)) => a
194                .add(b)
195                .map(Precision::Inexact)
196                .unwrap_or(Precision::Absent),
197            (_, _) => Precision::Absent,
198        }
199    }
200
201    /// Calculates the difference of two (possibly inexact) [`ScalarValue`] values,
202    /// conservatively propagating exactness information. If one of the input
203    /// values is [`Precision::Absent`], the result is `Absent` too.
204    pub fn sub(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
205        match (self, other) {
206            (Precision::Exact(a), Precision::Exact(b)) => {
207                a.sub(b).map(Precision::Exact).unwrap_or(Precision::Absent)
208            }
209            (Precision::Inexact(a), Precision::Exact(b))
210            | (Precision::Exact(a), Precision::Inexact(b))
211            | (Precision::Inexact(a), Precision::Inexact(b)) => a
212                .sub(b)
213                .map(Precision::Inexact)
214                .unwrap_or(Precision::Absent),
215            (_, _) => Precision::Absent,
216        }
217    }
218
219    /// Calculates the multiplication of two (possibly inexact) [`ScalarValue`] values,
220    /// conservatively propagating exactness information. If one of the input
221    /// values is [`Precision::Absent`], the result is `Absent` too.
222    pub fn multiply(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
223        match (self, other) {
224            (Precision::Exact(a), Precision::Exact(b)) => a
225                .mul_checked(b)
226                .map(Precision::Exact)
227                .unwrap_or(Precision::Absent),
228            (Precision::Inexact(a), Precision::Exact(b))
229            | (Precision::Exact(a), Precision::Inexact(b))
230            | (Precision::Inexact(a), Precision::Inexact(b)) => a
231                .mul_checked(b)
232                .map(Precision::Inexact)
233                .unwrap_or(Precision::Absent),
234            (_, _) => Precision::Absent,
235        }
236    }
237
238    /// Casts the value to the given data type, propagating exactness information.
239    pub fn cast_to(&self, data_type: &DataType) -> Result<Precision<ScalarValue>> {
240        match self {
241            Precision::Exact(value) => value.cast_to(data_type).map(Precision::Exact),
242            Precision::Inexact(value) => value.cast_to(data_type).map(Precision::Inexact),
243            Precision::Absent => Ok(Precision::Absent),
244        }
245    }
246}
247
248impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Debug for Precision<T> {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        match self {
251            Precision::Exact(inner) => write!(f, "Exact({inner:?})"),
252            Precision::Inexact(inner) => write!(f, "Inexact({inner:?})"),
253            Precision::Absent => write!(f, "Absent"),
254        }
255    }
256}
257
258impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Display for Precision<T> {
259    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260        match self {
261            Precision::Exact(inner) => write!(f, "Exact({inner:?})"),
262            Precision::Inexact(inner) => write!(f, "Inexact({inner:?})"),
263            Precision::Absent => write!(f, "Absent"),
264        }
265    }
266}
267
268impl From<Precision<usize>> for Precision<ScalarValue> {
269    fn from(value: Precision<usize>) -> Self {
270        match value {
271            Precision::Exact(v) => Precision::Exact(ScalarValue::UInt64(Some(v as u64))),
272            Precision::Inexact(v) => {
273                Precision::Inexact(ScalarValue::UInt64(Some(v as u64)))
274            }
275            Precision::Absent => Precision::Absent,
276        }
277    }
278}
279
/// Statistics for a relation.
///
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
/// and the output of transformations is not always predictable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
    /// The number of rows estimated to be scanned.
    pub num_rows: Precision<usize>,
    /// The total bytes of the output data.
    /// Note that this is not the same as the total bytes that may be scanned,
    /// processed, etc.
    /// E.g. we may read 1GB of data from a Parquet file but the Arrow data
    /// the node produces may be 2GB; it's this 2GB that is tracked here.
    pub total_byte_size: Precision<usize>,
    /// Statistics on a column level.
    ///
    /// It must contain a [`ColumnStatistics`] for each field in the schema of
    /// the table to which the [`Statistics`] refer.
    pub column_statistics: Vec<ColumnStatistics>,
}
300
301impl Default for Statistics {
302    /// Returns a new [`Statistics`] instance with all fields set to unknown
303    /// and no columns.
304    fn default() -> Self {
305        Self {
306            num_rows: Precision::Absent,
307            total_byte_size: Precision::Absent,
308            column_statistics: vec![],
309        }
310    }
311}
312
313impl Statistics {
314    /// Returns a [`Statistics`] instance for the given schema by assigning
315    /// unknown statistics to each column in the schema.
316    pub fn new_unknown(schema: &Schema) -> Self {
317        Self {
318            num_rows: Precision::Absent,
319            total_byte_size: Precision::Absent,
320            column_statistics: Statistics::unknown_column(schema),
321        }
322    }
323
324    /// Calculates `total_byte_size` based on the schema and `num_rows`.
325    /// If any of the columns has non-primitive width, `total_byte_size` is set to inexact.
326    pub fn calculate_total_byte_size(&mut self, schema: &Schema) {
327        let mut row_size = Some(0);
328        for field in schema.fields() {
329            match field.data_type().primitive_width() {
330                Some(width) => {
331                    row_size = row_size.map(|s| s + width);
332                }
333                None => {
334                    row_size = None;
335                    break;
336                }
337            }
338        }
339        match row_size {
340            None => {
341                self.total_byte_size = self.total_byte_size.to_inexact();
342            }
343            Some(size) => {
344                self.total_byte_size = self.num_rows.multiply(&Precision::Exact(size));
345            }
346        }
347    }
348
349    /// Returns an unbounded `ColumnStatistics` for each field in the schema.
350    pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
351        schema
352            .fields()
353            .iter()
354            .map(|_| ColumnStatistics::new_unknown())
355            .collect()
356    }
357
358    /// Set the number of rows
359    pub fn with_num_rows(mut self, num_rows: Precision<usize>) -> Self {
360        self.num_rows = num_rows;
361        self
362    }
363
364    /// Set the total size, in bytes
365    pub fn with_total_byte_size(mut self, total_byte_size: Precision<usize>) -> Self {
366        self.total_byte_size = total_byte_size;
367        self
368    }
369
370    /// Add a column to the column statistics
371    pub fn add_column_statistics(mut self, column_stats: ColumnStatistics) -> Self {
372        self.column_statistics.push(column_stats);
373        self
374    }
375
376    /// If the exactness of a [`Statistics`] instance is lost, this function relaxes
377    /// the exactness of all information by converting them [`Precision::Inexact`].
378    pub fn to_inexact(mut self) -> Self {
379        self.num_rows = self.num_rows.to_inexact();
380        self.total_byte_size = self.total_byte_size.to_inexact();
381        self.column_statistics = self
382            .column_statistics
383            .into_iter()
384            .map(|s| s.to_inexact())
385            .collect();
386        self
387    }
388
389    /// Project the statistics to the given column indices.
390    ///
391    /// For example, if we had statistics for columns `{"a", "b", "c"}`,
392    /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393    /// "b"}`.
394    pub fn project(self, projection: Option<&impl AsRef<[usize]>>) -> Self {
395        let projection = projection.map(AsRef::as_ref);
396        self.project_impl(projection)
397    }
398
399    fn project_impl(mut self, projection: Option<&[usize]>) -> Self {
400        let Some(projection) = projection.map(AsRef::as_ref) else {
401            return self;
402        };
403
404        #[expect(clippy::large_enum_variant)]
405        enum Slot {
406            /// The column is taken and put into the specified statistics location
407            Taken(usize),
408            /// The original columns is present
409            Present(ColumnStatistics),
410        }
411
412        // Convert to Vec<Slot> so we can avoid copying the statistics
413        let mut columns: Vec<_> = std::mem::take(&mut self.column_statistics)
414            .into_iter()
415            .map(Slot::Present)
416            .collect();
417
418        for idx in projection.iter() {
419            let next_idx = self.column_statistics.len();
420            let slot = std::mem::replace(
421                columns.get_mut(*idx).expect("projection out of bounds"),
422                Slot::Taken(next_idx),
423            );
424            match slot {
425                // The column was there, so just move it
426                Slot::Present(col) => self.column_statistics.push(col),
427                // The column was taken, so copy from the previous location
428                Slot::Taken(prev_idx) => self
429                    .column_statistics
430                    .push(self.column_statistics[prev_idx].clone()),
431            }
432        }
433
434        self
435    }
436
    /// Calculates the statistics after applying `fetch` and `skip` operations.
    ///
    /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
    /// parameter to compute global statistics in a multi-partition setting.
    ///
    /// # Parameters
    /// * `fetch` - maximum number of rows to keep; `None` is treated as
    ///   "no limit" (`usize::MAX`).
    /// * `skip` - number of leading rows to drop before fetching.
    /// * `n_partitions` - factor the resulting row count is multiplied by.
    ///
    /// # Returns
    /// The adjusted statistics. Unless the input is returned unchanged (row
    /// count known, within `fetch`, and `skip == 0`), all column statistics
    /// are demoted to inexact and byte sizes are scaled by the ratio of rows
    /// after / rows before. Every return path visible in this method is `Ok`.
    pub fn with_fetch(
        mut self,
        fetch: Option<usize>,
        skip: usize,
        n_partitions: usize,
    ) -> Result<Self> {
        let fetch_val = fetch.unwrap_or(usize::MAX);

        // Remember the input row count; it is the denominator of the scaling
        // ratio computed after `self.num_rows` has been updated.
        let num_rows_before = self.num_rows;

        self.num_rows = match self {
            Statistics {
                num_rows: Precision::Exact(nr),
                ..
            }
            | Statistics {
                num_rows: Precision::Inexact(nr),
                ..
            } => {
                // Here, the inexact case gives us an upper bound on the number of rows.
                if nr <= skip {
                    // All input data will be skipped:
                    Precision::Exact(0)
                } else if nr <= fetch_val && skip == 0 {
                    // If the input does not reach the `fetch` globally, and `skip`
                    // is zero (meaning the input and output are identical), return
                    // input stats as is.
                    // TODO: Can input stats still be used, but adjusted, when `skip`
                    //       is non-zero?
                    return Ok(self);
                } else if nr - skip <= fetch_val {
                    // After `skip` input rows are skipped, the remaining rows are
                    // less than or equal to the `fetch` values, so `num_rows` must
                    // equal the remaining rows.
                    // (`nr - skip` cannot underflow: `nr <= skip` was handled above.)
                    check_num_rows(
                        (nr - skip).checked_mul(n_partitions),
                        // We know that we have an estimate for the number of rows:
                        self.num_rows.is_exact().unwrap(),
                    )
                } else {
                    // At this point we know that we were given a `fetch` value
                    // as the `None` case would go into the branch above. Since
                    // the input has more rows than `fetch + skip`, the number
                    // of rows will be the `fetch`, other statistics will have to be downgraded to inexact.
                    check_num_rows(
                        fetch_val.checked_mul(n_partitions),
                        // We know that we have an estimate for the number of rows:
                        self.num_rows.is_exact().unwrap(),
                    )
                }
            }
            // Unknown input row count: the best available estimate is the
            // (inexact) `fetch` limit times the partition count, or nothing at
            // all when there is no `fetch` or the multiplication overflows.
            Statistics {
                num_rows: Precision::Absent,
                ..
            } => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
        };
        // Ratio of rows after / rows before, used to scale the byte sizes.
        // It is 0.0 when the input had zero rows or when either count is absent.
        // NOTE(review): `nr_after` already includes the `n_partitions` factor
        // while `nr_before` does not, so the ratio may exceed 1.0 — confirm
        // this is intended.
        // NOTE(review): with an absent row count the 0.0 ratio turns known
        // byte sizes into `Inexact(0)` rather than `Absent` — confirm this is
        // intended.
        let ratio: f64 = match (num_rows_before, self.num_rows) {
            (
                Precision::Exact(nr_before) | Precision::Inexact(nr_before),
                Precision::Exact(nr_after) | Precision::Inexact(nr_after),
            ) => {
                if nr_before == 0 {
                    0.0
                } else {
                    nr_after as f64 / nr_before as f64
                }
            }
            _ => 0.0,
        };
        self.column_statistics = self
            .column_statistics
            .into_iter()
            .map(|cs| {
                // Nothing about a column stays exact once rows were dropped.
                let mut cs = cs.to_inexact();
                // Scale byte_size by the row ratio
                // (the float-to-`usize` cast truncates toward zero)
                cs.byte_size = match cs.byte_size {
                    Precision::Exact(n) | Precision::Inexact(n) => {
                        Precision::Inexact((n as f64 * ratio) as usize)
                    }
                    Precision::Absent => Precision::Absent,
                };
                cs
            })
            .collect();

        // Compute total_byte_size as sum of column byte_size values if all are present,
        // otherwise fall back to scaling the original total_byte_size
        // (`try_fold` yields `None` as soon as one column has no byte_size).
        let sum_scan_bytes: Option<usize> = self
            .column_statistics
            .iter()
            .map(|cs| cs.byte_size.get_value().copied())
            .try_fold(0usize, |acc, val| val.map(|v| acc + v));

        self.total_byte_size = match sum_scan_bytes {
            Some(sum) => Precision::Inexact(sum),
            None => {
                // Fall back to scaling original total_byte_size if not all columns have byte_size
                match &self.total_byte_size {
                    Precision::Exact(n) | Precision::Inexact(n) => {
                        Precision::Inexact((*n as f64 * ratio) as usize)
                    }
                    Precision::Absent => Precision::Absent,
                }
            }
        };
        Ok(self)
    }
549
550    /// Summarize zero or more statistics into a single `Statistics` instance.
551    ///
552    /// The method assumes that all statistics are for the same schema.
553    /// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent.
554    ///
555    /// Returns an error if the statistics do not match the specified schemas.
556    pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
557    where
558        I: IntoIterator<Item = &'a Statistics>,
559    {
560        let mut items = items.into_iter();
561
562        let Some(init) = items.next() else {
563            return Ok(Statistics::new_unknown(schema));
564        };
565        items.try_fold(init.clone(), |acc: Statistics, item_stats: &Statistics| {
566            acc.try_merge(item_stats)
567        })
568    }
569
570    /// Merge this Statistics value with another Statistics value.
571    ///
572    /// Returns an error if the statistics do not match (different schemas).
573    ///
574    /// # Example
575    /// ```
576    /// # use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
577    /// # use arrow::datatypes::{Field, Schema, DataType};
578    /// # use datafusion_common::stats::Precision;
579    /// let stats1 = Statistics::default()
580    ///     .with_num_rows(Precision::Exact(1))
581    ///     .with_total_byte_size(Precision::Exact(2))
582    ///     .add_column_statistics(
583    ///         ColumnStatistics::new_unknown()
584    ///             .with_null_count(Precision::Exact(3))
585    ///             .with_min_value(Precision::Exact(ScalarValue::from(4)))
586    ///             .with_max_value(Precision::Exact(ScalarValue::from(5))),
587    ///     );
588    ///
589    /// let stats2 = Statistics::default()
590    ///     .with_num_rows(Precision::Exact(10))
591    ///     .with_total_byte_size(Precision::Inexact(20))
592    ///     .add_column_statistics(
593    ///         ColumnStatistics::new_unknown()
594    ///             // absent null count
595    ///             .with_min_value(Precision::Exact(ScalarValue::from(40)))
596    ///             .with_max_value(Precision::Exact(ScalarValue::from(50))),
597    ///     );
598    ///
599    /// let merged_stats = stats1.try_merge(&stats2).unwrap();
600    /// let expected_stats = Statistics::default()
601    ///     .with_num_rows(Precision::Exact(11))
602    ///     .with_total_byte_size(Precision::Inexact(22)) // inexact in stats2 --> inexact
603    ///     .add_column_statistics(
604    ///         ColumnStatistics::new_unknown()
605    ///             .with_null_count(Precision::Absent) // missing from stats2 --> absent
606    ///             .with_min_value(Precision::Exact(ScalarValue::from(4)))
607    ///             .with_max_value(Precision::Exact(ScalarValue::from(50))),
608    ///     );
609    ///
610    /// assert_eq!(merged_stats, expected_stats)
611    /// ```
612    pub fn try_merge(self, other: &Statistics) -> Result<Self> {
613        let Self {
614            mut num_rows,
615            mut total_byte_size,
616            mut column_statistics,
617        } = self;
618
619        // Accumulate statistics for subsequent items
620        num_rows = num_rows.add(&other.num_rows);
621        total_byte_size = total_byte_size.add(&other.total_byte_size);
622
623        if column_statistics.len() != other.column_statistics.len() {
624            return _plan_err!(
625                "Cannot merge statistics with different number of columns: {} vs {}",
626                column_statistics.len(),
627                other.column_statistics.len()
628            );
629        }
630
631        for (item_col_stats, col_stats) in other
632            .column_statistics
633            .iter()
634            .zip(column_statistics.iter_mut())
635        {
636            col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);
637            col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
638            col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
639            col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value);
640            col_stats.distinct_count = Precision::Absent;
641            col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size);
642        }
643
644        Ok(Statistics {
645            num_rows,
646            total_byte_size,
647            column_statistics,
648        })
649    }
650}
651
652/// Creates an estimate of the number of rows in the output using the given
653/// optional value and exactness flag.
654fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
655    if let Some(value) = value {
656        if is_exact {
657            Precision::Exact(value)
658        } else {
659            // If the input stats are inexact, so are the output stats.
660            Precision::Inexact(value)
661        }
662    } else {
663        // If the estimate is not available (e.g. due to an overflow), we can
664        // not produce a reliable estimate.
665        Precision::Absent
666    }
667}
668
669impl Display for Statistics {
670    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
671        // string of column statistics
672        let column_stats = self
673            .column_statistics
674            .iter()
675            .enumerate()
676            .map(|(i, cs)| {
677                let s = format!("(Col[{i}]:");
678                let s = if cs.min_value != Precision::Absent {
679                    format!("{} Min={}", s, cs.min_value)
680                } else {
681                    s
682                };
683                let s = if cs.max_value != Precision::Absent {
684                    format!("{} Max={}", s, cs.max_value)
685                } else {
686                    s
687                };
688                let s = if cs.sum_value != Precision::Absent {
689                    format!("{} Sum={}", s, cs.sum_value)
690                } else {
691                    s
692                };
693                let s = if cs.null_count != Precision::Absent {
694                    format!("{} Null={}", s, cs.null_count)
695                } else {
696                    s
697                };
698                let s = if cs.distinct_count != Precision::Absent {
699                    format!("{} Distinct={}", s, cs.distinct_count)
700                } else {
701                    s
702                };
703                let s = if cs.byte_size != Precision::Absent {
704                    format!("{} ScanBytes={}", s, cs.byte_size)
705                } else {
706                    s
707                };
708
709                s + ")"
710            })
711            .collect::<Vec<_>>()
712            .join(",");
713
714        write!(
715            f,
716            "Rows={}, Bytes={}, [{}]",
717            self.num_rows, self.total_byte_size, column_stats
718        )?;
719
720        Ok(())
721    }
722}
723
/// Statistics for a column within a relation.
///
/// Every field is a [`Precision`], so each statistic can independently be
/// exact, inexact or absent.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ColumnStatistics {
    /// Number of null values in the column
    pub null_count: Precision<usize>,
    /// Maximum value of the column
    pub max_value: Precision<ScalarValue>,
    /// Minimum value of the column
    pub min_value: Precision<ScalarValue>,
    /// Sum of the values of the column
    pub sum_value: Precision<ScalarValue>,
    /// Number of distinct values
    pub distinct_count: Precision<usize>,
    /// Estimated size of this column's data in bytes for the output.
    ///
    /// Note that this is not the same as the total bytes that may be scanned,
    /// processed, etc.
    ///
    /// E.g. we may read 1GB of data from a Parquet file but the Arrow data
    /// the node produces may be 2GB; it's this 2GB that is tracked here.
    ///
    /// Currently this is accurately calculated for primitive types only.
    /// For complex types (like Utf8, List, Struct, etc), this value may be
    /// absent or inexact (e.g. estimated from the size of the data in the source Parquet files).
    ///
    /// This value is automatically scaled when operations like limits or
    /// filters reduce the number of rows (see [`Statistics::with_fetch`]).
    pub byte_size: Precision<usize>,
}
753
754impl ColumnStatistics {
755    /// Column contains a single non null value (e.g constant).
756    pub fn is_singleton(&self) -> bool {
757        match (&self.min_value, &self.max_value) {
758            // Min and max values are the same and not infinity.
759            (Precision::Exact(min), Precision::Exact(max)) => {
760                !min.is_null() && !max.is_null() && (min == max)
761            }
762            (_, _) => false,
763        }
764    }
765
766    /// Returns a [`ColumnStatistics`] instance having all [`Precision::Absent`] parameters.
767    pub fn new_unknown() -> Self {
768        Self {
769            null_count: Precision::Absent,
770            max_value: Precision::Absent,
771            min_value: Precision::Absent,
772            sum_value: Precision::Absent,
773            distinct_count: Precision::Absent,
774            byte_size: Precision::Absent,
775        }
776    }
777
778    /// Set the null count
779    pub fn with_null_count(mut self, null_count: Precision<usize>) -> Self {
780        self.null_count = null_count;
781        self
782    }
783
784    /// Set the max value
785    pub fn with_max_value(mut self, max_value: Precision<ScalarValue>) -> Self {
786        self.max_value = max_value;
787        self
788    }
789
790    /// Set the min value
791    pub fn with_min_value(mut self, min_value: Precision<ScalarValue>) -> Self {
792        self.min_value = min_value;
793        self
794    }
795
796    /// Set the sum value
797    pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self {
798        self.sum_value = sum_value;
799        self
800    }
801
802    /// Set the distinct count
803    pub fn with_distinct_count(mut self, distinct_count: Precision<usize>) -> Self {
804        self.distinct_count = distinct_count;
805        self
806    }
807
808    /// Set the scan byte size
809    /// This should initially be set to the total size of the column.
810    pub fn with_byte_size(mut self, byte_size: Precision<usize>) -> Self {
811        self.byte_size = byte_size;
812        self
813    }
814
815    /// If the exactness of a [`ColumnStatistics`] instance is lost, this
816    /// function relaxes the exactness of all information by converting them
817    /// [`Precision::Inexact`].
818    pub fn to_inexact(mut self) -> Self {
819        self.null_count = self.null_count.to_inexact();
820        self.max_value = self.max_value.to_inexact();
821        self.min_value = self.min_value.to_inexact();
822        self.sum_value = self.sum_value.to_inexact();
823        self.distinct_count = self.distinct_count.to_inexact();
824        self.byte_size = self.byte_size.to_inexact();
825        self
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use super::*;
832    use crate::assert_contains;
833    use arrow::datatypes::Field;
834    use std::sync::Arc;
835
836    #[test]
837    fn test_get_value() {
838        let exact_precision = Precision::Exact(42);
839        let inexact_precision = Precision::Inexact(23);
840        let absent_precision = Precision::<i32>::Absent;
841
842        assert_eq!(*exact_precision.get_value().unwrap(), 42);
843        assert_eq!(*inexact_precision.get_value().unwrap(), 23);
844        assert_eq!(absent_precision.get_value(), None);
845    }
846
847    #[test]
848    fn test_map() {
849        let exact_precision = Precision::Exact(42);
850        let inexact_precision = Precision::Inexact(23);
851        let absent_precision = Precision::Absent;
852
853        let squared = |x| x * x;
854
855        assert_eq!(exact_precision.map(squared), Precision::Exact(1764));
856        assert_eq!(inexact_precision.map(squared), Precision::Inexact(529));
857        assert_eq!(absent_precision.map(squared), Precision::Absent);
858    }
859
860    #[test]
861    fn test_is_exact() {
862        let exact_precision = Precision::Exact(42);
863        let inexact_precision = Precision::Inexact(23);
864        let absent_precision = Precision::<i32>::Absent;
865
866        assert_eq!(exact_precision.is_exact(), Some(true));
867        assert_eq!(inexact_precision.is_exact(), Some(false));
868        assert_eq!(absent_precision.is_exact(), None);
869    }
870
871    #[test]
872    fn test_max() {
873        let precision1 = Precision::Exact(42);
874        let precision2 = Precision::Inexact(23);
875        let precision3 = Precision::Exact(30);
876        let absent_precision = Precision::Absent;
877
878        assert_eq!(precision1.max(&precision2), Precision::Inexact(42));
879        assert_eq!(precision1.max(&precision3), Precision::Exact(42));
880        assert_eq!(precision2.max(&precision3), Precision::Inexact(30));
881        assert_eq!(precision1.max(&absent_precision), Precision::Absent);
882    }
883
884    #[test]
885    fn test_min() {
886        let precision1 = Precision::Exact(42);
887        let precision2 = Precision::Inexact(23);
888        let precision3 = Precision::Exact(30);
889        let absent_precision = Precision::Absent;
890
891        assert_eq!(precision1.min(&precision2), Precision::Inexact(23));
892        assert_eq!(precision1.min(&precision3), Precision::Exact(30));
893        assert_eq!(precision2.min(&precision3), Precision::Inexact(23));
894        assert_eq!(precision1.min(&absent_precision), Precision::Absent);
895    }
896
897    #[test]
898    fn test_to_inexact() {
899        let exact_precision = Precision::Exact(42);
900        let inexact_precision = Precision::Inexact(42);
901        let absent_precision = Precision::<i32>::Absent;
902
903        assert_eq!(exact_precision.to_inexact(), inexact_precision);
904        assert_eq!(inexact_precision.to_inexact(), inexact_precision);
905        assert_eq!(absent_precision.to_inexact(), absent_precision);
906    }
907
908    #[test]
909    fn test_add() {
910        let precision1 = Precision::Exact(42);
911        let precision2 = Precision::Inexact(23);
912        let precision3 = Precision::Exact(30);
913        let absent_precision = Precision::Absent;
914        let precision_max_exact = Precision::Exact(usize::MAX);
915        let precision_max_inexact = Precision::Exact(usize::MAX);
916
917        assert_eq!(precision1.add(&precision2), Precision::Inexact(65));
918        assert_eq!(precision1.add(&precision3), Precision::Exact(72));
919        assert_eq!(precision2.add(&precision3), Precision::Inexact(53));
920        assert_eq!(precision1.add(&absent_precision), Precision::Absent);
921        assert_eq!(
922            precision_max_exact.add(&precision1),
923            Precision::Inexact(usize::MAX)
924        );
925        assert_eq!(
926            precision_max_inexact.add(&precision1),
927            Precision::Inexact(usize::MAX)
928        );
929    }
930
931    #[test]
932    fn test_add_scalar() {
933        let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
934
935        assert_eq!(
936            precision.add(&Precision::Exact(ScalarValue::Int32(Some(23)))),
937            Precision::Exact(ScalarValue::Int32(Some(65))),
938        );
939        assert_eq!(
940            precision.add(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
941            Precision::Inexact(ScalarValue::Int32(Some(65))),
942        );
943        assert_eq!(
944            precision.add(&Precision::Exact(ScalarValue::Int32(None))),
945            // As per behavior of ScalarValue::add
946            Precision::Exact(ScalarValue::Int32(None)),
947        );
948        assert_eq!(precision.add(&Precision::Absent), Precision::Absent);
949    }
950
951    #[test]
952    fn test_sub() {
953        let precision1 = Precision::Exact(42);
954        let precision2 = Precision::Inexact(23);
955        let precision3 = Precision::Exact(30);
956        let absent_precision = Precision::Absent;
957
958        assert_eq!(precision1.sub(&precision2), Precision::Inexact(19));
959        assert_eq!(precision1.sub(&precision3), Precision::Exact(12));
960        assert_eq!(precision2.sub(&precision1), Precision::Inexact(0));
961        assert_eq!(precision3.sub(&precision1), Precision::Inexact(0));
962        assert_eq!(precision1.sub(&absent_precision), Precision::Absent);
963    }
964
965    #[test]
966    fn test_sub_scalar() {
967        let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
968
969        assert_eq!(
970            precision.sub(&Precision::Exact(ScalarValue::Int32(Some(23)))),
971            Precision::Exact(ScalarValue::Int32(Some(19))),
972        );
973        assert_eq!(
974            precision.sub(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
975            Precision::Inexact(ScalarValue::Int32(Some(19))),
976        );
977        assert_eq!(
978            precision.sub(&Precision::Exact(ScalarValue::Int32(None))),
979            // As per behavior of ScalarValue::sub
980            Precision::Exact(ScalarValue::Int32(None)),
981        );
982        assert_eq!(precision.sub(&Precision::Absent), Precision::Absent);
983    }
984
985    #[test]
986    fn test_multiply() {
987        let precision1 = Precision::Exact(6);
988        let precision2 = Precision::Inexact(3);
989        let precision3 = Precision::Exact(5);
990        let precision_max_exact = Precision::Exact(usize::MAX);
991        let precision_max_inexact = Precision::Exact(usize::MAX);
992        let absent_precision = Precision::Absent;
993
994        assert_eq!(precision1.multiply(&precision2), Precision::Inexact(18));
995        assert_eq!(precision1.multiply(&precision3), Precision::Exact(30));
996        assert_eq!(precision2.multiply(&precision3), Precision::Inexact(15));
997        assert_eq!(precision1.multiply(&absent_precision), Precision::Absent);
998        assert_eq!(
999            precision_max_exact.multiply(&precision1),
1000            Precision::Inexact(usize::MAX)
1001        );
1002        assert_eq!(
1003            precision_max_inexact.multiply(&precision1),
1004            Precision::Inexact(usize::MAX)
1005        );
1006    }
1007
1008    #[test]
1009    fn test_multiply_scalar() {
1010        let precision = Precision::Exact(ScalarValue::Int32(Some(6)));
1011
1012        assert_eq!(
1013            precision.multiply(&Precision::Exact(ScalarValue::Int32(Some(5)))),
1014            Precision::Exact(ScalarValue::Int32(Some(30))),
1015        );
1016        assert_eq!(
1017            precision.multiply(&Precision::Inexact(ScalarValue::Int32(Some(5)))),
1018            Precision::Inexact(ScalarValue::Int32(Some(30))),
1019        );
1020        assert_eq!(
1021            precision.multiply(&Precision::Exact(ScalarValue::Int32(None))),
1022            // As per behavior of ScalarValue::mul_checked
1023            Precision::Exact(ScalarValue::Int32(None)),
1024        );
1025        assert_eq!(precision.multiply(&Precision::Absent), Precision::Absent);
1026    }
1027
1028    #[test]
1029    fn test_cast_to() {
1030        // Valid
1031        assert_eq!(
1032            Precision::Exact(ScalarValue::Int32(Some(42)))
1033                .cast_to(&DataType::Int64)
1034                .unwrap(),
1035            Precision::Exact(ScalarValue::Int64(Some(42))),
1036        );
1037        assert_eq!(
1038            Precision::Inexact(ScalarValue::Int32(Some(42)))
1039                .cast_to(&DataType::Int64)
1040                .unwrap(),
1041            Precision::Inexact(ScalarValue::Int64(Some(42))),
1042        );
1043        // Null
1044        assert_eq!(
1045            Precision::Exact(ScalarValue::Int32(None))
1046                .cast_to(&DataType::Int64)
1047                .unwrap(),
1048            Precision::Exact(ScalarValue::Int64(None)),
1049        );
1050        // Overflow returns error
1051        assert!(
1052            Precision::Exact(ScalarValue::Int32(Some(256)))
1053                .cast_to(&DataType::Int8)
1054                .is_err()
1055        );
1056    }
1057
1058    #[test]
1059    fn test_precision_cloning() {
1060        // Precision<usize> is copy
1061        let precision: Precision<usize> = Precision::Exact(42);
1062        let p2 = precision;
1063        assert_eq!(precision, p2);
1064
1065        // Precision<ScalarValue> is not copy (requires .clone())
1066        let precision: Precision<ScalarValue> =
1067            Precision::Exact(ScalarValue::Int64(Some(42)));
1068        let p2 = precision.clone();
1069        assert_eq!(precision, p2);
1070    }
1071
1072    #[test]
1073    fn test_project_none() {
1074        let projection: Option<Vec<usize>> = None;
1075        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1076        assert_eq!(stats, make_stats(vec![10, 20, 30]));
1077    }
1078
1079    #[test]
1080    fn test_project_empty() {
1081        let projection = Some(vec![]);
1082        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1083        assert_eq!(stats, make_stats(vec![]));
1084    }
1085
1086    #[test]
1087    fn test_project_swap() {
1088        let projection = Some(vec![2, 1]);
1089        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1090        assert_eq!(stats, make_stats(vec![30, 20]));
1091    }
1092
1093    #[test]
1094    fn test_project_repeated() {
1095        let projection = Some(vec![1, 2, 1, 1, 0, 2]);
1096        let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1097        assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
1098    }
1099
1100    // Make a Statistics structure with the specified null counts for each column
1101    fn make_stats(counts: impl IntoIterator<Item = usize>) -> Statistics {
1102        Statistics {
1103            num_rows: Precision::Exact(42),
1104            total_byte_size: Precision::Exact(500),
1105            column_statistics: counts.into_iter().map(col_stats_i64).collect(),
1106        }
1107    }
1108
1109    fn col_stats_i64(null_count: usize) -> ColumnStatistics {
1110        ColumnStatistics {
1111            null_count: Precision::Exact(null_count),
1112            max_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1113            min_value: Precision::Exact(ScalarValue::Int64(Some(64))),
1114            sum_value: Precision::Exact(ScalarValue::Int64(Some(4600))),
1115            distinct_count: Precision::Exact(100),
1116            byte_size: Precision::Exact(800),
1117        }
1118    }
1119
1120    #[test]
1121    fn test_try_merge_basic() {
1122        // Create a schema with two columns
1123        let schema = Arc::new(Schema::new(vec![
1124            Field::new("col1", DataType::Int32, false),
1125            Field::new("col2", DataType::Int32, false),
1126        ]));
1127
1128        // Create items with statistics
1129        let stats1 = Statistics {
1130            num_rows: Precision::Exact(10),
1131            total_byte_size: Precision::Exact(100),
1132            column_statistics: vec![
1133                ColumnStatistics {
1134                    null_count: Precision::Exact(1),
1135                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1136                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1137                    sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
1138                    distinct_count: Precision::Absent,
1139                    byte_size: Precision::Exact(40),
1140                },
1141                ColumnStatistics {
1142                    null_count: Precision::Exact(2),
1143                    max_value: Precision::Exact(ScalarValue::Int32(Some(200))),
1144                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
1145                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))),
1146                    distinct_count: Precision::Absent,
1147                    byte_size: Precision::Exact(40),
1148                },
1149            ],
1150        };
1151
1152        let stats2 = Statistics {
1153            num_rows: Precision::Exact(15),
1154            total_byte_size: Precision::Exact(150),
1155            column_statistics: vec![
1156                ColumnStatistics {
1157                    null_count: Precision::Exact(2),
1158                    max_value: Precision::Exact(ScalarValue::Int32(Some(120))),
1159                    min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
1160                    sum_value: Precision::Exact(ScalarValue::Int32(Some(600))),
1161                    distinct_count: Precision::Absent,
1162                    byte_size: Precision::Exact(60),
1163                },
1164                ColumnStatistics {
1165                    null_count: Precision::Exact(3),
1166                    max_value: Precision::Exact(ScalarValue::Int32(Some(180))),
1167                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
1168                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))),
1169                    distinct_count: Precision::Absent,
1170                    byte_size: Precision::Exact(60),
1171                },
1172            ],
1173        };
1174
1175        let items = vec![stats1, stats2];
1176
1177        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1178
1179        // Verify the results
1180        assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15
1181        assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150
1182
1183        // Verify column statistics
1184        let col1_stats = &summary_stats.column_statistics[0];
1185        assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2
1186        assert_eq!(
1187            col1_stats.max_value,
1188            Precision::Exact(ScalarValue::Int32(Some(120)))
1189        );
1190        assert_eq!(
1191            col1_stats.min_value,
1192            Precision::Exact(ScalarValue::Int32(Some(-10)))
1193        );
1194        assert_eq!(
1195            col1_stats.sum_value,
1196            Precision::Exact(ScalarValue::Int32(Some(1100)))
1197        ); // 500 + 600
1198
1199        let col2_stats = &summary_stats.column_statistics[1];
1200        assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3
1201        assert_eq!(
1202            col2_stats.max_value,
1203            Precision::Exact(ScalarValue::Int32(Some(200)))
1204        );
1205        assert_eq!(
1206            col2_stats.min_value,
1207            Precision::Exact(ScalarValue::Int32(Some(5)))
1208        );
1209        assert_eq!(
1210            col2_stats.sum_value,
1211            Precision::Exact(ScalarValue::Int32(Some(2200)))
1212        ); // 1000 + 1200
1213    }
1214
1215    #[test]
1216    fn test_try_merge_mixed_precision() {
1217        // Create a schema with one column
1218        let schema = Arc::new(Schema::new(vec![Field::new(
1219            "col1",
1220            DataType::Int32,
1221            false,
1222        )]));
1223
1224        // Create items with different precision levels
1225        let stats1 = Statistics {
1226            num_rows: Precision::Exact(10),
1227            total_byte_size: Precision::Inexact(100),
1228            column_statistics: vec![ColumnStatistics {
1229                null_count: Precision::Exact(1),
1230                max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1231                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
1232                sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
1233                distinct_count: Precision::Absent,
1234                byte_size: Precision::Exact(40),
1235            }],
1236        };
1237
1238        let stats2 = Statistics {
1239            num_rows: Precision::Inexact(15),
1240            total_byte_size: Precision::Exact(150),
1241            column_statistics: vec![ColumnStatistics {
1242                null_count: Precision::Inexact(2),
1243                max_value: Precision::Inexact(ScalarValue::Int32(Some(120))),
1244                min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
1245                sum_value: Precision::Absent,
1246                distinct_count: Precision::Absent,
1247                byte_size: Precision::Inexact(60),
1248            }],
1249        };
1250
1251        let items = vec![stats1, stats2];
1252
1253        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1254
1255        assert_eq!(summary_stats.num_rows, Precision::Inexact(25));
1256        assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250));
1257
1258        let col_stats = &summary_stats.column_statistics[0];
1259        assert_eq!(col_stats.null_count, Precision::Inexact(3));
1260        assert_eq!(
1261            col_stats.max_value,
1262            Precision::Inexact(ScalarValue::Int32(Some(120)))
1263        );
1264        assert_eq!(
1265            col_stats.min_value,
1266            Precision::Inexact(ScalarValue::Int32(Some(-10)))
1267        );
1268        assert_eq!(col_stats.sum_value, Precision::Absent);
1269    }
1270
1271    #[test]
1272    fn test_try_merge_empty() {
1273        let schema = Arc::new(Schema::new(vec![Field::new(
1274            "col1",
1275            DataType::Int32,
1276            false,
1277        )]));
1278
1279        // Empty collection
1280        let items: Vec<Statistics> = vec![];
1281
1282        let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap();
1283
1284        // Verify default values for empty collection
1285        assert_eq!(summary_stats.num_rows, Precision::Absent);
1286        assert_eq!(summary_stats.total_byte_size, Precision::Absent);
1287        assert_eq!(summary_stats.column_statistics.len(), 1);
1288        assert_eq!(
1289            summary_stats.column_statistics[0].null_count,
1290            Precision::Absent
1291        );
1292    }
1293
1294    #[test]
1295    fn test_try_merge_mismatched_size() {
1296        // Create a schema with one column
1297        let schema = Arc::new(Schema::new(vec![Field::new(
1298            "col1",
1299            DataType::Int32,
1300            false,
1301        )]));
1302
1303        // No column statistics
1304        let stats1 = Statistics::default();
1305
1306        let stats2 =
1307            Statistics::default().add_column_statistics(ColumnStatistics::new_unknown());
1308
1309        let items = vec![stats1, stats2];
1310
1311        let e = Statistics::try_merge_iter(&items, &schema).unwrap_err();
1312        assert_contains!(
1313            e.to_string(),
1314            "Error during planning: Cannot merge statistics with different number of columns: 0 vs 1"
1315        );
1316    }
1317
1318    #[test]
1319    fn test_try_merge_distinct_count_absent() {
1320        // Create statistics with known distinct counts
1321        let stats1 = Statistics::default()
1322            .with_num_rows(Precision::Exact(10))
1323            .with_total_byte_size(Precision::Exact(100))
1324            .add_column_statistics(
1325                ColumnStatistics::new_unknown()
1326                    .with_null_count(Precision::Exact(0))
1327                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(1))))
1328                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
1329                    .with_distinct_count(Precision::Exact(5)),
1330            );
1331
1332        let stats2 = Statistics::default()
1333            .with_num_rows(Precision::Exact(15))
1334            .with_total_byte_size(Precision::Exact(150))
1335            .add_column_statistics(
1336                ColumnStatistics::new_unknown()
1337                    .with_null_count(Precision::Exact(0))
1338                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
1339                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(20))))
1340                    .with_distinct_count(Precision::Exact(7)),
1341            );
1342
1343        // Merge statistics
1344        let merged_stats = stats1.try_merge(&stats2).unwrap();
1345
1346        // Verify the results
1347        assert_eq!(merged_stats.num_rows, Precision::Exact(25));
1348        assert_eq!(merged_stats.total_byte_size, Precision::Exact(250));
1349
1350        let col_stats = &merged_stats.column_statistics[0];
1351        assert_eq!(col_stats.null_count, Precision::Exact(0));
1352        assert_eq!(
1353            col_stats.min_value,
1354            Precision::Exact(ScalarValue::Int32(Some(1)))
1355        );
1356        assert_eq!(
1357            col_stats.max_value,
1358            Precision::Exact(ScalarValue::Int32(Some(20)))
1359        );
1360        // Distinct count should be Absent after merge
1361        assert_eq!(col_stats.distinct_count, Precision::Absent);
1362    }
1363
1364    #[test]
1365    fn test_with_fetch_basic_preservation() {
1366        // Test that column statistics and byte size are preserved (as inexact) when applying fetch
1367        let original_stats = Statistics {
1368            num_rows: Precision::Exact(1000),
1369            total_byte_size: Precision::Exact(8000),
1370            column_statistics: vec![
1371                ColumnStatistics {
1372                    null_count: Precision::Exact(10),
1373                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1374                    min_value: Precision::Exact(ScalarValue::Int32(Some(0))),
1375                    sum_value: Precision::Exact(ScalarValue::Int32(Some(5050))),
1376                    distinct_count: Precision::Exact(50),
1377                    byte_size: Precision::Exact(4000),
1378                },
1379                ColumnStatistics {
1380                    null_count: Precision::Exact(20),
1381                    max_value: Precision::Exact(ScalarValue::Int64(Some(200))),
1382                    min_value: Precision::Exact(ScalarValue::Int64(Some(10))),
1383                    sum_value: Precision::Exact(ScalarValue::Int64(Some(10100))),
1384                    distinct_count: Precision::Exact(75),
1385                    byte_size: Precision::Exact(8000),
1386                },
1387            ],
1388        };
1389
1390        // Apply fetch of 100 rows (10% of original)
1391        let result = original_stats.clone().with_fetch(Some(100), 0, 1).unwrap();
1392
1393        // Check num_rows
1394        assert_eq!(result.num_rows, Precision::Exact(100));
1395
1396        // Check total_byte_size is computed as sum of scaled column byte_size values
1397        // Column 1: 4000 * 0.1 = 400, Column 2: 8000 * 0.1 = 800, Sum = 1200
1398        assert_eq!(result.total_byte_size, Precision::Inexact(1200));
1399
1400        // Check column statistics are preserved but marked as inexact
1401        assert_eq!(result.column_statistics.len(), 2);
1402
1403        // First column
1404        assert_eq!(
1405            result.column_statistics[0].null_count,
1406            Precision::Inexact(10)
1407        );
1408        assert_eq!(
1409            result.column_statistics[0].max_value,
1410            Precision::Inexact(ScalarValue::Int32(Some(100)))
1411        );
1412        assert_eq!(
1413            result.column_statistics[0].min_value,
1414            Precision::Inexact(ScalarValue::Int32(Some(0)))
1415        );
1416        assert_eq!(
1417            result.column_statistics[0].sum_value,
1418            Precision::Inexact(ScalarValue::Int32(Some(5050)))
1419        );
1420        assert_eq!(
1421            result.column_statistics[0].distinct_count,
1422            Precision::Inexact(50)
1423        );
1424
1425        // Second column
1426        assert_eq!(
1427            result.column_statistics[1].null_count,
1428            Precision::Inexact(20)
1429        );
1430        assert_eq!(
1431            result.column_statistics[1].max_value,
1432            Precision::Inexact(ScalarValue::Int64(Some(200)))
1433        );
1434        assert_eq!(
1435            result.column_statistics[1].min_value,
1436            Precision::Inexact(ScalarValue::Int64(Some(10)))
1437        );
1438        assert_eq!(
1439            result.column_statistics[1].sum_value,
1440            Precision::Inexact(ScalarValue::Int64(Some(10100)))
1441        );
1442        assert_eq!(
1443            result.column_statistics[1].distinct_count,
1444            Precision::Inexact(75)
1445        );
1446    }
1447
1448    #[test]
1449    fn test_with_fetch_inexact_input() {
1450        // Test that inexact input statistics remain inexact
1451        let original_stats = Statistics {
1452            num_rows: Precision::Inexact(1000),
1453            total_byte_size: Precision::Inexact(8000),
1454            column_statistics: vec![ColumnStatistics {
1455                null_count: Precision::Inexact(10),
1456                max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
1457                min_value: Precision::Inexact(ScalarValue::Int32(Some(0))),
1458                sum_value: Precision::Inexact(ScalarValue::Int32(Some(5050))),
1459                distinct_count: Precision::Inexact(50),
1460                byte_size: Precision::Inexact(4000),
1461            }],
1462        };
1463
1464        let result = original_stats.clone().with_fetch(Some(500), 0, 1).unwrap();
1465
1466        // Check num_rows is inexact
1467        assert_eq!(result.num_rows, Precision::Inexact(500));
1468
1469        // Check total_byte_size is computed as sum of scaled column byte_size values
1470        // Column 1: 4000 * 0.5 = 2000, Sum = 2000
1471        assert_eq!(result.total_byte_size, Precision::Inexact(2000));
1472
1473        // Column stats remain inexact
1474        assert_eq!(
1475            result.column_statistics[0].null_count,
1476            Precision::Inexact(10)
1477        );
1478    }
1479
1480    #[test]
1481    fn test_with_fetch_skip_all_rows() {
1482        // Test when skip >= num_rows (all rows are skipped)
1483        let original_stats = Statistics {
1484            num_rows: Precision::Exact(100),
1485            total_byte_size: Precision::Exact(800),
1486            column_statistics: vec![col_stats_i64(10)],
1487        };
1488
1489        let result = original_stats.clone().with_fetch(Some(50), 100, 1).unwrap();
1490
1491        assert_eq!(result.num_rows, Precision::Exact(0));
1492        // When ratio is 0/100 = 0, byte size should be 0
1493        assert_eq!(result.total_byte_size, Precision::Inexact(0));
1494    }
1495
1496    #[test]
1497    fn test_with_fetch_no_limit() {
1498        // Test when fetch is None and skip is 0 (no limit applied)
1499        let original_stats = Statistics {
1500            num_rows: Precision::Exact(100),
1501            total_byte_size: Precision::Exact(800),
1502            column_statistics: vec![col_stats_i64(10)],
1503        };
1504
1505        let result = original_stats.clone().with_fetch(None, 0, 1).unwrap();
1506
1507        // Stats should be unchanged when no fetch and no skip
1508        assert_eq!(result.num_rows, Precision::Exact(100));
1509        assert_eq!(result.total_byte_size, Precision::Exact(800));
1510    }
1511
1512    #[test]
1513    fn test_with_fetch_with_skip() {
1514        // Test with both skip and fetch
1515        let original_stats = Statistics {
1516            num_rows: Precision::Exact(1000),
1517            total_byte_size: Precision::Exact(8000),
1518            column_statistics: vec![col_stats_i64(10)],
1519        };
1520
1521        // Skip 200, fetch 300, so we get rows 200-500
1522        let result = original_stats
1523            .clone()
1524            .with_fetch(Some(300), 200, 1)
1525            .unwrap();
1526
1527        assert_eq!(result.num_rows, Precision::Exact(300));
1528        // Column 1: byte_size 800 * (300/500) = 240, Sum = 240
1529        assert_eq!(result.total_byte_size, Precision::Inexact(240));
1530    }
1531
1532    #[test]
1533    fn test_with_fetch_multi_partition() {
1534        // Test with multiple partitions
1535        let original_stats = Statistics {
1536            num_rows: Precision::Exact(1000), // per partition
1537            total_byte_size: Precision::Exact(8000),
1538            column_statistics: vec![col_stats_i64(10)],
1539        };
1540
1541        // Fetch 100 per partition, 4 partitions = 400 total
1542        let result = original_stats.clone().with_fetch(Some(100), 0, 4).unwrap();
1543
1544        assert_eq!(result.num_rows, Precision::Exact(400));
1545        // Column 1: byte_size 800 * 0.4 = 320, Sum = 320
1546        assert_eq!(result.total_byte_size, Precision::Inexact(320));
1547    }
1548
1549    #[test]
1550    fn test_with_fetch_absent_stats() {
1551        // Test with absent statistics
1552        let original_stats = Statistics {
1553            num_rows: Precision::Absent,
1554            total_byte_size: Precision::Absent,
1555            column_statistics: vec![ColumnStatistics {
1556                null_count: Precision::Absent,
1557                max_value: Precision::Absent,
1558                min_value: Precision::Absent,
1559                sum_value: Precision::Absent,
1560                distinct_count: Precision::Absent,
1561                byte_size: Precision::Absent,
1562            }],
1563        };
1564
1565        let result = original_stats.clone().with_fetch(Some(100), 0, 1).unwrap();
1566
1567        // With absent input stats, output should be inexact estimate
1568        assert_eq!(result.num_rows, Precision::Inexact(100));
1569        assert_eq!(result.total_byte_size, Precision::Absent);
1570        // Column stats should remain absent
1571        assert_eq!(result.column_statistics[0].null_count, Precision::Absent);
1572    }
1573
1574    #[test]
1575    fn test_with_fetch_fetch_exceeds_rows() {
1576        // Test when fetch is larger than available rows after skip
1577        let original_stats = Statistics {
1578            num_rows: Precision::Exact(100),
1579            total_byte_size: Precision::Exact(800),
1580            column_statistics: vec![col_stats_i64(10)],
1581        };
1582
1583        // Skip 50, fetch 100, but only 50 rows remain
1584        let result = original_stats.clone().with_fetch(Some(100), 50, 1).unwrap();
1585
1586        assert_eq!(result.num_rows, Precision::Exact(50));
1587        // 50/100 = 0.5, so 800 * 0.5 = 400
1588        assert_eq!(result.total_byte_size, Precision::Inexact(400));
1589    }
1590
1591    #[test]
1592    fn test_with_fetch_preserves_all_column_stats() {
1593        // Comprehensive test that all column statistic fields are preserved
1594        let original_col_stats = ColumnStatistics {
1595            null_count: Precision::Exact(42),
1596            max_value: Precision::Exact(ScalarValue::Int32(Some(999))),
1597            min_value: Precision::Exact(ScalarValue::Int32(Some(-100))),
1598            sum_value: Precision::Exact(ScalarValue::Int32(Some(123456))),
1599            distinct_count: Precision::Exact(789),
1600            byte_size: Precision::Exact(4000),
1601        };
1602
1603        let original_stats = Statistics {
1604            num_rows: Precision::Exact(1000),
1605            total_byte_size: Precision::Exact(8000),
1606            column_statistics: vec![original_col_stats.clone()],
1607        };
1608
1609        let result = original_stats.with_fetch(Some(250), 0, 1).unwrap();
1610
1611        let result_col_stats = &result.column_statistics[0];
1612
1613        // All values should be preserved but marked as inexact
1614        assert_eq!(result_col_stats.null_count, Precision::Inexact(42));
1615        assert_eq!(
1616            result_col_stats.max_value,
1617            Precision::Inexact(ScalarValue::Int32(Some(999)))
1618        );
1619        assert_eq!(
1620            result_col_stats.min_value,
1621            Precision::Inexact(ScalarValue::Int32(Some(-100)))
1622        );
1623        assert_eq!(
1624            result_col_stats.sum_value,
1625            Precision::Inexact(ScalarValue::Int32(Some(123456)))
1626        );
1627        assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789));
1628    }
1629
1630    #[test]
1631    fn test_byte_size_try_merge() {
1632        // Test that byte_size is summed correctly in try_merge
1633        let col_stats1 = ColumnStatistics {
1634            null_count: Precision::Exact(10),
1635            max_value: Precision::Absent,
1636            min_value: Precision::Absent,
1637            sum_value: Precision::Absent,
1638            distinct_count: Precision::Absent,
1639            byte_size: Precision::Exact(1000),
1640        };
1641        let col_stats2 = ColumnStatistics {
1642            null_count: Precision::Exact(20),
1643            max_value: Precision::Absent,
1644            min_value: Precision::Absent,
1645            sum_value: Precision::Absent,
1646            distinct_count: Precision::Absent,
1647            byte_size: Precision::Exact(2000),
1648        };
1649
1650        let stats1 = Statistics {
1651            num_rows: Precision::Exact(50),
1652            total_byte_size: Precision::Exact(1000),
1653            column_statistics: vec![col_stats1],
1654        };
1655        let stats2 = Statistics {
1656            num_rows: Precision::Exact(100),
1657            total_byte_size: Precision::Exact(2000),
1658            column_statistics: vec![col_stats2],
1659        };
1660
1661        let merged = stats1.try_merge(&stats2).unwrap();
1662        assert_eq!(
1663            merged.column_statistics[0].byte_size,
1664            Precision::Exact(3000) // 1000 + 2000
1665        );
1666    }
1667
1668    #[test]
1669    fn test_byte_size_to_inexact() {
1670        let col_stats = ColumnStatistics {
1671            null_count: Precision::Exact(10),
1672            max_value: Precision::Absent,
1673            min_value: Precision::Absent,
1674            sum_value: Precision::Absent,
1675            distinct_count: Precision::Absent,
1676            byte_size: Precision::Exact(5000),
1677        };
1678
1679        let inexact = col_stats.to_inexact();
1680        assert_eq!(inexact.byte_size, Precision::Inexact(5000));
1681    }
1682
1683    #[test]
1684    fn test_with_byte_size_builder() {
1685        let col_stats =
1686            ColumnStatistics::new_unknown().with_byte_size(Precision::Exact(8192));
1687        assert_eq!(col_stats.byte_size, Precision::Exact(8192));
1688    }
1689
1690    #[test]
1691    fn test_with_fetch_scales_byte_size() {
1692        // Test that byte_size is scaled by the row ratio in with_fetch
1693        let original_stats = Statistics {
1694            num_rows: Precision::Exact(1000),
1695            total_byte_size: Precision::Exact(8000),
1696            column_statistics: vec![
1697                ColumnStatistics {
1698                    null_count: Precision::Exact(10),
1699                    max_value: Precision::Absent,
1700                    min_value: Precision::Absent,
1701                    sum_value: Precision::Absent,
1702                    distinct_count: Precision::Absent,
1703                    byte_size: Precision::Exact(4000),
1704                },
1705                ColumnStatistics {
1706                    null_count: Precision::Exact(20),
1707                    max_value: Precision::Absent,
1708                    min_value: Precision::Absent,
1709                    sum_value: Precision::Absent,
1710                    distinct_count: Precision::Absent,
1711                    byte_size: Precision::Exact(8000),
1712                },
1713            ],
1714        };
1715
1716        // Apply fetch of 100 rows (10% of original)
1717        let result = original_stats.with_fetch(Some(100), 0, 1).unwrap();
1718
1719        // byte_size should be scaled: 4000 * 0.1 = 400, 8000 * 0.1 = 800
1720        assert_eq!(
1721            result.column_statistics[0].byte_size,
1722            Precision::Inexact(400)
1723        );
1724        assert_eq!(
1725            result.column_statistics[1].byte_size,
1726            Precision::Inexact(800)
1727        );
1728
1729        // total_byte_size should be computed as sum of byte_size values: 400 + 800 = 1200
1730        assert_eq!(result.total_byte_size, Precision::Inexact(1200));
1731    }
1732
1733    #[test]
1734    fn test_with_fetch_total_byte_size_fallback() {
1735        // Test that total_byte_size falls back to scaling when not all columns have byte_size
1736        let original_stats = Statistics {
1737            num_rows: Precision::Exact(1000),
1738            total_byte_size: Precision::Exact(8000),
1739            column_statistics: vec![
1740                ColumnStatistics {
1741                    null_count: Precision::Exact(10),
1742                    max_value: Precision::Absent,
1743                    min_value: Precision::Absent,
1744                    sum_value: Precision::Absent,
1745                    distinct_count: Precision::Absent,
1746                    byte_size: Precision::Exact(4000),
1747                },
1748                ColumnStatistics {
1749                    null_count: Precision::Exact(20),
1750                    max_value: Precision::Absent,
1751                    min_value: Precision::Absent,
1752                    sum_value: Precision::Absent,
1753                    distinct_count: Precision::Absent,
1754                    byte_size: Precision::Absent, // One column has no byte_size
1755                },
1756            ],
1757        };
1758
1759        // Apply fetch of 100 rows (10% of original)
1760        let result = original_stats.with_fetch(Some(100), 0, 1).unwrap();
1761
1762        // total_byte_size should fall back to scaling: 8000 * 0.1 = 800
1763        assert_eq!(result.total_byte_size, Precision::Inexact(800));
1764    }
1765}