use crate::core::Value;
/// Running aggregate statistics for a single column.
///
/// Values are folded in one at a time with `accumulate`, and two sets of
/// statistics can be combined with `merge`. Integer and float inputs are
/// summed in separate fields; `sum_as_f64` combines them on demand.
#[derive(Debug, Clone)]
pub struct ColumnAggregateStats {
/// Sum of every accumulated `Value::Integer`, widened to `i128`.
pub sum_int: i128,
/// Sum of every accumulated `Value::Float` that is not NaN.
pub sum_float: f64,
/// Number of values that contributed to `sum_int` or `sum_float`.
pub numeric_count: u64,
/// Smallest non-null value seen so far according to `Value::compare`;
/// stays `Value::Null` until the first non-null value is accumulated.
pub min: Value,
/// Largest non-null value seen so far according to `Value::compare`;
/// stays `Value::Null` until the first non-null value is accumulated.
pub max: Value,
/// Number of non-null values accumulated, numeric or not.
pub non_null_count: u64,
}
impl Default for ColumnAggregateStats {
    /// Builds empty statistics: all sums and counters are zero, and both
    /// `min` and `max` start out as `Value::Null` tagged with `DataType::Null`.
    fn default() -> Self {
        // Both bounds start from the same "nothing seen yet" marker.
        let unset = || Value::Null(crate::core::DataType::Null);
        Self {
            non_null_count: 0,
            numeric_count: 0,
            sum_int: 0,
            sum_float: 0.0,
            min: unset(),
            max: unset(),
        }
    }
}
impl ColumnAggregateStats {
    /// Folds one column value into the running statistics.
    ///
    /// Null values are ignored entirely. Every non-null value increments
    /// `non_null_count` and is offered to `min`/`max`; only integers and
    /// non-NaN floats contribute to the sums and to `numeric_count`.
    pub fn accumulate(&mut self, value: &Value) {
        if value.is_null() {
            return;
        }
        self.non_null_count += 1;
        match value {
            Value::Integer(i) => {
                // Widen before adding so the running sum has i128 headroom.
                self.sum_int += *i as i128;
                self.numeric_count += 1;
            }
            // NaN is kept out of the float sum and the numeric count.
            Value::Float(f) if !f.is_nan() => {
                self.sum_float += *f;
                self.numeric_count += 1;
            }
            // Everything else (including NaN floats) only counts as non-null.
            _ => {}
        }
        // NOTE(review): NaN floats still reach the min/max tracking below; how
        // they order depends on `Value::compare`, which is not visible here —
        // confirm a leading NaN cannot get stuck as `min`/`max`.
        self.fold_min(value);
        self.fold_max(value);
    }

    /// Combines `other` into `self`.
    ///
    /// Sums and counters are added. `min` and `max` are merged independently
    /// of each other: a null bound on `other` leaves the corresponding bound
    /// on `self` untouched and never prevents the other bound from merging.
    pub fn merge(&mut self, other: &ColumnAggregateStats) {
        self.sum_int += other.sum_int;
        self.sum_float += other.sum_float;
        self.numeric_count += other.numeric_count;
        self.non_null_count += other.non_null_count;
        self.fold_min(&other.min);
        self.fold_max(&other.max);
    }

    /// Returns the total numeric sum as a single `f64`.
    ///
    /// The integer part is converted with `as f64`, so very large integer
    /// sums may lose precision; use `sum_parts` when exactness matters.
    pub fn sum_as_f64(&self) -> f64 {
        self.sum_int as f64 + self.sum_float
    }

    /// Returns the integer and float sums separately as `(sum_int, sum_float)`.
    pub fn sum_parts(&self) -> (i128, f64) {
        (self.sum_int, self.sum_float)
    }

    /// Returns the mean of all numeric values, or `None` when no numeric
    /// value has been accumulated.
    pub fn avg(&self) -> Option<f64> {
        if self.numeric_count == 0 {
            return None;
        }
        Some(self.sum_as_f64() / self.numeric_count as f64)
    }

    /// Lowers `min` to `candidate` when `candidate` is non-null and either no
    /// minimum is recorded yet or `candidate` compares strictly less.
    fn fold_min(&mut self, candidate: &Value) {
        if candidate.is_null() {
            return;
        }
        // A failed comparison (`Err`) keeps the current bound, best-effort.
        let replaces = self.min.is_null()
            || matches!(candidate.compare(&self.min), Ok(std::cmp::Ordering::Less));
        if replaces {
            self.min = candidate.clone();
        }
    }

