Skip to main content

reifydb_sub_flow/operator/window/
rolling.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::{WindowKind, WindowSize},
6	encoded::key::EncodedKey,
7	interface::change::{Change, Diff},
8	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
9	value::column::columns::Columns,
10};
11use reifydb_runtime::hash::Hash128;
12use reifydb_type::Result;
13
14use super::{WindowEvent, WindowLayout, WindowOperator, WindowState};
15use crate::transaction::FlowTransaction;
16
17impl WindowOperator {
18	/// Evict old events from rolling window to maintain size limit
19	pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64) {
20		match &self.kind {
21			WindowKind::Rolling {
22				size: WindowSize::Duration(duration),
23			} => {
24				let window_size_ms = duration.as_millis() as u64;
25				let cutoff_time = current_timestamp.saturating_sub(window_size_ms);
26				let original_len = state.events.len();
27				state.events.retain(|event| event.timestamp > cutoff_time);
28				let evicted_count = original_len - state.events.len();
29				state.event_count = state.event_count.saturating_sub(evicted_count as u64);
30			}
31			WindowKind::Rolling {
32				size: WindowSize::Count(count),
33			} => {
34				if state.events.len() > *count as usize {
35					let excess = state.events.len() - *count as usize;
36					state.events.drain(0..excess);
37					state.event_count = *count;
38				}
39			}
40			_ => {}
41		}
42	}
43
44	/// Tick-based eviction for duration-based rolling windows.
45	/// Scans all operator state, finds "win:" keys, and evicts old events.
46	pub fn tick_rolling_eviction(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
47		let mut result = Vec::new();
48
49		let all_state = txn.state_scan(self.node)?;
50		let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
51		let win_marker = b"win:";
52
53		for item in &all_state.items {
54			let full_key = &item.key;
55			if full_key.len() <= prefix.len() {
56				continue;
57			}
58			let inner = &full_key[prefix.len()..];
59			if !inner.starts_with(win_marker) {
60				continue;
61			}
62
63			let window_key = EncodedKey::new(inner);
64			let mut state = self.load_window_state(txn, &window_key)?;
65			if state.events.is_empty() {
66				continue;
67			}
68			let layout = match &state.window_layout {
69				Some(l) => l.clone(),
70				None => continue,
71			};
72
73			let pre_agg = self.apply_aggregations(txn, &window_key, &layout, &state.events)?;
74			let pre_count = state.events.len();
75			self.evict_old_events(&mut state, current_timestamp);
76
77			if state.events.len() < pre_count {
78				if state.events.is_empty() {
79					self.save_window_state(txn, &window_key, &state)?;
80					if let Some((row, _)) = pre_agg {
81						result.push(Diff::Remove {
82							pre: Columns::from_row(&row),
83						});
84					}
85				} else {
86					let post_agg =
87						self.apply_aggregations(txn, &window_key, &layout, &state.events)?;
88					self.save_window_state(txn, &window_key, &state)?;
89					if let (Some((pre_row, _)), Some((post_row, _))) = (pre_agg, post_agg) {
90						result.push(Diff::Update {
91							pre: Columns::from_row(&pre_row),
92							post: Columns::from_row(&post_row),
93						});
94					}
95				}
96			}
97		}
98
99		Ok(result)
100	}
101}
102
103/// Process inserts for a single group in rolling windows.
104/// Rolling windows use a single window (id=0) per group and load state once per group.
105fn process_rolling_group_insert(
106	operator: &WindowOperator,
107	txn: &mut FlowTransaction,
108	columns: &Columns,
109	group_hash: Hash128,
110) -> Result<Vec<Diff>> {
111	let mut result = Vec::new();
112	let row_count = columns.row_count();
113	if row_count == 0 {
114		return Ok(result);
115	}
116
117	let current_timestamp = operator.current_timestamp();
118	let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
119
120	let window_id = 0u64;
121	let window_key = operator.create_window_key(group_hash, window_id);
122	let mut window_state = operator.load_window_state(txn, &window_key)?;
123
124	for row_idx in 0..row_count {
125		let event_timestamp = timestamps[row_idx];
126
127		let single_row_columns = columns.extract_row(row_idx);
128		let projected = operator.project_columns(&single_row_columns);
129		let row = projected.to_single_row();
130
131		if window_state.window_layout.is_none() {
132			window_state.window_layout = Some(WindowLayout::from_row(&row));
133		}
134		let layout = window_state.layout().clone();
135
136		let previous_aggregation = if !window_state.events.is_empty() {
137			operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
138		} else {
139			None
140		};
141
142		let event = WindowEvent::from_row(&row, event_timestamp);
143		let event_row_number = event.row_number;
144		window_state.events.push(event);
145		window_state.event_count += 1;
146		window_state.last_event_time = event_timestamp;
147
148		if window_state.window_start == 0 {
149			window_state.window_start = event_timestamp;
150		}
151
152		operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
153
154		let eviction_ts = if operator.ts.is_some() {
155			event_timestamp
156		} else {
157			current_timestamp
158		};
159		operator.evict_old_events(&mut window_state, eviction_ts);
160
161		if !window_state.events.is_empty() {
162			if let Some((aggregated_row, is_new)) =
163				operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
164			{
165				result.push(WindowOperator::emit_aggregation_diff(
166					&aggregated_row,
167					is_new,
168					previous_aggregation,
169				));
170			}
171		}
172	}
173
174	operator.save_window_state(txn, &window_key, &window_state)?;
175
176	Ok(result)
177}
178
179/// Apply changes for rolling windows (no expiration — eviction handles cleanup)
180pub fn apply_rolling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
181	let diffs = operator.apply_window_change(txn, &change, false, |op, txn, columns| {
182		op.process_insert(txn, columns, process_rolling_group_insert)
183	})?;
184	Ok(Change::from_flow(operator.node, change.version, diffs))
185}