use std::collections::HashMap;
use super::definition::ContinuousAggregateDef;
use super::partial::PartialAggregate;
use super::watermark::WatermarkState;
use crate::engine::timeseries::columnar_memtable::{ColumnData, ColumnarDrainResult};
use crate::engine::timeseries::time_bucket;
pub struct RefreshResult {
pub rows_processed: u64,
pub max_ts: i64,
pub o3_min_ts: Option<i64>,
}
pub fn refresh_from_drain(
def: &ContinuousAggregateDef,
drain: &ColumnarDrainResult,
watermark: &WatermarkState,
materialized: &mut HashMap<(i64, Vec<u32>), PartialAggregate>,
) -> RefreshResult {
let bucket_ms = def.bucket_interval_ms;
if bucket_ms <= 0 || drain.row_count == 0 {
return RefreshResult {
rows_processed: 0,
max_ts: watermark.watermark_ts,
o3_min_ts: None,
};
}
let ts_idx = drain.schema.timestamp_idx;
let timestamps = drain.columns[ts_idx].as_timestamps();
let value_col_idx = def
.aggregates
.iter()
.find(|e| e.source_column != "*")
.and_then(|e| {
drain
.schema
.columns
.iter()
.position(|(name, _)| name == &e.source_column)
});
let group_col_indices: Vec<Option<usize>> = def
.group_by
.iter()
.map(|col_name| {
drain
.schema
.columns
.iter()
.position(|(name, _)| name == col_name)
})
.collect();
let current_watermark = watermark.watermark_ts;
let mut max_ts = current_watermark;
let mut o3_min: Option<i64> = None;
for row in 0..drain.row_count as usize {
let ts = timestamps[row];
let bucket = time_bucket::time_bucket(bucket_ms, ts);
if ts <= current_watermark {
match o3_min {
Some(current) if ts < current => o3_min = Some(ts),
None => o3_min = Some(ts),
_ => {}
}
}
if ts > max_ts {
max_ts = ts;
}
let group_key: Vec<u32> = group_col_indices
.iter()
.map(|opt_idx| match opt_idx {
Some(idx) => match &drain.columns[*idx] {
ColumnData::Symbol(v) => v[row],
_ => 0,
},
None => 0,
})
.collect();
let val = value_col_idx
.map(|idx| match &drain.columns[idx] {
ColumnData::Float64(v) => v[row],
ColumnData::Int64(v) => v[row] as f64,
ColumnData::Timestamp(v) => v[row] as f64,
ColumnData::Symbol(_) => 0.0,
})
.unwrap_or(1.0);
let key = (bucket, group_key.clone());
match materialized.get_mut(&key) {
Some(partial) => partial.merge_sample(ts, val),
None => {
let mut partial = PartialAggregate::new(bucket, group_key, ts, val);
for expr in &def.aggregates {
if expr.function.uses_sketch() {
partial.ensure_sketch(&expr.function);
}
}
materialized.insert(key, partial);
}
}
}
RefreshResult {
rows_processed: drain.row_count,
max_ts,
o3_min_ts: o3_min,
}
}