    /// Raises `max` to `candidate` when `candidate` is non-null and either no
    /// maximum is recorded yet or `candidate` compares strictly greater.
    fn fold_max(&mut self, candidate: &Value) {
        if candidate.is_null() {
            return;
        }
        // A failed comparison (`Err`) keeps the current bound, best-effort.
        let replaces = self.max.is_null()
            || matches!(candidate.compare(&self.max), Ok(std::cmp::Ordering::Greater));
        if replaces {
            self.max = candidate.clone();
        }
    }
}
/// Aggregate statistics for a whole volume: row counters plus one
/// `ColumnAggregateStats` per column.
#[derive(Debug, Clone)]
pub struct VolumeAggregateStats {
/// Total number of rows recorded for the volume.
/// NOTE(review): presumably also counts rows that are no longer live —
/// confirm against the code that populates this field.
pub total_rows: u64,
/// Number of live rows; this is the value reported by `count_star`.
pub live_rows: u64,
/// Per-column statistics, indexed by column position.
pub columns: Vec<ColumnAggregateStats>,
}
impl VolumeAggregateStats {
pub fn new(num_columns: usize) -> Self {
Self {
total_rows: 0,
live_rows: 0,
columns: (0..num_columns)
.map(|_| ColumnAggregateStats::default())
.collect(),
}
}
pub fn merge(&mut self, other: &VolumeAggregateStats) {
self.total_rows += other.total_rows;
self.live_rows += other.live_rows;
for (i, col) in self.columns.iter_mut().enumerate() {
if i < other.columns.len() {
col.merge(&other.columns[i]);
}
}
}
#[inline]
pub fn count_star(&self) -> u64 {
self.live_rows
}
#[inline]
pub fn sum(&self, col_idx: usize) -> f64 {
self.columns[col_idx].sum_as_f64()
}
#[inline]
pub fn min(&self, col_idx: usize) -> &Value {
&self.columns[col_idx].min
}
#[inline]
pub fn max(&self, col_idx: usize) -> &Value {
&self.columns[col_idx].max
}
#[inline]
pub fn avg(&self, col_idx: usize) -> Option<f64> {
self.columns[col_idx].avg()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Integer inputs feed the sum, both counters, min/max and the average.
    #[test]
    fn test_column_stats_accumulate() {
        let mut stats = ColumnAggregateStats::default();
        for n in [10, 20, 30] {
            stats.accumulate(&Value::Integer(n));
        }

        assert_eq!(stats.sum_int, 60);
        assert_eq!(stats.numeric_count, 3);
        assert_eq!(stats.non_null_count, 3);
        assert_eq!(stats.min, Value::Integer(10));
        assert_eq!(stats.max, Value::Integer(30));
        assert_eq!(stats.avg(), Some(20.0));
    }

    /// A null between two floats must not affect any counter or the sum.
    #[test]
    fn test_column_stats_with_nulls() {
        let inputs = [
            Value::Float(1.5),
            Value::Null(crate::core::DataType::Float),
            Value::Float(3.5),
        ];

        let mut stats = ColumnAggregateStats::default();
        for value in &inputs {
            stats.accumulate(value);
        }

        assert_eq!(stats.numeric_count, 2);
        assert_eq!(stats.non_null_count, 2);
        assert_eq!(stats.sum_float, 5.0);
    }

    /// Merging two volumes adds live rows and combines column 0's statistics.
    #[test]
    fn test_volume_stats_merge() {
        let mut s1 = VolumeAggregateStats::new(2);
        s1.live_rows = 100;
        for n in [10, 50] {
            s1.columns[0].accumulate(&Value::Integer(n));
        }

        let mut s2 = VolumeAggregateStats::new(2);
        s2.live_rows = 200;
        for n in [5, 100] {
            s2.columns[0].accumulate(&Value::Integer(n));
        }

        s1.merge(&s2);

        assert_eq!(s1.count_star(), 300);
        assert_eq!(s1.min(0), &Value::Integer(5));
        assert_eq!(s1.max(0), &Value::Integer(100));
        assert_eq!(s1.sum(0), 165.0);
    }
}