use std::collections::{HashMap, HashSet, hash_map::Entry};
use reifydb_core::{
common::{WindowKind, WindowSize},
encoded::key::EncodedKey,
interface::change::{Change, Diff},
row::Row,
value::column::columns::Columns,
};
use reifydb_runtime::hash::Hash128;
use reifydb_value::{Result, value::datetime::DateTime};
use super::{WindowEvent, WindowLayout, WindowOperator, WindowState};
use crate::transaction::FlowTransaction;
impl WindowOperator {
    /// Returns the tumbling-window identifier that `timestamp` falls into.
    ///
    /// - `Tumbling { size: Duration(d) }`: `timestamp` is divided by the window
    ///   length in milliseconds, so it is presumably expressed in milliseconds
    ///   (TODO confirm against callers).
    /// - `Tumbling { size: Count(c) }`: `timestamp` is divided by `c`.
    /// - Any other window kind maps to window `0`.
    ///
    /// A window size that truncates to zero (a sub-millisecond duration or
    /// `Count(0)`) is clamped to `1` so the division can never panic.
    pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64 {
        match &self.kind {
            WindowKind::Tumbling {
                size: WindowSize::Duration(duration),
            } => {
                // `as_millis()` is 0 for sub-millisecond durations; clamp to avoid a
                // division-by-zero panic.
                let window_size_ms = (duration.as_millis() as u64).max(1);
                timestamp / window_size_ms
            }
            WindowKind::Tumbling {
                size: WindowSize::Count(count),
            } => timestamp / (*count).max(1),
            _ => 0,
        }
    }

