use std::fmt::{self, Debug, Display};
use crate::{Result, ScalarValue};
use crate::error::_plan_err;
use crate::utils::aggregate::precision_add;
use arrow::datatypes::{DataType, Schema};
/// A statistic value tagged with how reliable it is.
///
/// The payload `T` is carried by both `Exact` and `Inexact`; `Absent` carries
/// nothing and is the `Default` variant.
#[derive(Clone, PartialEq, Eq, Default, Copy)]
pub enum Precision<T: Debug + Clone + PartialEq + Eq + PartialOrd> {
/// The wrapped value is known exactly.
Exact(T),
/// The wrapped value is an estimate (any arithmetic involving an inexact
/// operand yields an inexact result).
Inexact(T),
/// No value is available.
#[default]
Absent,
}
impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
    /// Returns a reference to the wrapped value for `Exact` and `Inexact`,
    /// or `None` when the precision is `Absent`.
    pub fn get_value(&self) -> Option<&T> {
        match self {
            Precision::Exact(value) | Precision::Inexact(value) => Some(value),
            Precision::Absent => None,
        }
    }

    /// Transforms the wrapped value with `f`, keeping the exactness tag.
    ///
    /// `f` is invoked at most once, so any `FnOnce` closure is accepted
    /// (including closures that consume captured state). `Absent` maps to
    /// `Absent` without calling `f`.
    pub fn map<U, F>(self, f: F) -> Precision<U>
    where
        F: FnOnce(T) -> U,
        U: Debug + Clone + PartialEq + Eq + PartialOrd,
    {
        match self {
            Precision::Exact(val) => Precision::Exact(f(val)),
            Precision::Inexact(val) => Precision::Inexact(f(val)),
            Precision::Absent => Precision::Absent,
        }
    }

    /// Returns `Some(true)` for `Exact`, `Some(false)` for `Inexact` and
    /// `None` for `Absent`.
    pub fn is_exact(&self) -> Option<bool> {
        match self {
            Precision::Exact(_) => Some(true),
            Precision::Inexact(_) => Some(false),
            Precision::Absent => None,
        }
    }

    /// Returns the larger of the two wrapped values.
    ///
    /// The result is `Exact` only when both operands are `Exact`; it is
    /// `Inexact` when at least one operand is `Inexact`, and `Absent` when
    /// either operand is `Absent`. On ties (or incomparable values, where
    /// `a >= b` is false) the value of `other` is returned.
    pub fn max(&self, other: &Precision<T>) -> Precision<T> {
        match (self, other) {
            (Precision::Exact(a), Precision::Exact(b)) => {
                Precision::Exact(if a >= b { a.clone() } else { b.clone() })
            }
            (Precision::Inexact(a), Precision::Exact(b))
            | (Precision::Exact(a), Precision::Inexact(b))
            | (Precision::Inexact(a), Precision::Inexact(b)) => {
                Precision::Inexact(if a >= b { a.clone() } else { b.clone() })
            }
            (Precision::Absent, _) | (_, Precision::Absent) => Precision::Absent,
        }
    }

    /// Returns the smaller of the two wrapped values.
    ///
    /// Exactness is combined the same way as in [`Self::max`].
    pub fn min(&self, other: &Precision<T>) -> Precision<T> {
        match (self, other) {
            (Precision::Exact(a), Precision::Exact(b)) => {
                Precision::Exact(if a >= b { b.clone() } else { a.clone() })
            }
            (Precision::Inexact(a), Precision::Exact(b))
            | (Precision::Exact(a), Precision::Inexact(b))
            | (Precision::Inexact(a), Precision::Inexact(b)) => {
                Precision::Inexact(if a >= b { b.clone() } else { a.clone() })
            }
            (Precision::Absent, _) | (_, Precision::Absent) => Precision::Absent,
        }
    }

    /// Demotes `Exact` to `Inexact`; `Inexact` and `Absent` are returned
    /// unchanged.
    pub fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(value) => Precision::Inexact(value),
            Precision::Inexact(_) | Precision::Absent => self,
        }
    }
}
impl Precision<usize> {
pub fn add(&self, other: &Precision<usize>) -> Precision<usize> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => a.checked_add(*b).map_or_else(
|| Precision::Inexact(a.saturating_add(*b)),
Precision::Exact,
),
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => {
Precision::Inexact(a.saturating_add(*b))
}
(_, _) => Precision::Absent,
}
}
pub fn sub(&self, other: &Precision<usize>) -> Precision<usize> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => a.checked_sub(*b).map_or_else(
|| Precision::Inexact(a.saturating_sub(*b)),
Precision::Exact,
),
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => {
Precision::Inexact(a.saturating_sub(*b))
}
(_, _) => Precision::Absent,
}
}
pub fn multiply(&self, other: &Precision<usize>) -> Precision<usize> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => a.checked_mul(*b).map_or_else(
|| Precision::Inexact(a.saturating_mul(*b)),
Precision::Exact,
),
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => {
Precision::Inexact(a.saturating_mul(*b))
}
(_, _) => Precision::Absent,
}
}
pub fn with_estimated_selectivity(self, selectivity: f64) -> Self {
self.map(|v| ((v as f64 * selectivity).ceil()) as usize)
.to_inexact()
}
}
impl Precision<ScalarValue> {
fn sum_data_type(data_type: &DataType) -> DataType {
match data_type {
DataType::Int8 | DataType::Int16 | DataType::Int32 => DataType::Int64,
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => DataType::UInt64,
_ => data_type.clone(),
}
}
fn cast_scalar_to_sum_type(value: &ScalarValue) -> Result<ScalarValue> {
let source_type = value.data_type();
let target_type = Self::sum_data_type(&source_type);
if source_type == target_type {
Ok(value.clone())
} else {
value.cast_to(&target_type)
}
}
pub fn add(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => a
.add_checked(b)
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => a
.add_checked(b)
.map(Precision::Inexact)
.unwrap_or(Precision::Absent),
(_, _) => Precision::Absent,
}
}
pub fn cast_to_sum_type(&self) -> Precision<ScalarValue> {
match (self.is_exact(), self.get_value()) {
(Some(true), Some(value)) => Self::cast_scalar_to_sum_type(value)
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
(Some(false), Some(value)) => Self::cast_scalar_to_sum_type(value)
.map(Precision::Inexact)
.unwrap_or(Precision::Absent),
(_, _) => Precision::Absent,
}
}
pub fn add_for_sum(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
let mut lhs = self.cast_to_sum_type();
let rhs = other.cast_to_sum_type();
precision_add(&mut lhs, &rhs);
lhs
}
pub fn sub(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => {
a.sub(b).map(Precision::Exact).unwrap_or(Precision::Absent)
}
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => a
.sub(b)
.map(Precision::Inexact)
.unwrap_or(Precision::Absent),
(_, _) => Precision::Absent,
}
}
pub fn multiply(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => a
.mul_checked(b)
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
(Precision::Inexact(a), Precision::Exact(b))
| (Precision::Exact(a), Precision::Inexact(b))
| (Precision::Inexact(a), Precision::Inexact(b)) => a
.mul_checked(b)
.map(Precision::Inexact)
.unwrap_or(Precision::Absent),
(_, _) => Precision::Absent,
}
}
pub fn cast_to(&self, data_type: &DataType) -> Result<Precision<ScalarValue>> {
match self {
Precision::Exact(value) => value.cast_to(data_type).map(Precision::Exact),
Precision::Inexact(value) => value.cast_to(data_type).map(Precision::Inexact),
Precision::Absent => Ok(Precision::Absent),
}
}
}
impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Debug for Precision<T> {
    /// Renders as `Exact(<value>)`, `Inexact(<value>)` or `Absent`, using the
    /// payload's `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (label, inner) = match self {
            Precision::Absent => return f.write_str("Absent"),
            Precision::Exact(inner) => ("Exact", inner),
            Precision::Inexact(inner) => ("Inexact", inner),
        };
        write!(f, "{label}({inner:?})")
    }
}
impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Display for Precision<T> {
    /// Same text as the `Debug` impl: the payload is printed with its `Debug`
    /// representation because `T` is not required to implement `Display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Precision::Absent => f.write_str("Absent"),
            Precision::Exact(inner) => write!(f, "Exact({:?})", inner),
            Precision::Inexact(inner) => write!(f, "Inexact({:?})", inner),
        }
    }
}
impl From<Precision<usize>> for Precision<ScalarValue> {
    /// Wraps the count in a non-null `ScalarValue::UInt64`, keeping the
    /// exactness tag; `Absent` stays `Absent`.
    fn from(value: Precision<usize>) -> Self {
        value.map(|count| ScalarValue::UInt64(Some(count as u64)))
    }
}
/// Table-level statistics together with one `ColumnStatistics` entry per
/// column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
/// Number of rows.
pub num_rows: Precision<usize>,
/// Total size in bytes.
pub total_byte_size: Precision<usize>,
/// Per-column statistics; `project` reorders/duplicates these entries by index.
pub column_statistics: Vec<ColumnStatistics>,
}
/// Strategy for combining two distinct-value counts when merging statistics
/// and `estimate_ndv_with_overlap` cannot produce an estimate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum NdvFallback {
/// Take the larger of the two counts (the default).
#[default]
Max,
/// Take the saturating sum of the two counts.
Sum,
}
impl NdvFallback {
    /// Combines two distinct-value counts according to the selected strategy.
    fn merge(self, left: usize, right: usize) -> usize {
        match self {
            Self::Sum => left.checked_add(right).unwrap_or(usize::MAX),
            Self::Max => left.max(right),
        }
    }
}
impl Default for Statistics {
    /// Statistics with nothing known: absent row count and byte size, and no
    /// column entries at all.
    fn default() -> Self {
        let column_statistics = Vec::new();
        Self {
            column_statistics,
            total_byte_size: Precision::Absent,
            num_rows: Precision::Absent,
        }
    }
}
impl Statistics {
/// Builds statistics with absent row count and byte size and one unknown
/// `ColumnStatistics` per field of `schema`.
pub fn new_unknown(schema: &Schema) -> Self {
    let column_statistics = Statistics::unknown_column(schema);
    Self {
        column_statistics,
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
    }
}
/// Recomputes `total_byte_size` from `num_rows` and the schema's row width.
///
/// When every field has a primitive width, the byte size becomes
/// `num_rows * row_width`. As soon as one field has no primitive width the
/// existing `total_byte_size` is kept but demoted to inexact.
pub fn calculate_total_byte_size(&mut self, schema: &Schema) {
    // `None` as soon as a field without a fixed primitive width is seen.
    let row_width = schema.fields().iter().try_fold(0, |width_so_far, field| {
        field
            .data_type()
            .primitive_width()
            .map(|width| width_so_far + width)
    });
    self.total_byte_size = match row_width {
        Some(size) => self.num_rows.multiply(&Precision::Exact(size)),
        None => self.total_byte_size.to_inexact(),
    };
}
/// Returns one unknown `ColumnStatistics` per field of `schema`.
pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
    let field_count = schema.fields().len();
    std::iter::repeat_with(ColumnStatistics::new_unknown)
        .take(field_count)
        .collect()
}
pub fn with_num_rows(mut self, num_rows: Precision<usize>) -> Self {
self.num_rows = num_rows;
self
}
pub fn with_total_byte_size(mut self, total_byte_size: Precision<usize>) -> Self {
self.total_byte_size = total_byte_size;
self
}
/// Builder-style helper that appends `column_stats` as the last column.
pub fn add_column_statistics(mut self, column_stats: ColumnStatistics) -> Self {
    self.column_statistics.extend([column_stats]);
    self
}
pub fn to_inexact(mut self) -> Self {
self.num_rows = self.num_rows.to_inexact();
self.total_byte_size = self.total_byte_size.to_inexact();
self.column_statistics = self
.column_statistics
.into_iter()
.map(|s| s.to_inexact())
.collect();
self
}
/// Projects the column statistics onto the given column indices.
///
/// `None` leaves the statistics untouched. Thin generic wrapper that
/// converts the projection to a slice and delegates to `project_impl`.
pub fn project(self, projection: Option<&impl AsRef<[usize]>>) -> Self {
    self.project_impl(projection.map(|indices| indices.as_ref()))
}
/// Non-generic implementation of `project`.
///
/// Each source column is moved into the output the first time it is
/// referenced; later references to the same index clone the copy that was
/// already emitted.
///
/// # Panics
/// Panics with "projection out of bounds" if an index does not refer to an
/// existing column.
fn project_impl(mut self, projection: Option<&[usize]>) -> Self {
    let Some(indices) = projection else {
        return self;
    };

    // Tracks, per source column, whether it is still available to move
    // (`Present`) or where its first copy lives in the output (`Taken`).
    #[expect(clippy::large_enum_variant)]
    enum Slot {
        Taken(usize),
        Present(ColumnStatistics),
    }

    let mut slots: Vec<Slot> = std::mem::take(&mut self.column_statistics)
        .into_iter()
        .map(Slot::Present)
        .collect();

    for &source_idx in indices {
        let output_idx = self.column_statistics.len();
        let slot = slots.get_mut(source_idx).expect("projection out of bounds");
        let projected = match std::mem::replace(slot, Slot::Taken(output_idx)) {
            Slot::Present(col) => col,
            Slot::Taken(emitted_idx) => self.column_statistics[emitted_idx].clone(),
        };
        self.column_statistics.push(projected);
    }
    self
}
/// Adjusts the statistics for an operation that skips `skip` rows and then
/// keeps at most `fetch` rows (`None` is treated as `usize::MAX`), with the
/// resulting row count multiplied by `n_partitions`.
///
/// When the row count is known, `skip == 0` and the count already fits within
/// `fetch`, the statistics are returned unchanged. Otherwise the row count is
/// recomputed, all column statistics become inexact, byte sizes are scaled by
/// the ratio of new to old row count, and each distinct count is capped by the
/// new row count.
///
/// As written, every path returns `Ok`.
pub fn with_fetch(
mut self,
fetch: Option<usize>,
skip: usize,
n_partitions: usize,
) -> Result<Self> {
let fetch_val = fetch.unwrap_or(usize::MAX);
// Remember the original row count to derive the scaling ratio below.
let num_rows_before = self.num_rows;
self.num_rows = match self {
Statistics {
num_rows: Precision::Exact(nr),
..
}
| Statistics {
num_rows: Precision::Inexact(nr),
..
} => {
// In this arm `num_rows` is Exact or Inexact, so `is_exact()` is
// always `Some` and the `unwrap()` calls cannot panic.
if nr <= skip {
// Everything is skipped: no rows remain.
check_num_rows(Some(0), self.num_rows.is_exact().unwrap())
} else if nr <= fetch_val && skip == 0 {
// Nothing is skipped and the limit is not reached: unchanged.
return Ok(self);
} else if nr - skip <= fetch_val {
// The rows remaining after the skip fit within the limit.
check_num_rows(
(nr - skip).checked_mul(n_partitions),
self.num_rows.is_exact().unwrap(),
)
} else {
// More rows remain than the limit allows: the limit wins.
check_num_rows(
fetch_val.checked_mul(n_partitions),
self.num_rows.is_exact().unwrap(),
)
}
}
// Unknown input row count: the best available bound is the limit itself
// (inexact), or Absent when there is no limit or the product overflows.
Statistics {
num_rows: Precision::Absent,
..
} => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
};
// Fraction of rows kept; 0.0 when either count is unknown or the input was empty.
let ratio: f64 = match (num_rows_before, self.num_rows) {
(
Precision::Exact(nr_before) | Precision::Inexact(nr_before),
Precision::Exact(nr_after) | Precision::Inexact(nr_after),
) => {
if nr_before == 0 {
0.0
} else {
nr_after as f64 / nr_before as f64
}
}
_ => 0.0,
};
self.column_statistics = self
.column_statistics
.into_iter()
.map(|cs| {
// Per-column values are no longer exact after rows are dropped.
let mut cs = cs.to_inexact();
cs.byte_size = match cs.byte_size {
Precision::Exact(n) | Precision::Inexact(n) => {
Precision::Inexact((n as f64 * ratio) as usize)
}
Precision::Absent => Precision::Absent,
};
// Cap the distinct count at the new row count.
if let Some(&rows) = self.num_rows.get_value() {
cs.distinct_count = cs.distinct_count.min(&Precision::Inexact(rows));
}
cs
})
.collect();
// Sum of the scaled per-column byte sizes; `None` if any column lacks one.
// NOTE(review): with no columns this is `Some(0)`, so `total_byte_size`
// becomes `Inexact(0)` — confirm that is intended.
let sum_scan_bytes: Option<usize> = self
.column_statistics
.iter()
.map(|cs| cs.byte_size.get_value().copied())
.try_fold(0usize, |acc, val| val.map(|v| acc + v));
self.total_byte_size = match sum_scan_bytes {
Some(sum) => Precision::Inexact(sum),
None => {
// Fall back to scaling the previous total by the same ratio.
match &self.total_byte_size {
Precision::Exact(n) | Precision::Inexact(n) => {
Precision::Inexact((*n as f64 * ratio) as usize)
}
Precision::Absent => Precision::Absent,
}
}
};
Ok(self)
}
/// Merges a sequence of statistics into one summary, using
/// `NdvFallback::Max` when distinct counts cannot be estimated from the
/// value ranges. See `try_merge_iter_with_ndv_fallback` for the details.
pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
where
    I: IntoIterator<Item = &'a Statistics>,
{
    let fallback = NdvFallback::Max;
    Self::try_merge_iter_with_ndv_fallback(items, schema, fallback)
}
/// Merges a sequence of statistics into one summary.
///
/// * An empty sequence yields `Statistics::new_unknown(schema)`.
/// * A single item is returned as a clone, unchanged (its `sum_value`s are
///   not widened to the sum type).
/// * Otherwise row counts, byte sizes and null counts are added, min/max are
///   combined, sums are added after widening to the sum type, and distinct
///   counts are estimated (always inexact) from the value ranges, falling back
///   to `ndv_fallback` when no estimate is available.
///
/// # Errors
/// Returns a plan error when an item has a different number of columns than
/// the first item.
pub fn try_merge_iter_with_ndv_fallback<'a, I>(
items: I,
schema: &Schema,
ndv_fallback: NdvFallback,
) -> Result<Statistics>
where
I: IntoIterator<Item = &'a Statistics>,
{
let mut items = items.into_iter();
let Some(first) = items.next() else {
return Ok(Statistics::new_unknown(schema));
};
let Some(second) = items.next() else {
return Ok(first.clone());
};
// The first item seeds the accumulators.
let num_cols = first.column_statistics.len();
let mut num_rows = first.num_rows;
let mut total_byte_size = first.total_byte_size;
let mut column_statistics = first.column_statistics.clone();
// Widen the seed sums up front so later additions happen in the sum type.
for col_stats in &mut column_statistics {
cast_sum_value_to_sum_type_in_place(&mut col_stats.sum_value);
}
for (i, stat) in std::iter::once(second).chain(items).enumerate() {
if stat.column_statistics.len() != num_cols {
// `i` counts from the second item, hence `i + 1` as the item index.
return _plan_err!(
"Cannot merge statistics with different number of columns: {} vs {} (item {})",
num_cols,
stat.column_statistics.len(),
i + 1
);
}
num_rows = num_rows.add(&stat.num_rows);
total_byte_size = total_byte_size.add(&stat.total_byte_size);
for (col_stats, item_cs) in
column_statistics.iter_mut().zip(&stat.column_statistics)
{
col_stats.null_count = col_stats.null_count.add(&item_cs.null_count);
// The distinct count is estimated here, before min/max are updated
// below, so the estimate sees the accumulated range as it was prior to
// merging this item.
col_stats.distinct_count = match (
col_stats.distinct_count.get_value(),
item_cs.distinct_count.get_value(),
) {
(Some(&l), Some(&r)) => Precision::Inexact(
estimate_ndv_with_overlap(col_stats, item_cs, l, r)
.unwrap_or_else(|| ndv_fallback.merge(l, r)),
),
_ => Precision::Absent,
};
precision_min(&mut col_stats.min_value, &item_cs.min_value);
precision_max(&mut col_stats.max_value, &item_cs.max_value);
precision_add_for_sum_in_place(
&mut col_stats.sum_value,
&item_cs.sum_value,
);
col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
}
}
Ok(Statistics {
num_rows,
total_byte_size,
column_statistics,
})
}
}
/// Estimates the number of distinct values of the union of two columns from
/// their min/max ranges and individual distinct counts.
///
/// Returns `None` when either column lacks a min or max value, or when
/// `ScalarValue::distance` cannot produce a distance for the ranges; callers
/// are expected to fall back to another strategy in that case.
///
/// * If either range has zero width, the result is the larger count when the
///   intervals overlap and the sum of the counts otherwise.
/// * If the ranges are disjoint, the result is the sum of the counts.
/// * Otherwise each side's count is split proportionally into an overlapping
///   and a non-overlapping part; the overlapping parts contribute their
///   maximum, the non-overlapping parts are added.
///
/// Sums of counts saturate at `usize::MAX` rather than overflowing.
pub fn estimate_ndv_with_overlap(
    left: &ColumnStatistics,
    right: &ColumnStatistics,
    ndv_left: usize,
    ndv_right: usize,
) -> Option<usize> {
    let left_min = left.min_value.get_value()?;
    let left_max = left.max_value.get_value()?;
    let right_min = right.min_value.get_value()?;
    let right_max = right.max_value.get_value()?;
    let range_left = left_max.distance(left_min)?;
    let range_right = right_max.distance(right_min)?;

    // Non-overlapping inputs contribute all of their distinct values.
    // Saturate so that huge counts cannot overflow (panic in debug builds,
    // wrap in release builds), matching `NdvFallback::Sum`.
    let disjoint_ndv = ndv_left.saturating_add(ndv_right);

    // A zero-width range would make the proportional formula below divide by
    // zero, so only check whether the intervals intersect.
    if range_left == 0 || range_right == 0 {
        let overlaps = left_min <= right_max && right_min <= left_max;
        return Some(if overlaps {
            usize::max(ndv_left, ndv_right)
        } else {
            disjoint_ndv
        });
    }

    let overlap_min = if left_min >= right_min {
        left_min
    } else {
        right_min
    };
    let overlap_max = if left_max <= right_max {
        left_max
    } else {
        right_max
    };
    if overlap_min > overlap_max {
        return Some(disjoint_ndv);
    }

    let overlap_range = overlap_max.distance(overlap_min)? as f64;
    // Fraction of each side's range that lies inside the intersection.
    let overlap_left = overlap_range / range_left as f64;
    let overlap_right = overlap_range / range_right as f64;
    let intersection = f64::max(
        overlap_left * ndv_left as f64,
        overlap_right * ndv_right as f64,
    );
    let only_left = (1.0 - overlap_left) * ndv_left as f64;
    let only_right = (1.0 - overlap_right) * ndv_right as f64;
    // Float-to-integer `as` casts saturate, so this cannot overflow.
    Some((intersection + only_left + only_right).round() as usize)
}
/// Replaces `lhs` with the minimum of `lhs` and `rhs`, in place.
///
/// The result is `Exact` only when both inputs are `Exact`, `Inexact` when
/// both carry a value but at least one is `Inexact`, and `Absent` otherwise.
/// The existing `lhs` value is kept when it compares `<=` to `rhs`.
#[inline]
fn precision_min<T>(lhs: &mut Precision<T>, rhs: &Precision<T>)
where
    T: Debug + Clone + PartialEq + Eq + PartialOrd,
{
    // `take` leaves `Absent` behind, which is already the right answer
    // whenever either side carries no value.
    let current = std::mem::take(lhs);
    let both_exact = matches!(
        (&current, rhs),
        (Precision::Exact(_), Precision::Exact(_))
    );
    let (
        Precision::Exact(left) | Precision::Inexact(left),
        Precision::Exact(right) | Precision::Inexact(right),
    ) = (current, rhs)
    else {
        return;
    };
    let smallest = if left <= *right { left } else { right.clone() };
    *lhs = if both_exact {
        Precision::Exact(smallest)
    } else {
        Precision::Inexact(smallest)
    };
}
/// Replaces `lhs` with the maximum of `lhs` and `rhs`, in place.
///
/// The result is `Exact` only when both inputs are `Exact`, `Inexact` when
/// both carry a value but at least one is `Inexact`, and `Absent` otherwise.
/// The existing `lhs` value is kept when it compares `>=` to `rhs`.
#[inline]
fn precision_max<T>(lhs: &mut Precision<T>, rhs: &Precision<T>)
where
    T: Debug + Clone + PartialEq + Eq + PartialOrd,
{
    // `take` leaves `Absent` behind, which is already the right answer
    // whenever either side carries no value.
    let current = std::mem::take(lhs);
    let both_exact = matches!(
        (&current, rhs),
        (Precision::Exact(_), Precision::Exact(_))
    );
    let (
        Precision::Exact(left) | Precision::Inexact(left),
        Precision::Exact(right) | Precision::Inexact(right),
    ) = (current, rhs)
    else {
        return;
    };
    let largest = if left >= *right { left } else { right.clone() };
    *lhs = if both_exact {
        Precision::Exact(largest)
    } else {
        Precision::Inexact(largest)
    };
}
/// Widens a sum statistic to its sum type in place, keeping the exactness
/// tag. `Absent` is left untouched; a failed cast turns the value `Absent`.
#[inline]
fn cast_sum_value_to_sum_type_in_place(value: &mut Precision<ScalarValue>) {
    let (scalar, rewrap): (ScalarValue, fn(ScalarValue) -> Precision<ScalarValue>) =
        match std::mem::take(value) {
            // `take` already left `Absent` in place.
            Precision::Absent => return,
            Precision::Exact(v) => (v, Precision::Exact),
            Precision::Inexact(v) => (v, Precision::Inexact),
        };
    let original_type = scalar.data_type();
    let widened_type = Precision::<ScalarValue>::sum_data_type(&original_type);
    *value = if original_type != widened_type {
        scalar
            .cast_to(&widened_type)
            .map_or(Precision::Absent, rewrap)
    } else {
        rewrap(scalar)
    };
}
/// Adds `rhs` into `lhs` for sum statistics, widening `rhs` to its sum type
/// first when necessary.
///
/// An `Absent` `rhs` makes `lhs` `Absent`. A failed widening cast is treated
/// as adding `Absent`. `lhs` itself is not cast here.
#[inline]
fn precision_add_for_sum_in_place(
    lhs: &mut Precision<ScalarValue>,
    rhs: &Precision<ScalarValue>,
) {
    let Some(scalar) = rhs.get_value() else {
        *lhs = Precision::Absent;
        return;
    };
    let original_type = scalar.data_type();
    let widened_type = Precision::<ScalarValue>::sum_data_type(&original_type);
    if original_type == widened_type {
        // Already in the sum type: add as-is without an intermediate copy.
        precision_add(lhs, rhs);
        return;
    }
    let widened = match scalar.cast_to(&widened_type) {
        Ok(cast) if matches!(rhs, Precision::Exact(_)) => Precision::Exact(cast),
        Ok(cast) => Precision::Inexact(cast),
        Err(_) => Precision::Absent,
    };
    precision_add(lhs, &widened);
}
/// Wraps an optional row count in a `Precision`: `None` becomes `Absent`,
/// otherwise the value is tagged `Exact` or `Inexact` according to `is_exact`.
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
    match (value, is_exact) {
        (None, _) => Precision::Absent,
        (Some(rows), true) => Precision::Exact(rows),
        (Some(rows), false) => Precision::Inexact(rows),
    }
}
impl Display for Statistics {
    /// Formats as `Rows=<..>, Bytes=<..>, [<columns>]`, where each column is
    /// rendered as `(Col[i]: Min=.. Max=.. Sum=.. Null=.. Distinct=..
    /// ScanBytes=..)` with absent statistics omitted and columns joined by
    /// `,`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut rendered_columns = Vec::with_capacity(self.column_statistics.len());
        for (idx, cs) in self.column_statistics.iter().enumerate() {
            let mut entry = format!("(Col[{idx}]:");
            if cs.min_value != Precision::Absent {
                entry.push_str(&format!(" Min={}", cs.min_value));
            }
            if cs.max_value != Precision::Absent {
                entry.push_str(&format!(" Max={}", cs.max_value));
            }
            if cs.sum_value != Precision::Absent {
                entry.push_str(&format!(" Sum={}", cs.sum_value));
            }
            if cs.null_count != Precision::Absent {
                entry.push_str(&format!(" Null={}", cs.null_count));
            }
            if cs.distinct_count != Precision::Absent {
                entry.push_str(&format!(" Distinct={}", cs.distinct_count));
            }
            if cs.byte_size != Precision::Absent {
                entry.push_str(&format!(" ScanBytes={}", cs.byte_size));
            }
            entry.push(')');
            rendered_columns.push(entry);
        }
        let column_stats = rendered_columns.join(",");
        write!(
            f,
            "Rows={}, Bytes={}, [{}]",
            self.num_rows, self.total_byte_size, column_stats
        )
    }
}
/// Statistics for a single column. Every field defaults to `Precision::Absent`.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ColumnStatistics {
/// Number of null values.
pub null_count: Precision<usize>,
/// Maximum value.
pub max_value: Precision<ScalarValue>,
/// Minimum value.
pub min_value: Precision<ScalarValue>,
/// Sum of the values; `with_sum_value` stores it widened to the sum type
/// (e.g. `Int32` becomes `Int64`).
pub sum_value: Precision<ScalarValue>,
/// Number of distinct values.
pub distinct_count: Precision<usize>,
/// Size in bytes (shown as `ScanBytes` by `Statistics`' `Display` impl).
pub byte_size: Precision<usize>,
}
impl ColumnStatistics {
    /// Returns `true` when min and max are both exact, non-null and equal,
    /// i.e. the column is known to hold a single value.
    pub fn is_singleton(&self) -> bool {
        let (Precision::Exact(min), Precision::Exact(max)) =
            (&self.min_value, &self.max_value)
        else {
            return false;
        };
        !(min.is_null() || max.is_null()) && min == max
    }

