use reifydb_core::{
common::{WindowKind, WindowSize},
encoded::key::EncodedKey,
interface::change::{Change, Diff},
key::{EncodableKey, flow_node_state::FlowNodeStateKey},
value::column::columns::Columns,
};
use reifydb_runtime::hash::Hash128;
use reifydb_type::Result;
use super::{WindowEvent, WindowLayout, WindowOperator, WindowState};
use crate::transaction::FlowTransaction;
impl WindowOperator {
pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64) {
match &self.kind {
WindowKind::Rolling {
size: WindowSize::Duration(duration),
} => {
let window_size_ms = duration.as_millis() as u64;
let cutoff_time = current_timestamp.saturating_sub(window_size_ms);
let original_len = state.events.len();
state.events.retain(|event| event.timestamp > cutoff_time);
let evicted_count = original_len - state.events.len();
state.event_count = state.event_count.saturating_sub(evicted_count as u64);
}
WindowKind::Rolling {
size: WindowSize::Count(count),
} => {
if state.events.len() > *count as usize {
let excess = state.events.len() - *count as usize;
state.events.drain(0..excess);
state.event_count = *count;
}
}
_ => {}
}
}
pub fn tick_rolling_eviction(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
let mut result = Vec::new();
let all_state = txn.state_scan(self.node)?;
let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
let win_marker = b"win:";
for item in &all_state.items {
let full_key = &item.key;
if full_key.len() <= prefix.len() {
continue;
}
let inner = &full_key[prefix.len()..];
if !inner.starts_with(win_marker) {
continue;
}
let window_key = EncodedKey::new(inner);
let mut state = self.load_window_state(txn, &window_key)?;
if state.events.is_empty() {
continue;
}
let layout = match &state.window_layout {
Some(l) => l.clone(),
None => continue,
};
let pre_agg = self.apply_aggregations(txn, &window_key, &layout, &state.events)?;
let pre_count = state.events.len();
self.evict_old_events(&mut state, current_timestamp);
if state.events.len() < pre_count {
if state.events.is_empty() {
self.save_window_state(txn, &window_key, &state)?;
if let Some((row, _)) = pre_agg {
result.push(Diff::Remove {
pre: Columns::from_row(&row),
});
}
} else {
let post_agg =
self.apply_aggregations(txn, &window_key, &layout, &state.events)?;
self.save_window_state(txn, &window_key, &state)?;
if let (Some((pre_row, _)), Some((post_row, _))) = (pre_agg, post_agg) {
result.push(Diff::Update {
pre: Columns::from_row(&pre_row),
post: Columns::from_row(&post_row),
});
}
}
}
}
Ok(result)
}
}
fn process_rolling_group_insert(
operator: &WindowOperator,
txn: &mut FlowTransaction,
columns: &Columns,
group_hash: Hash128,
) -> Result<Vec<Diff>> {
let mut result = Vec::new();
let row_count = columns.row_count();
if row_count == 0 {
return Ok(result);
}
let current_timestamp = operator.current_timestamp();
let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
let window_id = 0u64;
let window_key = operator.create_window_key(group_hash, window_id);
let mut window_state = operator.load_window_state(txn, &window_key)?;
for row_idx in 0..row_count {
let event_timestamp = timestamps[row_idx];
let single_row_columns = columns.extract_row(row_idx);
let projected = operator.project_columns(&single_row_columns);
let row = projected.to_single_row();
if window_state.window_layout.is_none() {
window_state.window_layout = Some(WindowLayout::from_row(&row));
}
let layout = window_state.layout().clone();
let previous_aggregation = if !window_state.events.is_empty() {
operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
} else {
None
};
let event = WindowEvent::from_row(&row, event_timestamp);
let event_row_number = event.row_number;
window_state.events.push(event);
window_state.event_count += 1;
window_state.last_event_time = event_timestamp;
if window_state.window_start == 0 {
window_state.window_start = event_timestamp;
}
operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
let eviction_ts = if operator.ts.is_some() {
event_timestamp
} else {
current_timestamp
};
operator.evict_old_events(&mut window_state, eviction_ts);
if !window_state.events.is_empty() {
if let Some((aggregated_row, is_new)) =
operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
{
result.push(WindowOperator::emit_aggregation_diff(
&aggregated_row,
is_new,
previous_aggregation,
));
}
}
}
operator.save_window_state(txn, &window_key, &window_state)?;
Ok(result)
}
pub fn apply_rolling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
let diffs = operator.apply_window_change(txn, &change, false, |op, txn, columns| {
op.process_insert(txn, columns, process_rolling_group_insert)
})?;
Ok(Change::from_flow(operator.node, change.version, diffs))
}