    /// Returns the start of the duration-based tumbling window containing
    /// `timestamp`, i.e. `timestamp` rounded down to a multiple of the window
    /// length in milliseconds.
    ///
    /// Despite the name, this does not mutate anything; it only computes the
    /// value. When the operator has no duration size (`size_duration()` is
    /// `None`), `timestamp` is returned unchanged.
    ///
    /// A sub-millisecond duration is clamped to 1 ms, which makes the start equal
    /// to `timestamp` instead of panicking on division by zero.
    pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64 {
        if let Some(duration) = self.size_duration() {
            let window_size_ms = (duration.as_millis() as u64).max(1);
            (timestamp / window_size_ms) * window_size_ms
        } else {
            timestamp
        }
    }
}
/// Routes a batch of inserted rows belonging to one group (`group_hash`) into
/// their tumbling windows and emits one aggregation diff per row whose window
/// produces an aggregate.
///
/// The work happens in three phases:
/// 1. Every row is assigned an `(event_timestamp, window_id)` pair.
///    - Count-based operators stamp rows with `operator.current_timestamp()`.
///      They bucket each row by a running counter obtained from
///      `get_and_increment_global_count(txn, group_hash)`.
///    - Time-based operators use the row's resolved event timestamp and
///      `get_tumbling_window_id`.
/// 2. State for each distinct window touched by the batch is prefetched in one
///    call, then loaded lazily into a per-call cache on first use.
/// 3. Rows are appended to their window in input order. After each append the
///    window is re-aggregated and a diff against the previous aggregate for that
///    window is pushed to the result. Every touched window state is saved once
///    at the end.
///
/// Returns an empty `Vec` when `columns` has no rows.
///
/// # Errors
/// Propagates any error from the following:
/// - timestamp resolution
/// - counter updates
/// - state prefetch/load/save
/// - aggregation
/// - row-index storage
fn process_tumbling_group_insert(
    operator: &WindowOperator,
    txn: &mut FlowTransaction,
    columns: &Columns,
    group_hash: Hash128,
    changed_at: DateTime,
) -> Result<Vec<Diff>> {
    let mut result = Vec::new();
    let row_count = columns.row_count();
    if row_count == 0 {
        return Ok(result);
    }

    // Phase 1: assign every row to a window.
    let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
    let is_count_based = operator.is_count_based();
    let mut row_info: Vec<(u64, u64)> = Vec::with_capacity(row_count);
    for &timestamp in timestamps.iter() {
        let (event_timestamp, window_id) = if is_count_based {
            // Count-based windows ignore the row's own timestamp.
            let event_timestamp = operator.current_timestamp();
            let global_count = operator.get_and_increment_global_count(txn, group_hash)?;
            // NOTE(review): `3` is the fallback when no count size is configured;
            // confirm this default is intentional. `max(1)` prevents a division by
            // zero on a `Count(0)` size.
            let window_size = operator.size_count().unwrap_or(3).max(1);
            (event_timestamp, global_count / window_size)
        } else {
            (timestamp, operator.get_tumbling_window_id(timestamp))
        };
        row_info.push((event_timestamp, window_id));
    }

    // Phase 2: prefetch state for each distinct window touched by this batch.
    let mut prefetch_keys: Vec<EncodedKey> = Vec::new();
    let mut seen_windows: HashSet<u64> = HashSet::new();
    for &(_, window_id) in &row_info {
        if seen_windows.insert(window_id) {
            prefetch_keys.push(operator.create_window_key(group_hash, window_id));
        }
    }
    txn.prefetch_state(operator.node, &prefetch_keys)?;

    // Phase 3: apply rows in order, caching per-window state and the most
    // recent aggregate so it can serve as the "previous" value for the next row.
    type WindowCache = (WindowState, Option<(Row, bool)>, EncodedKey);
    let mut window_caches: HashMap<u64, WindowCache> = HashMap::new();
    for (row_idx, &(event_timestamp, window_id)) in row_info.iter().enumerate() {
        // Single lookup: load the window state on first touch, reuse it afterwards.
        let (window_state, last_post, window_key) = match window_caches.entry(window_id) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let window_key = operator.create_window_key(group_hash, window_id);
                let state = operator.load_window_state(txn, &window_key)?;
                entry.insert((state, None, window_key))
            }
        };

        let single_row_columns = columns.extract_row(row_idx);
        let projected = operator.project_columns(&single_row_columns);
        let row = projected.to_single_row();

        // The layout is derived from the first row ever seen by this window.
        if window_state.window_layout.is_none() {
            window_state.window_layout = Some(WindowLayout::from_row(&row));
        }
        let layout = window_state.layout().clone();

        // Prefer the aggregate cached from the previous row in this batch; only
        // recompute from stored events when nothing is cached yet.
        let previous_aggregation = match last_post.take() {
            Some(prev) => Some(prev),
            None if !window_state.events.is_empty() => operator.apply_aggregations(
                txn,
                window_key,
                &layout,
                &window_state.events,
                changed_at,
                window_state,
            )?,
            None => None,
        };

        let event = WindowEvent::from_row(&row, event_timestamp);
        let event_row_number = event.row_number;
        window_state.events.push(event);
        window_state.event_count += 1;
        window_state.last_event_time = event_timestamp;
        operator.update_running_totals_on_push(window_state, &row);
        // `0` doubles as the "not yet set" marker for the window start.
        if window_state.window_start == 0 {
            window_state.window_start = operator.set_tumbling_window_start(event_timestamp);
        }

        let aggregation = operator.apply_aggregations(
            txn,
            window_key,
            &layout,
            &window_state.events,
            changed_at,
            window_state,
        )?;
        *last_post = match aggregation {
            Some((aggregated_row, is_new)) => {
                // Emit from a borrow, then move the row into the cache (no clone).
                result.push(WindowOperator::emit_aggregation_diff(
                    &aggregated_row,
                    is_new,
                    previous_aggregation,
                ));
                Some((aggregated_row, is_new))
            }
            None => None,
        };

        operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
    }

    // Persist every touched window once; HashMap iteration order is arbitrary.
    for (window_state, _, window_key) in window_caches.values() {
        operator.save_window_state(txn, window_key, window_state)?;
    }
    Ok(result)
}
/// Applies a tumbling-window operator to an incoming `change`.
///
/// Inserted rows are delegated to `process_tumbling_group_insert` through
/// `WindowOperator::process_insert`. The resulting diffs are wrapped in a new
/// flow `Change` attributed to this operator's node. That `Change` carries the
/// same version and `changed_at` as the input.
///
/// # Errors
/// Propagates any error raised while applying the window change.
pub fn apply_tumbling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
    let changed_at = change.changed_at;
    let emitted = operator.apply_window_change(txn, &change, true, |window_op, inner_txn, batch| {
        window_op.process_insert(inner_txn, batch, changed_at, process_tumbling_group_insert)
    })?;
    let flow_change = Change::from_flow(operator.node, change.version, emitted, changed_at);
    Ok(flow_change)
}