    /// Returns column statistics with every field `Absent`.
    pub fn new_unknown() -> Self {
        Self::default()
    }

    /// Builder-style setter for `null_count`.
    pub fn with_null_count(self, null_count: Precision<usize>) -> Self {
        Self { null_count, ..self }
    }

    /// Builder-style setter for `max_value`.
    pub fn with_max_value(self, max_value: Precision<ScalarValue>) -> Self {
        Self { max_value, ..self }
    }

    /// Builder-style setter for `min_value`.
    pub fn with_min_value(self, min_value: Precision<ScalarValue>) -> Self {
        Self { min_value, ..self }
    }

    /// Builder-style setter for `sum_value`.
    ///
    /// The value is widened to its sum type before being stored; if that cast
    /// fails the stored sum is `Absent`.
    pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self {
        self.sum_value = sum_value.cast_to_sum_type();
        self
    }

    /// Builder-style setter for `distinct_count`.
    pub fn with_distinct_count(self, distinct_count: Precision<usize>) -> Self {
        Self {
            distinct_count,
            ..self
        }
    }

    /// Builder-style setter for `byte_size`.
    pub fn with_byte_size(self, byte_size: Precision<usize>) -> Self {
        Self { byte_size, ..self }
    }

    /// Demotes every exact field to inexact; absent fields stay absent.
    pub fn to_inexact(self) -> Self {
        let Self {
            null_count,
            max_value,
            min_value,
            sum_value,
            distinct_count,
            byte_size,
        } = self;
        Self {
            null_count: null_count.to_inexact(),
            max_value: max_value.to_inexact(),
            min_value: min_value.to_inexact(),
            sum_value: sum_value.to_inexact(),
            distinct_count: distinct_count.to_inexact(),
            byte_size: byte_size.to_inexact(),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::assert_contains;
use arrow::datatypes::Field;
use std::sync::Arc;
#[test]
fn test_get_value() {
    // Both value-carrying variants expose their payload; `Absent` has none.
    let exact = Precision::Exact(42);
    let inexact = Precision::Inexact(23);
    let absent = Precision::<i32>::Absent;

    assert_eq!(exact.get_value(), Some(&42));
    assert_eq!(inexact.get_value(), Some(&23));
    assert_eq!(absent.get_value(), None);
}
#[test]
fn test_map() {
    // `map` transforms the payload and keeps the exactness tag.
    let exact = Precision::Exact(42);
    let inexact = Precision::Inexact(23);
    let absent = Precision::Absent;
    let square = |x| x * x;

    assert_eq!(exact.map(square), Precision::Exact(1764));
    assert_eq!(inexact.map(square), Precision::Inexact(529));
    assert_eq!(absent.map(square), Precision::Absent);
}
#[test]
fn test_is_exact() {
    // `is_exact` is tri-state: Some(true), Some(false) or None for `Absent`.
    let exact = Precision::Exact(42);
    let inexact = Precision::Inexact(23);
    let absent = Precision::<i32>::Absent;

    assert_eq!(exact.is_exact(), Some(true));
    assert_eq!(inexact.is_exact(), Some(false));
    assert_eq!(absent.is_exact(), None);
}
#[test]
fn test_max() {
    let exact_42 = Precision::Exact(42);
    let inexact_23 = Precision::Inexact(23);
    let exact_30 = Precision::Exact(30);
    let absent = Precision::Absent;

    // Mixing exact and inexact operands yields an inexact maximum.
    assert_eq!(exact_42.max(&inexact_23), Precision::Inexact(42));
    // Two exact operands stay exact.
    assert_eq!(exact_42.max(&exact_30), Precision::Exact(42));
    assert_eq!(inexact_23.max(&exact_30), Precision::Inexact(30));
    // Any absent operand makes the result absent.
    assert_eq!(exact_42.max(&absent), Precision::Absent);
}
#[test]
fn test_min() {
    let exact_42 = Precision::Exact(42);
    let inexact_23 = Precision::Inexact(23);
    let exact_30 = Precision::Exact(30);
    let absent = Precision::Absent;

    // Mixing exact and inexact operands yields an inexact minimum.
    assert_eq!(exact_42.min(&inexact_23), Precision::Inexact(23));
    // Two exact operands stay exact.
    assert_eq!(exact_42.min(&exact_30), Precision::Exact(30));
    assert_eq!(inexact_23.min(&exact_30), Precision::Inexact(23));
    // Any absent operand makes the result absent.
    assert_eq!(exact_42.min(&absent), Precision::Absent);
}
#[test]
fn test_to_inexact() {
    // Only `Exact` changes; `Inexact` and `Absent` are returned as-is.
    let exact = Precision::Exact(42);
    let inexact = Precision::Inexact(42);
    let absent = Precision::<i32>::Absent;

    assert_eq!(exact.to_inexact(), inexact);
    assert_eq!(inexact.to_inexact(), inexact);
    assert_eq!(absent.to_inexact(), absent);
}
#[test]
fn test_add() {
    let precision1 = Precision::Exact(42);
    let precision2 = Precision::Inexact(23);
    let precision3 = Precision::Exact(30);
    let absent_precision = Precision::Absent;
    let precision_max_exact = Precision::Exact(usize::MAX);
    // Must be `Inexact` so the inexact overflow path is actually exercised.
    let precision_max_inexact = Precision::Inexact(usize::MAX);

    assert_eq!(precision1.add(&precision2), Precision::Inexact(65));
    assert_eq!(precision1.add(&precision3), Precision::Exact(72));
    assert_eq!(precision2.add(&precision3), Precision::Inexact(53));
    assert_eq!(precision1.add(&absent_precision), Precision::Absent);
    // Overflow of two exact operands saturates and degrades to inexact.
    assert_eq!(
        precision_max_exact.add(&precision1),
        Precision::Inexact(usize::MAX)
    );
    // Overflow with an inexact operand saturates and stays inexact.
    assert_eq!(
        precision_max_inexact.add(&precision1),
        Precision::Inexact(usize::MAX)
    );
}
#[test]
fn test_add_scalar() {
    let int32 = |v: Option<i32>| ScalarValue::Int32(v);
    let lhs = Precision::Exact(int32(Some(42)));

    // exact + exact stays exact
    let exact_sum = lhs.add(&Precision::Exact(int32(Some(23))));
    assert_eq!(exact_sum, Precision::Exact(int32(Some(65))));

    // exact + inexact degrades to inexact
    let inexact_sum = lhs.add(&Precision::Inexact(int32(Some(23))));
    assert_eq!(inexact_sum, Precision::Inexact(int32(Some(65))));

    // adding a null scalar produces a null scalar
    let null_sum = lhs.add(&Precision::Exact(int32(None)));
    assert_eq!(null_sum, Precision::Exact(int32(None)));

    assert_eq!(lhs.add(&Precision::Absent), Precision::Absent);
}
#[test]
fn test_add_for_sum_scalar_integer_widening() {
    // Int32 operands are widened to Int64 before being added.
    let lhs = Precision::Exact(ScalarValue::Int32(Some(42)));
    let rhs_value = ScalarValue::Int32(Some(23));
    let expected_value = ScalarValue::Int64(Some(65));

    assert_eq!(
        lhs.add_for_sum(&Precision::Exact(rhs_value.clone())),
        Precision::Exact(expected_value.clone()),
    );
    assert_eq!(
        lhs.add_for_sum(&Precision::Inexact(rhs_value)),
        Precision::Inexact(expected_value),
    );
}
#[test]
fn test_add_for_sum_prevents_int32_overflow() {
    // i32::MAX + 1 does not fit in Int32 but does fit once widened to Int64.
    let at_limit = Precision::Exact(ScalarValue::Int32(Some(i32::MAX)));
    let one = Precision::Exact(ScalarValue::Int32(Some(1)));
    let widened_total = i64::from(i32::MAX) + 1;

    assert_eq!(
        at_limit.add_for_sum(&one),
        Precision::Exact(ScalarValue::Int64(Some(widened_total))),
    );
}
#[test]
fn test_add_for_sum_scalar_unsigned_integer_widening() {
    // UInt32 operands are widened to UInt64 before being added.
    let lhs = Precision::Exact(ScalarValue::UInt32(Some(42)));
    let rhs_value = ScalarValue::UInt32(Some(23));
    let expected_value = ScalarValue::UInt64(Some(65));

    assert_eq!(
        lhs.add_for_sum(&Precision::Exact(rhs_value.clone())),
        Precision::Exact(expected_value.clone()),
    );
    assert_eq!(
        lhs.add_for_sum(&Precision::Inexact(rhs_value)),
        Precision::Inexact(expected_value),
    );
}
#[test]
fn test_sub() {
    let exact_42 = Precision::Exact(42);
    let inexact_23 = Precision::Inexact(23);
    let exact_30 = Precision::Exact(30);
    let absent = Precision::Absent;

    assert_eq!(exact_42.sub(&inexact_23), Precision::Inexact(19));
    assert_eq!(exact_42.sub(&exact_30), Precision::Exact(12));
    // Underflow saturates at zero and is reported as inexact.
    assert_eq!(inexact_23.sub(&exact_42), Precision::Inexact(0));
    assert_eq!(exact_30.sub(&exact_42), Precision::Inexact(0));
    assert_eq!(exact_42.sub(&absent), Precision::Absent);
}
#[test]
fn test_sub_scalar() {
    let int32 = |v: Option<i32>| ScalarValue::Int32(v);
    let lhs = Precision::Exact(int32(Some(42)));

    // exact - exact stays exact
    let exact_diff = lhs.sub(&Precision::Exact(int32(Some(23))));
    assert_eq!(exact_diff, Precision::Exact(int32(Some(19))));

    // exact - inexact degrades to inexact
    let inexact_diff = lhs.sub(&Precision::Inexact(int32(Some(23))));
    assert_eq!(inexact_diff, Precision::Inexact(int32(Some(19))));

    // subtracting a null scalar produces a null scalar
    let null_diff = lhs.sub(&Precision::Exact(int32(None)));
    assert_eq!(null_diff, Precision::Exact(int32(None)));

    assert_eq!(lhs.sub(&Precision::Absent), Precision::Absent);
}
#[test]
fn test_multiply() {
    let precision1 = Precision::Exact(6);
    let precision2 = Precision::Inexact(3);
    let precision3 = Precision::Exact(5);
    let precision_max_exact = Precision::Exact(usize::MAX);
    // Must be `Inexact` so the inexact overflow path is actually exercised.
    let precision_max_inexact = Precision::Inexact(usize::MAX);
    let absent_precision = Precision::Absent;

    assert_eq!(precision1.multiply(&precision2), Precision::Inexact(18));
    assert_eq!(precision1.multiply(&precision3), Precision::Exact(30));
    assert_eq!(precision2.multiply(&precision3), Precision::Inexact(15));
    assert_eq!(precision1.multiply(&absent_precision), Precision::Absent);
    // Overflow of two exact operands saturates and degrades to inexact.
    assert_eq!(
        precision_max_exact.multiply(&precision1),
        Precision::Inexact(usize::MAX)
    );
    // Overflow with an inexact operand saturates and stays inexact.
    assert_eq!(
        precision_max_inexact.multiply(&precision1),
        Precision::Inexact(usize::MAX)
    );
}
#[test]
fn test_multiply_scalar() {
    let int32 = |v: Option<i32>| ScalarValue::Int32(v);
    let lhs = Precision::Exact(int32(Some(6)));

    // exact * exact stays exact
    let exact_product = lhs.multiply(&Precision::Exact(int32(Some(5))));
    assert_eq!(exact_product, Precision::Exact(int32(Some(30))));

    // exact * inexact degrades to inexact
    let inexact_product = lhs.multiply(&Precision::Inexact(int32(Some(5))));
    assert_eq!(inexact_product, Precision::Inexact(int32(Some(30))));

    // multiplying by a null scalar produces a null scalar
    let null_product = lhs.multiply(&Precision::Exact(int32(None)));
    assert_eq!(null_product, Precision::Exact(int32(None)));

    assert_eq!(lhs.multiply(&Precision::Absent), Precision::Absent);
}
#[test]
fn test_cast_to() {
    // Successful widening casts keep the exactness tag.
    let exact = Precision::Exact(ScalarValue::Int32(Some(42)));
    assert_eq!(
        exact.cast_to(&DataType::Int64).unwrap(),
        Precision::Exact(ScalarValue::Int64(Some(42))),
    );

    let inexact = Precision::Inexact(ScalarValue::Int32(Some(42)));
    assert_eq!(
        inexact.cast_to(&DataType::Int64).unwrap(),
        Precision::Inexact(ScalarValue::Int64(Some(42))),
    );

    // Null scalars remain null after the cast.
    let exact_null = Precision::Exact(ScalarValue::Int32(None));
    assert_eq!(
        exact_null.cast_to(&DataType::Int64).unwrap(),
        Precision::Exact(ScalarValue::Int64(None)),
    );

    // A value that does not fit the target type is reported as an error.
    let too_large = Precision::Exact(ScalarValue::Int32(Some(256)));
    assert!(too_large.cast_to(&DataType::Int8).is_err());
}
#[test]
fn test_precision_cloning() {
    // `Precision<usize>` is `Copy`: the original stays usable after the move.
    let count: Precision<usize> = Precision::Exact(42);
    let count_copy = count;
    assert_eq!(count, count_copy);

    // `Precision<ScalarValue>` is only `Clone`, so it is cloned explicitly.
    let scalar: Precision<ScalarValue> = Precision::Exact(ScalarValue::Int64(Some(42)));
    let scalar_clone = scalar.clone();
    assert_eq!(scalar, scalar_clone);
}
#[test]
fn test_project_none() {
    // No projection leaves the statistics untouched.
    let no_projection: Option<Vec<usize>> = None;
    let projected = make_stats(vec![10, 20, 30]).project(no_projection.as_ref());
    let expected = make_stats(vec![10, 20, 30]);
    assert_eq!(projected, expected);
}
#[test]
fn test_project_empty() {
    // An empty projection drops every column.
    let empty_projection = Some(vec![]);
    let projected = make_stats(vec![10, 20, 30]).project(empty_projection.as_ref());
    let expected = make_stats(vec![]);
    assert_eq!(projected, expected);
}
#[test]
fn test_project_swap() {
    // Columns are emitted in projection order, not source order.
    let swapped = Some(vec![2, 1]);
    let projected = make_stats(vec![10, 20, 30]).project(swapped.as_ref());
    let expected = make_stats(vec![30, 20]);
    assert_eq!(projected, expected);
}
#[test]
fn test_project_repeated() {
    // The same source column may be referenced any number of times.
    let repeated = Some(vec![1, 2, 1, 1, 0, 2]);
    let projected = make_stats(vec![10, 20, 30]).project(repeated.as_ref());
    let expected = make_stats(vec![20, 30, 20, 20, 10, 30]);
    assert_eq!(projected, expected);
}
/// Builds table statistics with fixed row count and byte size and one column
/// per entry of `counts`, each entry being used as that column's null count.
fn make_stats(counts: impl IntoIterator<Item = usize>) -> Statistics {
    let column_statistics = counts.into_iter().map(col_stats_i64).collect();
    Statistics {
        column_statistics,
        num_rows: Precision::Exact(42),
        total_byte_size: Precision::Exact(500),
    }
}
/// Builds exact Int64 column statistics that differ only in `null_count`,
/// so columns can be told apart after projection.
fn col_stats_i64(null_count: usize) -> ColumnStatistics {
    let exact_i64 = |v: i64| Precision::Exact(ScalarValue::Int64(Some(v)));
    ColumnStatistics {
        null_count: Precision::Exact(null_count),
        max_value: exact_i64(42),
        min_value: exact_i64(64),
        sum_value: exact_i64(4600),
        distinct_count: Precision::Exact(100),
        byte_size: Precision::Exact(800),
    }
}
/// Builds single-column statistics (10 exact rows) with the given distinct
/// count and optional exact Int64 min/max bounds; a `None` bound is `Absent`.
fn make_single_i64_ndv_stats(
    distinct_count: Precision<usize>,
    min_value: Option<i64>,
    max_value: Option<i64>,
) -> Statistics {
    let to_bound = |bound: Option<i64>| match bound {
        Some(value) => Precision::Exact(ScalarValue::Int64(Some(value))),
        None => Precision::Absent,
    };
    let column = ColumnStatistics::new_unknown()
        .with_distinct_count(distinct_count)
        .with_min_value(to_bound(min_value))
        .with_max_value(to_bound(max_value));
    Statistics::default()
        .with_num_rows(Precision::Exact(10))
        .add_column_statistics(column)
}
/// Merges two single-column statistics with the given NDV fallback and
/// returns the merged distinct count of that column.
fn merge_single_i64_ndv_distinct_count(
    left: Statistics,
    right: Statistics,
    ndv_fallback: NdvFallback,
) -> Precision<usize> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let merged = Statistics::try_merge_iter_with_ndv_fallback(
        [&left, &right],
        &schema,
        ndv_fallback,
    )
    .unwrap();
    merged.column_statistics[0].distinct_count
}
/// Merges two all-exact, two-column statistics (passed as `&Vec<Statistics>`)
/// and checks the merged row count, byte size, and each column's null count,
/// max, min and sum.
#[test]
fn test_try_merge() {
    let fields = vec![
        Field::new("col1", DataType::Int32, false),
        Field::new("col2", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    // All-exact Int32 column without distinct-count information.
    let int32_column = |nulls: usize, max: i32, min: i32, sum: i32, bytes: usize| {
        ColumnStatistics {
            null_count: Precision::Exact(nulls),
            max_value: Precision::Exact(ScalarValue::Int32(Some(max))),
            min_value: Precision::Exact(ScalarValue::Int32(Some(min))),
            sum_value: Precision::Exact(ScalarValue::Int32(Some(sum))),
            distinct_count: Precision::Absent,
            byte_size: Precision::Exact(bytes),
        }
    };
    let first = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(100),
        column_statistics: vec![
            int32_column(1, 100, 1, 500, 40),
            int32_column(2, 200, 10, 1000, 40),
        ],
    };
    let second = Statistics {
        num_rows: Precision::Exact(15),
        total_byte_size: Precision::Exact(150),
        column_statistics: vec![
            int32_column(2, 120, -10, 600, 60),
            int32_column(3, 180, 5, 1200, 60),
        ],
    };

    let inputs = vec![first, second];
    let merged = Statistics::try_merge_iter(&inputs, &schema).unwrap();
    assert_eq!(merged.num_rows, Precision::Exact(25));
    assert_eq!(merged.total_byte_size, Precision::Exact(250));

    // The expected sums are Int64 values although the inputs carry Int32 sums.
    let assert_column = |idx: usize, nulls: usize, max: i32, min: i32, sum: i64| {
        let column = &merged.column_statistics[idx];
        assert_eq!(column.null_count, Precision::Exact(nulls));
        assert_eq!(
            column.max_value,
            Precision::Exact(ScalarValue::Int32(Some(max)))
        );
        assert_eq!(
            column.min_value,
            Precision::Exact(ScalarValue::Int32(Some(min)))
        );
        assert_eq!(
            column.sum_value,
            Precision::Exact(ScalarValue::Int64(Some(sum)))
        );
    };
    assert_column(0, 3, 120, -10, 1100);
    assert_column(1, 5, 200, 5, 2200);
}
/// Merges two single-column statistics that mix exact, inexact and absent
/// values and checks that every merged value touched by an inexact input is
/// reported as inexact, and that the sum is absent when one input sum is absent.
#[test]
fn test_try_merge_mixed_precision() {
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let int32 = |value: i32| ScalarValue::Int32(Some(value));

    let first_column = ColumnStatistics {
        null_count: Precision::Exact(1),
        max_value: Precision::Exact(int32(100)),
        min_value: Precision::Inexact(int32(1)),
        sum_value: Precision::Exact(int32(500)),
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(40),
    };
    let second_column = ColumnStatistics {
        null_count: Precision::Inexact(2),
        max_value: Precision::Inexact(int32(120)),
        min_value: Precision::Exact(int32(-10)),
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Inexact(60),
    };
    let inputs = vec![
        Statistics {
            num_rows: Precision::Exact(10),
            total_byte_size: Precision::Inexact(100),
            column_statistics: vec![first_column],
        },
        Statistics {
            num_rows: Precision::Inexact(15),
            total_byte_size: Precision::Exact(150),
            column_statistics: vec![second_column],
        },
    ];

    let merged = Statistics::try_merge_iter(&inputs, &schema).unwrap();
    assert_eq!(merged.num_rows, Precision::Inexact(25));
    assert_eq!(merged.total_byte_size, Precision::Inexact(250));

    let column = &merged.column_statistics[0];
    assert_eq!(column.null_count, Precision::Inexact(3));
    assert_eq!(column.max_value, Precision::Inexact(int32(120)));
    assert_eq!(column.min_value, Precision::Inexact(int32(-10)));
    assert_eq!(column.sum_value, Precision::Absent);
}
/// Merging an empty `Vec<Statistics>` is expected to succeed and produce
/// absent table-level values plus one all-unknown column per schema field.
#[test]
fn test_try_merge_empty() {
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let no_inputs = Vec::<Statistics>::new();

    let merged = Statistics::try_merge_iter(&no_inputs, &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Absent);
    assert_eq!(merged.total_byte_size, Precision::Absent);
    assert_eq!(merged.column_statistics.len(), 1);
    let only_column = &merged.column_statistics[0];
    assert_eq!(only_column.null_count, Precision::Absent);
}
/// Merging statistics with differing column counts (0 vs 1) must fail with a
/// planning error that names both counts.
#[test]
fn test_try_merge_mismatched_size() {
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let without_columns = Statistics::default();
    let with_one_column =
        Statistics::default().add_column_statistics(ColumnStatistics::new_unknown());
    let inputs = vec![without_columns, with_one_column];

    let err = Statistics::try_merge_iter(&inputs, &schema).unwrap_err();
    assert_contains!(
        err.to_string(),
        "Error during planning: Cannot merge statistics with different number of columns: 0 vs 1"
    );
}
/// Merges two Int32 columns with exact distinct counts 5 and 7 over the ranges
/// `[1, 10]` and `[5, 20]`. Rows, bytes, nulls, min and max are expected to
/// stay exact, while the merged distinct count is expected to be `Inexact(10)`.
#[test]
fn test_try_merge_distinct_count_absent() {
    let build = |rows: usize, bytes: usize, min: i32, max: i32, ndv: usize| {
        let column = ColumnStatistics::new_unknown()
            .with_null_count(Precision::Exact(0))
            .with_min_value(Precision::Exact(ScalarValue::Int32(Some(min))))
            .with_max_value(Precision::Exact(ScalarValue::Int32(Some(max))))
            .with_distinct_count(Precision::Exact(ndv));
        Statistics::default()
            .with_num_rows(Precision::Exact(rows))
            .with_total_byte_size(Precision::Exact(bytes))
            .add_column_statistics(column)
    };
    let left = build(10, 100, 1, 10, 5);
    let right = build(15, 150, 5, 20, 7);

    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let merged = Statistics::try_merge_iter([&left, &right], &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Exact(25));
    assert_eq!(merged.total_byte_size, Precision::Exact(250));

    let column = &merged.column_statistics[0];
    assert_eq!(column.null_count, Precision::Exact(0));
    assert_eq!(
        column.min_value,
        Precision::Exact(ScalarValue::Int32(Some(1)))
    );
    assert_eq!(
        column.max_value,
        Precision::Exact(ScalarValue::Int32(Some(20)))
    );
    assert_eq!(column.distinct_count, Precision::Inexact(10));
}
/// Two Int32 columns over the non-overlapping ranges `[0, 10]` and `[20, 30]`
/// with distinct counts 5 and 8 are expected to merge to `Inexact(13)`,
/// i.e. the sum of both counts.
#[test]
fn test_try_merge_ndv_disjoint_ranges() {
    let ten_rows = |min: i32, max: i32, ndv: usize| {
        let column = ColumnStatistics::new_unknown()
            .with_min_value(Precision::Exact(ScalarValue::Int32(Some(min))))
            .with_max_value(Precision::Exact(ScalarValue::Int32(Some(max))))
            .with_distinct_count(Precision::Exact(ndv));
        Statistics::default()
            .with_num_rows(Precision::Exact(10))
            .add_column_statistics(column)
    };
    let low = ten_rows(0, 10, 5);
    let high = ten_rows(20, 30, 8);

    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let merged = Statistics::try_merge_iter([&low, &high], &schema).unwrap();

    let ndv = merged.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(13));
}
/// Two Int32 columns covering the same range `[0, 100]` with distinct counts
/// 50 and 30 are expected to merge to `Inexact(50)`, the larger of the two.
#[test]
fn test_try_merge_ndv_identical_ranges() {
    let full_range = |ndv: usize| {
        let column = ColumnStatistics::new_unknown()
            .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
            .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
            .with_distinct_count(Precision::Exact(ndv));
        Statistics::default()
            .with_num_rows(Precision::Exact(100))
            .add_column_statistics(column)
    };
    let denser = full_range(50);
    let sparser = full_range(30);

    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let merged = Statistics::try_merge_iter([&denser, &sparser], &schema).unwrap();

    let ndv = merged.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(50));
}
/// Two Int32 columns over the partially overlapping ranges `[0, 100]` and
/// `[50, 150]` with distinct counts 80 and 60 are expected to merge to
/// `Inexact(110)`.
#[test]
fn test_try_merge_ndv_partial_overlap() {
    let hundred_rows = |min: i32, max: i32, ndv: usize| {
        let column = ColumnStatistics::new_unknown()
            .with_min_value(Precision::Exact(ScalarValue::Int32(Some(min))))
            .with_max_value(Precision::Exact(ScalarValue::Int32(Some(max))))
            .with_distinct_count(Precision::Exact(ndv));
        Statistics::default()
            .with_num_rows(Precision::Exact(100))
            .add_column_statistics(column)
    };
    let lower = hundred_rows(0, 100, 80);
    let upper = hundred_rows(50, 150, 60);

    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let merged = Statistics::try_merge_iter([&lower, &upper], &schema).unwrap();

    let ndv = merged.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(110));
}
/// When neither input carries min/max bounds, merging distinct counts 5 and 8
/// through `try_merge_iter` is expected to give `Inexact(8)`, which equals the
/// larger input count.
#[test]
fn test_try_merge_ndv_missing_min_max() {
    let unbounded = |ndv: usize| {
        let column =
            ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(ndv));
        Statistics::default()
            .with_num_rows(Precision::Exact(10))
            .add_column_statistics(column)
    };
    let left = unbounded(5);
    let right = unbounded(8);

    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let merged = Statistics::try_merge_iter([&left, &right], &schema).unwrap();

    let ndv = merged.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(8));
}
/// For Utf8 bounds, merging distinct counts 5 and 8 through `try_merge_iter`
/// is expected to give `Inexact(8)`, which equals the larger input count.
#[test]
fn test_try_merge_ndv_non_numeric_types() {
    let utf8 = |text: &str| Precision::Exact(ScalarValue::Utf8(Some(text.to_string())));
    let string_stats = |min: &str, max: &str, ndv: usize| {
        let column = ColumnStatistics::new_unknown()
            .with_min_value(utf8(min))
            .with_max_value(utf8(max))
            .with_distinct_count(Precision::Exact(ndv));
        Statistics::default()
            .with_num_rows(Precision::Exact(10))
            .add_column_statistics(column)
    };
    let wide = string_stats("aaa", "zzz", 5);
    let narrow = string_stats("bbb", "yyy", 8);

    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let merged = Statistics::try_merge_iter([&wide, &narrow], &schema).unwrap();

    let ndv = merged.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(8));
}
/// Same Utf8 inputs as the non-numeric NDV test, but merged with
/// `NdvFallback::Sum`: distinct counts 5 and 8 are expected to give
/// `Inexact(13)`, their sum.
#[test]
fn test_try_merge_ndv_non_numeric_types_sum_fallback() {
    let utf8 = |text: &str| Precision::Exact(ScalarValue::Utf8(Some(text.to_string())));
    let string_stats = |min: &str, max: &str, ndv: usize| {
        let column = ColumnStatistics::new_unknown()
            .with_min_value(utf8(min))
            .with_max_value(utf8(max))
            .with_distinct_count(Precision::Exact(ndv));
        Statistics::default()
            .with_num_rows(Precision::Exact(10))
            .add_column_statistics(column)
    };
    let wide = string_stats("aaa", "zzz", 5);
    let narrow = string_stats("bbb", "yyy", 8);

    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let merge_result = Statistics::try_merge_iter_with_ndv_fallback(
        [&wide, &narrow],
        &schema,
        NdvFallback::Sum,
    );
    let merged = merge_result.unwrap();

    let ndv = merged.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(13));
}
/// Constant columns (min == max, one distinct value): merging two columns
/// holding the same constant is expected to give `Inexact(1)`, and merging
/// two columns holding different constants `Inexact(2)`.
#[test]
fn test_try_merge_ndv_constant_columns() {
    // Ten-row statistics whose single Int32 column only ever holds `value`.
    let constant = |value: i32| {
        let column = ColumnStatistics::new_unknown()
            .with_min_value(Precision::Exact(ScalarValue::Int32(Some(value))))
            .with_max_value(Precision::Exact(ScalarValue::Int32(Some(value))))
            .with_distinct_count(Precision::Exact(1));
        Statistics::default()
            .with_num_rows(Precision::Exact(10))
            .add_column_statistics(column)
    };
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);

    // Same constant on both sides.
    let five_a = constant(5);
    let five_b = constant(5);
    let same = Statistics::try_merge_iter([&five_a, &five_b], &schema).unwrap();
    assert_eq!(
        same.column_statistics[0].distinct_count,
        Precision::Inexact(1)
    );

    // Different constants on each side.
    let five = constant(5);
    let ten = constant(10);
    let different = Statistics::try_merge_iter([&five, &ten], &schema).unwrap();
    assert_eq!(
        different.column_statistics[0].distinct_count,
        Precision::Inexact(2)
    );
}
/// Table-driven check of the merged distinct count under `NdvFallback::Sum`
/// for a single Int64 column.
///
/// Each case lists a name, the left and right inputs as
/// `(distinct count, min, max)`, and the expected merged distinct count.
#[test]
fn test_try_merge_ndv_original_union_edge_cases() {
    type Side = (Precision<usize>, Option<i64>, Option<i64>);

    let exact = |ndv: usize, min: i64, max: i64| -> Side {
        (Precision::Exact(ndv), Some(min), Some(max))
    };
    let unbounded = |ndv: Precision<usize>| -> Side { (ndv, None, None) };

    let cases: Vec<(&'static str, Side, Side, Precision<usize>)> = vec![
        (
            "disjoint ranges",
            exact(5, 0, 10),
            exact(3, 20, 30),
            Precision::Inexact(8),
        ),
        (
            "identical ranges",
            exact(10, 0, 100),
            exact(8, 0, 100),
            Precision::Inexact(10),
        ),
        (
            "partial overlap",
            exact(100, 0, 100),
            exact(50, 50, 150),
            Precision::Inexact(125),
        ),
        (
            "right contained in left",
            exact(100, 0, 100),
            exact(50, 25, 75),
            Precision::Inexact(100),
        ),
        (
            "same constant value",
            exact(1, 5, 5),
            exact(1, 5, 5),
            Precision::Inexact(1),
        ),
        (
            "different constant values",
            exact(1, 5, 5),
            exact(1, 10, 10),
            Precision::Inexact(2),
        ),
        (
            "left constant within right range",
            exact(1, 5, 5),
            exact(10, 0, 10),
            Precision::Inexact(10),
        ),
        (
            "left constant outside right range",
            exact(1, 20, 20),
            exact(10, 0, 10),
            Precision::Inexact(11),
        ),
        (
            "right constant within left range",
            exact(10, 0, 10),
            exact(1, 5, 5),
            Precision::Inexact(10),
        ),
        (
            "right constant outside left range",
            exact(10, 0, 10),
            exact(1, 20, 20),
            Precision::Inexact(11),
        ),
        (
            "missing bounds exact plus exact",
            unbounded(Precision::Exact(10)),
            unbounded(Precision::Exact(5)),
            Precision::Inexact(15),
        ),
        (
            "missing bounds exact plus inexact",
            unbounded(Precision::Exact(10)),
            unbounded(Precision::Inexact(5)),
            Precision::Inexact(15),
        ),
        (
            "missing bounds inexact plus inexact",
            unbounded(Precision::Inexact(7)),
            unbounded(Precision::Inexact(3)),
            Precision::Inexact(10),
        ),
        (
            "exact plus absent",
            unbounded(Precision::Exact(10)),
            unbounded(Precision::Absent),
            Precision::Absent,
        ),
        (
            "inexact plus absent",
            unbounded(Precision::Inexact(4)),
            unbounded(Precision::Absent),
            Precision::Absent,
        ),
    ];

    for (name, left, right, expected) in cases {
        let (left_ndv, left_min, left_max) = left;
        let (right_ndv, right_min, right_max) = right;
        let left_stats = make_single_i64_ndv_stats(left_ndv, left_min, left_max);
        let right_stats = make_single_i64_ndv_stats(right_ndv, right_min, right_max);
        let actual =
            merge_single_i64_ndv_distinct_count(left_stats, right_stats, NdvFallback::Sum);
        assert_eq!(actual, expected, "case {} failed", name);
    }
}
/// Applies `with_fetch(Some(100), 0, 1)` to exact 1000-row statistics with two
/// columns. The row count is expected to become `Exact(100)`, the total byte
/// size `Inexact(1200)`, and every checked column value to keep its number but
/// turn inexact.
#[test]
fn test_with_fetch_basic_preservation() {
    let int32 = |value: i32| ScalarValue::Int32(Some(value));
    let int64 = |value: i64| ScalarValue::Int64(Some(value));

    let narrow_column = ColumnStatistics {
        null_count: Precision::Exact(10),
        max_value: Precision::Exact(int32(100)),
        min_value: Precision::Exact(int32(0)),
        sum_value: Precision::Exact(int32(5050)),
        distinct_count: Precision::Exact(50),
        byte_size: Precision::Exact(4000),
    };
    let wide_column = ColumnStatistics {
        null_count: Precision::Exact(20),
        max_value: Precision::Exact(int64(200)),
        min_value: Precision::Exact(int64(10)),
        sum_value: Precision::Exact(int64(10100)),
        distinct_count: Precision::Exact(75),
        byte_size: Precision::Exact(8000),
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![narrow_column, wide_column],
    };

    let result = input.with_fetch(Some(100), 0, 1).unwrap();
    assert_eq!(result.num_rows, Precision::Exact(100));
    assert_eq!(result.total_byte_size, Precision::Inexact(1200));
    assert_eq!(result.column_statistics.len(), 2);

    let assert_column = |idx: usize,
                         nulls: usize,
                         max: ScalarValue,
                         min: ScalarValue,
                         sum: ScalarValue,
                         ndv: usize| {
        let column = &result.column_statistics[idx];
        assert_eq!(column.null_count, Precision::Inexact(nulls));
        assert_eq!(column.max_value, Precision::Inexact(max));
        assert_eq!(column.min_value, Precision::Inexact(min));
        assert_eq!(column.sum_value, Precision::Inexact(sum));
        assert_eq!(column.distinct_count, Precision::Inexact(ndv));
    };
    assert_column(0, 10, int32(100), int32(0), int32(5050), 50);
    assert_column(1, 20, int64(200), int64(10), int64(10100), 75);
}
/// Applies `with_fetch(Some(500), 0, 1)` to statistics that are already
/// inexact. The row count is expected to be `Inexact(500)`, the total byte
/// size `Inexact(2000)`, and the column's null count to stay `Inexact(10)`.
#[test]
fn test_with_fetch_inexact_input() {
    let column = ColumnStatistics {
        null_count: Precision::Inexact(10),
        max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
        min_value: Precision::Inexact(ScalarValue::Int32(Some(0))),
        sum_value: Precision::Inexact(ScalarValue::Int32(Some(5050))),
        distinct_count: Precision::Inexact(50),
        byte_size: Precision::Inexact(4000),
    };
    let input = Statistics {
        num_rows: Precision::Inexact(1000),
        total_byte_size: Precision::Inexact(8000),
        column_statistics: vec![column],
    };

    let result = input.with_fetch(Some(500), 0, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Inexact(500));
    assert_eq!(result.total_byte_size, Precision::Inexact(2000));
    let first_column = &result.column_statistics[0];
    assert_eq!(first_column.null_count, Precision::Inexact(10));
}
/// Skipping as many rows as exist (skip 100 of an exact 100) is expected to
/// leave `Exact(0)` rows and an `Inexact(0)` total byte size.
#[test]
fn test_with_fetch_skip_all_rows() {
    let input = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(800),
        column_statistics: vec![col_stats_i64(10)],
    };

    let result = input.with_fetch(Some(50), 100, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(0));
    assert_eq!(result.total_byte_size, Precision::Inexact(0));
}
/// With an inexact zero row count and neither fetch nor skip applied, the row
/// count is expected to remain `Inexact(0)`.
#[test]
fn test_with_fetch_skip_all_rows_inexact() {
    let input = Statistics {
        num_rows: Precision::Inexact(0),
        total_byte_size: Precision::Inexact(0),
        column_statistics: vec![col_stats_i64(10)],
    };

    let result = input.with_fetch(None, 0, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Inexact(0));
}
/// With no fetch limit and no skip, the exact row count and exact total byte
/// size are expected to pass through unchanged.
#[test]
fn test_with_fetch_no_limit() {
    let input = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(800),
        column_statistics: vec![col_stats_i64(10)],
    };

    let result = input.with_fetch(None, 0, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(100));
    assert_eq!(result.total_byte_size, Precision::Exact(800));
}
/// Fetching 300 rows after skipping 200 of an exact 1000 is expected to give
/// `Exact(300)` rows and an `Inexact(240)` total byte size.
#[test]
fn test_with_fetch_with_skip() {
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![col_stats_i64(10)],
    };

    let fetch = Some(300);
    let skip = 200;
    let result = input.with_fetch(fetch, skip, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(300));
    assert_eq!(result.total_byte_size, Precision::Inexact(240));
}
/// A fetch of 100 with the third `with_fetch` argument set to 4 is expected to
/// give `Exact(400)` rows and an `Inexact(320)` total byte size.
#[test]
fn test_with_fetch_multi_partition() {
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![col_stats_i64(10)],
    };

    let result = input.with_fetch(Some(100), 0, 4).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(400));
    assert_eq!(result.total_byte_size, Precision::Inexact(320));
}
/// When every input statistic is absent, a fetch of 100 is expected to give
/// an `Inexact(100)` row count while byte size and null count stay absent.
#[test]
fn test_with_fetch_absent_stats() {
    let unknown_column = ColumnStatistics {
        null_count: Precision::Absent,
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Absent,
    };
    let input = Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![unknown_column],
    };

    let result = input.with_fetch(Some(100), 0, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Inexact(100));
    assert_eq!(result.total_byte_size, Precision::Absent);
    let first_column = &result.column_statistics[0];
    assert_eq!(first_column.null_count, Precision::Absent);
}
/// Skipping 50 of an exact 100 rows leaves fewer rows than the fetch of 100
/// asks for; the result is expected to be `Exact(50)` rows and an
/// `Inexact(400)` total byte size.
#[test]
fn test_with_fetch_fetch_exceeds_rows() {
    let input = Statistics {
        num_rows: Precision::Exact(100),
        total_byte_size: Precision::Exact(800),
        column_statistics: vec![col_stats_i64(10)],
    };

    let result = input.with_fetch(Some(100), 50, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(50));
    assert_eq!(result.total_byte_size, Precision::Inexact(400));
}
/// After `with_fetch(Some(250), 0, 1)` the column's null count, max, min and
/// sum are expected to keep their values but turn inexact, while the distinct
/// count (789 on input) is expected to be `Inexact(250)`.
#[test]
fn test_with_fetch_preserves_all_column_stats() {
    let int32 = |value: i32| ScalarValue::Int32(Some(value));
    let column = ColumnStatistics {
        null_count: Precision::Exact(42),
        max_value: Precision::Exact(int32(999)),
        min_value: Precision::Exact(int32(-100)),
        sum_value: Precision::Exact(int32(123456)),
        distinct_count: Precision::Exact(789),
        byte_size: Precision::Exact(4000),
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![column],
    };

    let result = input.with_fetch(Some(250), 0, 1).unwrap();

    let fetched = &result.column_statistics[0];
    assert_eq!(fetched.null_count, Precision::Inexact(42));
    assert_eq!(fetched.max_value, Precision::Inexact(int32(999)));
    assert_eq!(fetched.min_value, Precision::Inexact(int32(-100)));
    assert_eq!(fetched.sum_value, Precision::Inexact(int32(123456)));
    assert_eq!(fetched.distinct_count, Precision::Inexact(250));
}
/// `ColumnStatistics::to_inexact` is expected to turn an exact byte size into
/// an inexact one with the same value.
#[test]
fn test_byte_size_to_inexact() {
    let exact_stats = ColumnStatistics {
        null_count: Precision::Exact(10),
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(5000),
    };

    let byte_size = exact_stats.to_inexact().byte_size;

    assert_eq!(byte_size, Precision::Inexact(5000));
}
/// The `with_byte_size` builder is expected to store the given precision
/// unchanged.
#[test]
fn test_with_byte_size_builder() {
    let requested = Precision::Exact(8192);
    let built = ColumnStatistics::new_unknown().with_byte_size(requested);
    assert_eq!(built.byte_size, Precision::Exact(8192));
}
/// Passing a `UInt32` sum to the `with_sum_value` builder is expected to store
/// it as a `UInt64` with the same value and precision.
#[test]
fn test_with_sum_value_builder_widens_small_integers() {
    let narrow_sum = Precision::Exact(ScalarValue::UInt32(Some(123)));
    let built = ColumnStatistics::new_unknown().with_sum_value(narrow_sum);

    let widened_sum = Precision::Exact(ScalarValue::UInt64(Some(123)));
    assert_eq!(built.sum_value, widened_sum);
}
/// Fetching 100 of an exact 1000 rows is expected to scale each column's byte
/// size to a tenth (as inexact values) and report their sum, `Inexact(1200)`,
/// as the total byte size.
#[test]
fn test_with_fetch_scales_byte_size() {
    // Column that only knows its null count and byte size.
    let sized_column = |nulls: usize, bytes: usize| ColumnStatistics {
        null_count: Precision::Exact(nulls),
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(bytes),
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![sized_column(10, 4000), sized_column(20, 8000)],
    };

    let result = input.with_fetch(Some(100), 0, 1).unwrap();

    let first_bytes = result.column_statistics[0].byte_size;
    assert_eq!(first_bytes, Precision::Inexact(400));
    let second_bytes = result.column_statistics[1].byte_size;
    assert_eq!(second_bytes, Precision::Inexact(800));
    assert_eq!(result.total_byte_size, Precision::Inexact(1200));
}
/// When one column has no byte size, fetching 100 of an exact 1000 rows is
/// expected to report `Inexact(800)` as the total byte size, a tenth of the
/// input's 8000.
#[test]
fn test_with_fetch_total_byte_size_fallback() {
    let column = |nulls: usize, byte_size: Precision<usize>| ColumnStatistics {
        null_count: Precision::Exact(nulls),
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size,
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![
            column(10, Precision::Exact(4000)),
            column(20, Precision::Absent),
        ],
    };

    let result = input.with_fetch(Some(100), 0, 1).unwrap();

    assert_eq!(result.total_byte_size, Precision::Inexact(800));
}
/// A distinct count of 500 is expected to be reduced to the fetched row count
/// (10) when only 10 rows are fetched.
#[test]
fn test_with_fetch_caps_ndv_at_row_count() {
    let column = ColumnStatistics {
        distinct_count: Precision::Inexact(500),
        ..Default::default()
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![column],
    };

    let result = input.with_fetch(Some(10), 0, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(10));
    let ndv = result.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(10));
}
/// With a skip of 5 and a fetch of 10, the row count is expected to be
/// `Exact(10)` and a distinct count of 500 to be reduced to `Inexact(10)`.
#[test]
fn test_with_fetch_caps_ndv_with_skip() {
    let column = ColumnStatistics {
        distinct_count: Precision::Inexact(500),
        ..Default::default()
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![column],
    };

    let result = input.with_fetch(Some(10), 5, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(10));
    let ndv = result.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(10));
}
/// Skipping 995 of 1000 rows leaves 5 rows regardless of the fetch of 100; a
/// distinct count of 500 is expected to be reduced to `Inexact(5)`.
#[test]
fn test_with_fetch_caps_ndv_with_large_skip() {
    let column = ColumnStatistics {
        distinct_count: Precision::Inexact(500),
        ..Default::default()
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![column],
    };

    let result = input.with_fetch(Some(100), 995, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(5));
    let ndv = result.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(5));
}
/// A distinct count (5) that is already below the fetched row count (10) is
/// expected to pass through unchanged.
#[test]
fn test_with_fetch_ndv_below_row_count_unchanged() {
    let column = ColumnStatistics {
        distinct_count: Precision::Inexact(5),
        ..Default::default()
    };
    let input = Statistics {
        num_rows: Precision::Exact(1000),
        total_byte_size: Precision::Exact(8000),
        column_statistics: vec![column],
    };

    let result = input.with_fetch(Some(10), 0, 1).unwrap();

    assert_eq!(result.num_rows, Precision::Exact(10));
    let ndv = result.column_statistics[0].distinct_count;
    assert_eq!(ndv, Precision::Inexact(5));
}
/// Merges two all-exact, two-column statistics passed as a `Vec` of references
/// and checks the merged row count, byte size, and each column's null count,
/// max, min and sum.
#[test]
fn test_try_merge_iter_basic() {
    let fields = vec![
        Field::new("col1", DataType::Int32, false),
        Field::new("col2", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    // All-exact Int32 column without distinct-count information.
    let int32_column = |nulls: usize, max: i32, min: i32, sum: i32, bytes: usize| {
        ColumnStatistics {
            null_count: Precision::Exact(nulls),
            max_value: Precision::Exact(ScalarValue::Int32(Some(max))),
            min_value: Precision::Exact(ScalarValue::Int32(Some(min))),
            sum_value: Precision::Exact(ScalarValue::Int32(Some(sum))),
            distinct_count: Precision::Absent,
            byte_size: Precision::Exact(bytes),
        }
    };
    let first = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(100),
        column_statistics: vec![
            int32_column(1, 100, 1, 500, 40),
            int32_column(2, 200, 10, 1000, 40),
        ],
    };
    let second = Statistics {
        num_rows: Precision::Exact(15),
        total_byte_size: Precision::Exact(150),
        column_statistics: vec![
            int32_column(2, 120, -10, 600, 60),
            int32_column(3, 180, 5, 1200, 60),
        ],
    };

    let inputs = vec![&first, &second];
    let merged = Statistics::try_merge_iter(inputs, &schema).unwrap();
    assert_eq!(merged.num_rows, Precision::Exact(25));
    assert_eq!(merged.total_byte_size, Precision::Exact(250));

    // The expected sums are Int64 values although the inputs carry Int32 sums.
    let assert_column = |idx: usize, nulls: usize, max: i32, min: i32, sum: i64| {
        let column = &merged.column_statistics[idx];
        assert_eq!(column.null_count, Precision::Exact(nulls));
        assert_eq!(
            column.max_value,
            Precision::Exact(ScalarValue::Int32(Some(max)))
        );
        assert_eq!(
            column.min_value,
            Precision::Exact(ScalarValue::Int32(Some(min)))
        );
        assert_eq!(
            column.sum_value,
            Precision::Exact(ScalarValue::Int64(Some(sum)))
        );
    };
    assert_column(0, 3, 120, -10, 1100);
    assert_column(1, 5, 200, 5, 2200);
}
/// Merges two single-column statistics (passed as a `Vec` of references) that
/// mix exact, inexact and absent values. Every merged value touched by an
/// inexact input is expected to be inexact, and the sum to be absent when one
/// input sum is absent.
#[test]
fn test_try_merge_iter_mixed_precision() {
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let int32 = |value: i32| ScalarValue::Int32(Some(value));

    let first = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Inexact(100),
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Exact(1),
            max_value: Precision::Exact(int32(100)),
            min_value: Precision::Inexact(int32(1)),
            sum_value: Precision::Exact(int32(500)),
            distinct_count: Precision::Absent,
            byte_size: Precision::Exact(40),
        }],
    };
    let second = Statistics {
        num_rows: Precision::Inexact(15),
        total_byte_size: Precision::Exact(150),
        column_statistics: vec![ColumnStatistics {
            null_count: Precision::Inexact(2),
            max_value: Precision::Inexact(int32(120)),
            min_value: Precision::Exact(int32(-10)),
            sum_value: Precision::Absent,
            distinct_count: Precision::Absent,
            byte_size: Precision::Inexact(60),
        }],
    };

    let inputs = vec![&first, &second];
    let merged = Statistics::try_merge_iter(inputs, &schema).unwrap();
    assert_eq!(merged.num_rows, Precision::Inexact(25));
    assert_eq!(merged.total_byte_size, Precision::Inexact(250));

    let column = &merged.column_statistics[0];
    assert_eq!(column.null_count, Precision::Inexact(3));
    assert_eq!(column.max_value, Precision::Inexact(int32(120)));
    assert_eq!(column.min_value, Precision::Inexact(int32(-10)));
    assert_eq!(column.sum_value, Precision::Absent);
}
/// Merges an empty collection of statistics against a one-field schema and
/// checks that the row count, byte size and the single column's null count
/// are all `Absent`.
#[test]
fn test_try_merge_iter_empty() {
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let no_stats: Vec<&Statistics> = Vec::new();

    let merged = Statistics::try_merge_iter(no_stats, &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Absent);
    assert_eq!(merged.total_byte_size, Precision::Absent);
    // One column entry is expected for the one field in the schema.
    assert_eq!(merged.column_statistics.len(), 1);
    let only_column = &merged.column_statistics[0];
    assert_eq!(only_column.null_count, Precision::Absent);
}
/// Merging a collection that holds exactly one `Statistics` is expected to
/// produce a value equal to that input.
#[test]
fn test_try_merge_iter_single_item() {
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let only_column = ColumnStatistics {
        null_count: Precision::Exact(1),
        max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
        min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
        sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
        distinct_count: Precision::Exact(10),
        byte_size: Precision::Exact(40),
    };
    let input = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(100),
        column_statistics: vec![only_column],
    };

    let merged = Statistics::try_merge_iter(vec![&input], &schema).unwrap();

    assert_eq!(merged, input);
}
/// Merging statistics with zero columns and statistics with one column is
/// expected to fail with an error that reports both column counts.
#[test]
fn test_try_merge_iter_mismatched_columns() {
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let without_columns = Statistics::default();
    let with_one_column =
        Statistics::default().add_column_statistics(ColumnStatistics::new_unknown());

    let err =
        Statistics::try_merge_iter(vec![&without_columns, &with_one_column], &schema)
            .unwrap_err();

    assert_contains!(
        err.to_string(),
        "Cannot merge statistics with different number of columns: 0 vs 1"
    );
}
/// Merges three fully exact `Int64` statistics and checks the combined
/// totals, extremes, sum, byte size and the expected inexact distinct count.
#[test]
fn test_try_merge_iter_three_items() {
    let int64 = |v: i64| ScalarValue::Int64(Some(v));
    let field = Field::new("col1", DataType::Int64, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let first_column = ColumnStatistics {
        null_count: Precision::Exact(1),
        max_value: Precision::Exact(int64(100)),
        min_value: Precision::Exact(int64(10)),
        sum_value: Precision::Exact(int64(500)),
        distinct_count: Precision::Exact(8),
        byte_size: Precision::Exact(80),
    };
    let first = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(100),
        column_statistics: vec![first_column],
    };

    let second_column = ColumnStatistics {
        null_count: Precision::Exact(2),
        max_value: Precision::Exact(int64(200)),
        min_value: Precision::Exact(int64(5)),
        sum_value: Precision::Exact(int64(1000)),
        distinct_count: Precision::Exact(15),
        byte_size: Precision::Exact(160),
    };
    let second = Statistics {
        num_rows: Precision::Exact(20),
        total_byte_size: Precision::Exact(200),
        column_statistics: vec![second_column],
    };

    let third_column = ColumnStatistics {
        null_count: Precision::Exact(3),
        max_value: Precision::Exact(int64(150)),
        min_value: Precision::Exact(int64(1)),
        sum_value: Precision::Exact(int64(2000)),
        distinct_count: Precision::Exact(25),
        byte_size: Precision::Exact(240),
    };
    let third = Statistics {
        num_rows: Precision::Exact(30),
        total_byte_size: Precision::Exact(300),
        column_statistics: vec![third_column],
    };

    let merged =
        Statistics::try_merge_iter(vec![&first, &second, &third], &schema).unwrap();

    // Table-level totals are the sums of the three inputs.
    assert_eq!(merged.num_rows, Precision::Exact(60));
    assert_eq!(merged.total_byte_size, Precision::Exact(600));

    let column = &merged.column_statistics[0];
    assert_eq!(column.null_count, Precision::Exact(6));
    assert_eq!(column.max_value, Precision::Exact(int64(200)));
    assert_eq!(column.min_value, Precision::Exact(int64(1)));
    assert_eq!(column.sum_value, Precision::Exact(int64(3500)));
    assert_eq!(column.byte_size, Precision::Exact(480));
    // Unlike the other fields, the merged distinct count is expected to be
    // inexact even though every input distinct count is exact.
    assert_eq!(column.distinct_count, Precision::Inexact(29));
}
/// Merges two exact `Float64` column statistics and checks the combined
/// max, min and sum.
#[test]
fn test_try_merge_iter_float_types() {
    let float64 = |v: f64| ScalarValue::Float64(Some(v));
    let field = Field::new("col1", DataType::Float64, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let first_column = ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(float64(99.9)),
        min_value: Precision::Exact(float64(1.1)),
        sum_value: Precision::Exact(float64(500.5)),
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(80),
    };
    let first = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(80),
        column_statistics: vec![first_column],
    };

    let second_column = ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(float64(200.0)),
        min_value: Precision::Exact(float64(0.5)),
        sum_value: Precision::Exact(float64(1000.0)),
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(80),
    };
    let second = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(80),
        column_statistics: vec![second_column],
    };

    let merged =
        Statistics::try_merge_iter(vec![&first, &second], &schema).unwrap();

    let column = &merged.column_statistics[0];
    assert_eq!(column.max_value, Precision::Exact(float64(200.0)));
    assert_eq!(column.min_value, Precision::Exact(float64(0.5)));
    assert_eq!(column.sum_value, Precision::Exact(float64(1500.5)));
}
/// Merges two exact `Utf8` column statistics and checks the combined max and
/// min strings; both inputs have no sum and the merged sum is expected to be
/// `Absent`.
#[test]
fn test_try_merge_iter_string_types() {
    let utf8 = |s: &str| ScalarValue::Utf8(Some(s.to_string()));
    let field = Field::new("col1", DataType::Utf8, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let first_column = ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(utf8("dog")),
        min_value: Precision::Exact(utf8("ant")),
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(100),
    };
    let first = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(100),
        column_statistics: vec![first_column],
    };

    let second_column = ColumnStatistics {
        null_count: Precision::Exact(0),
        max_value: Precision::Exact(utf8("zebra")),
        min_value: Precision::Exact(utf8("bat")),
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
        byte_size: Precision::Exact(100),
    };
    let second = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(100),
        column_statistics: vec![second_column],
    };

    let merged =
        Statistics::try_merge_iter(vec![&first, &second], &schema).unwrap();

    let column = &merged.column_statistics[0];
    assert_eq!(column.max_value, Precision::Exact(utf8("zebra")));
    assert_eq!(column.min_value, Precision::Exact(utf8("ant")));
    assert_eq!(column.sum_value, Precision::Absent);
}
/// Merges two statistics in which every populated field is `Inexact` and
/// checks that the merged values stay `Inexact`; the `Int32` sums are
/// expected to come back as an `Int64` sum.
#[test]
fn test_try_merge_iter_all_inexact() {
    let int32 = |v: i32| ScalarValue::Int32(Some(v));
    let field = Field::new("col1", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let first_column = ColumnStatistics {
        null_count: Precision::Inexact(1),
        max_value: Precision::Inexact(int32(100)),
        min_value: Precision::Inexact(int32(1)),
        sum_value: Precision::Inexact(int32(500)),
        distinct_count: Precision::Absent,
        byte_size: Precision::Inexact(40),
    };
    let first = Statistics {
        num_rows: Precision::Inexact(10),
        total_byte_size: Precision::Inexact(100),
        column_statistics: vec![first_column],
    };

    let second_column = ColumnStatistics {
        null_count: Precision::Inexact(2),
        max_value: Precision::Inexact(int32(200)),
        min_value: Precision::Inexact(int32(-5)),
        sum_value: Precision::Inexact(int32(1000)),
        distinct_count: Precision::Absent,
        byte_size: Precision::Inexact(60),
    };
    let second = Statistics {
        num_rows: Precision::Inexact(20),
        total_byte_size: Precision::Inexact(200),
        column_statistics: vec![second_column],
    };

    let merged =
        Statistics::try_merge_iter(vec![&first, &second], &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Inexact(30));
    assert_eq!(merged.total_byte_size, Precision::Inexact(300));

    let column = &merged.column_statistics[0];
    assert_eq!(column.null_count, Precision::Inexact(3));
    assert_eq!(column.max_value, Precision::Inexact(int32(200)));
    assert_eq!(column.min_value, Precision::Inexact(int32(-5)));
    // Inputs carry Int32 sums; the merged sum is asserted as Int64.
    assert_eq!(
        column.sum_value,
        Precision::Inexact(ScalarValue::Int64(Some(1500)))
    );
}
/// Exercises `precision_min` over a table of `(lhs, rhs, expected)` cases
/// covering exact, inexact and absent operands; after the call `lhs` must
/// equal `expected`.
#[test]
fn test_precision_min_in_place() {
    let cases: [(Precision<i32>, Precision<i32>, Precision<i32>); 8] = [
        // Both exact: the smaller value is expected, still exact.
        (Precision::Exact(10), Precision::Exact(20), Precision::Exact(10)),
        (Precision::Exact(20), Precision::Exact(10), Precision::Exact(10)),
        (Precision::Exact(5), Precision::Exact(5), Precision::Exact(5)),
        // At least one inexact operand: the result is expected to be inexact.
        (Precision::Exact(10), Precision::Inexact(20), Precision::Inexact(10)),
        (Precision::Inexact(10), Precision::Exact(5), Precision::Inexact(5)),
        (Precision::Inexact(30), Precision::Inexact(20), Precision::Inexact(20)),
        // Either operand absent: the result is expected to be absent.
        (Precision::Exact(10), Precision::Absent, Precision::Absent),
        (Precision::Absent, Precision::Exact(10), Precision::Absent),
    ];

    for (start, rhs, expected) in cases {
        let mut lhs = start;
        precision_min(&mut lhs, &rhs);
        assert_eq!(lhs, expected, "precision_min({start:?}, {rhs:?})");
    }
}
/// Exercises `precision_max` over a table of `(lhs, rhs, expected)` cases
/// covering exact, inexact and absent operands; after the call `lhs` must
/// equal `expected`.
#[test]
fn test_precision_max_in_place() {
    let cases: [(Precision<i32>, Precision<i32>, Precision<i32>); 8] = [
        // Both exact: the larger value is expected, still exact.
        (Precision::Exact(10), Precision::Exact(20), Precision::Exact(20)),
        (Precision::Exact(20), Precision::Exact(10), Precision::Exact(20)),
        (Precision::Exact(5), Precision::Exact(5), Precision::Exact(5)),
        // At least one inexact operand: the result is expected to be inexact.
        (Precision::Exact(10), Precision::Inexact(20), Precision::Inexact(20)),
        (Precision::Inexact(10), Precision::Exact(5), Precision::Inexact(10)),
        (Precision::Inexact(20), Precision::Inexact(30), Precision::Inexact(30)),
        // Either operand absent: the result is expected to be absent.
        (Precision::Exact(10), Precision::Absent, Precision::Absent),
        (Precision::Absent, Precision::Exact(10), Precision::Absent),
    ];

    for (start, rhs, expected) in cases {
        let mut lhs = start;
        precision_max(&mut lhs, &rhs);
        assert_eq!(lhs, expected, "precision_max({start:?}, {rhs:?})");
    }
}
/// An exact `Int32` sum is expected to become an exact `Int64` sum holding
/// the same number.
#[test]
fn test_cast_sum_value_to_sum_type_in_place_widens_int32() {
    let mut sum = Precision::Exact(ScalarValue::Int32(Some(42)));

    cast_sum_value_to_sum_type_in_place(&mut sum);

    let widened = Precision::Exact(ScalarValue::Int64(Some(42)));
    assert_eq!(sum, widened);
}
/// An exact `Int64` sum is expected to come through the cast unchanged.
#[test]
fn test_cast_sum_value_to_sum_type_in_place_preserves_int64() {
    let mut sum = Precision::Exact(ScalarValue::Int64(Some(100)));

    cast_sum_value_to_sum_type_in_place(&mut sum);

    let unchanged = Precision::Exact(ScalarValue::Int64(Some(100)));
    assert_eq!(sum, unchanged);
}
/// An inexact `Int32` sum is expected to become an inexact `Int64` sum
/// holding the same number.
#[test]
fn test_cast_sum_value_to_sum_type_in_place_inexact() {
    let mut sum = Precision::Inexact(ScalarValue::Int32(Some(42)));

    cast_sum_value_to_sum_type_in_place(&mut sum);

    let widened = Precision::Inexact(ScalarValue::Int64(Some(42)));
    assert_eq!(sum, widened);
}
/// An absent sum is expected to remain absent after the cast.
#[test]
fn test_cast_sum_value_to_sum_type_in_place_absent() {
    let mut sum: Precision<ScalarValue> = Precision::Absent;

    cast_sum_value_to_sum_type_in_place(&mut sum);

    assert_eq!(sum, Precision::Absent);
}
/// Adding two exact `Int64` sums is expected to leave their exact `Int64`
/// total in the left-hand side.
#[test]
fn test_precision_add_for_sum_in_place_same_type() {
    let addend = Precision::Exact(ScalarValue::Int64(Some(20)));
    let mut total = Precision::Exact(ScalarValue::Int64(Some(10)));

    precision_add_for_sum_in_place(&mut total, &addend);

    let expected = Precision::Exact(ScalarValue::Int64(Some(30)));
    assert_eq!(total, expected);
}
/// Adding an exact `Int32` right-hand side to an exact `Int64` left-hand side
/// is expected to produce an exact `Int64` total.
#[test]
fn test_precision_add_for_sum_in_place_widens_rhs() {
    let addend = Precision::Exact(ScalarValue::Int32(Some(5)));
    let mut total = Precision::Exact(ScalarValue::Int64(Some(10)));

    precision_add_for_sum_in_place(&mut total, &addend);

    let expected = Precision::Exact(ScalarValue::Int64(Some(15)));
    assert_eq!(total, expected);
}
/// Adding an inexact `Int32` right-hand side to an inexact `Int64` left-hand
/// side is expected to produce an inexact `Int64` total.
#[test]
fn test_precision_add_for_sum_in_place_inexact() {
    let addend = Precision::Inexact(ScalarValue::Int32(Some(5)));
    let mut total = Precision::Inexact(ScalarValue::Int64(Some(10)));

    precision_add_for_sum_in_place(&mut total, &addend);

    let expected = Precision::Inexact(ScalarValue::Int64(Some(15)));
    assert_eq!(total, expected);
}
/// Adding an absent right-hand side to an exact sum is expected to leave the
/// left-hand side absent.
#[test]
fn test_precision_add_for_sum_in_place_absent_rhs() {
    let mut total = Precision::Exact(ScalarValue::Int64(Some(10)));

    precision_add_for_sum_in_place(&mut total, &Precision::Absent);

    assert_eq!(total, Precision::Absent);
}
}