Skip to main content

reifydb_sub_flow/operator/window/
rolling.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::{
4	common::{WindowSize, WindowType},
5	interface::change::{Change, Diff},
6	value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowOperator, WindowState};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15	/// Check if rolling window should evict old events
16	pub fn should_evict_rolling_window(&self, state: &WindowState, current_timestamp: u64) -> bool {
17		match (&self.window_type, &self.size) {
18			(WindowType::Time(_), WindowSize::Duration(duration)) => {
19				if state.events.is_empty() {
20					return false;
21				}
22				let window_size_ms = duration.as_millis() as u64;
23				let oldest_timestamp = state.events[0].timestamp;
24				current_timestamp - oldest_timestamp > window_size_ms
25			}
26			(WindowType::Count, WindowSize::Count(count)) => state.event_count > *count,
27			_ => false,
28		}
29	}
30
31	/// Evict old events from rolling window to maintain size limit
32	pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64) {
33		match (&self.window_type, &self.size) {
34			(WindowType::Time(_), WindowSize::Duration(duration)) => {
35				let window_size_ms = duration.as_millis() as u64;
36				let cutoff_time = current_timestamp - window_size_ms;
37
38				let original_len = state.events.len();
39				state.events.retain(|event| event.timestamp > cutoff_time);
40				let evicted_count = original_len - state.events.len();
41				state.event_count = state.event_count.saturating_sub(evicted_count as u64);
42			}
43			(WindowType::Count, WindowSize::Count(count)) => {
44				// Keep only the most recent 'count' events
45				if state.events.len() > *count as usize {
46					let excess = state.events.len() - *count as usize;
47					state.events.drain(0..excess);
48					state.event_count = *count;
49				}
50			}
51			_ => {}
52		}
53	}
54}
55
56/// Process inserts for rolling windows
57fn process_rolling_insert(
58	operator: &WindowOperator,
59	txn: &mut FlowTransaction,
60	columns: &Columns,
61) -> Result<Vec<Diff>> {
62	let mut result = Vec::new();
63	let row_count = columns.row_count();
64	if row_count == 0 {
65		return Ok(result);
66	}
67
68	let current_timestamp = operator.current_timestamp();
69
70	let group_hashes = operator.compute_group_keys(columns)?;
71
72	let groups = columns.partition_by_keys(&group_hashes);
73
74	for (group_hash, group_columns) in groups {
75		let group_result =
76			process_rolling_group_insert(operator, txn, &group_columns, group_hash, current_timestamp)?;
77		result.extend(group_result);
78	}
79
80	Ok(result)
81}
82
83/// Process inserts for a single group in rolling windows
84fn process_rolling_group_insert(
85	operator: &WindowOperator,
86	txn: &mut FlowTransaction,
87	columns: &Columns,
88	group_hash: Hash128,
89	current_timestamp: u64,
90) -> Result<Vec<Diff>> {
91	let mut result = Vec::new();
92	let row_count = columns.row_count();
93	if row_count == 0 {
94		return Ok(result);
95	}
96
97	let timestamps = operator.extract_timestamps(columns)?;
98
99	let window_id = 0u64;
100	let window_key = operator.create_window_key(group_hash, window_id);
101	let mut window_state = operator.load_window_state(txn, &window_key)?;
102
103	for row_idx in 0..row_count {
104		let event_timestamp = timestamps[row_idx];
105
106		let previous_aggregation = if window_state.events.len() >= operator.min_events {
107			operator.apply_aggregations(txn, &window_key, &window_state.events)?
108		} else {
109			None
110		};
111
112		let single_row_columns = columns.extract_row(row_idx);
113		let row = single_row_columns.to_single_row();
114
115		let event = WindowEvent::from_row(&row, event_timestamp);
116		window_state.events.push(event);
117		window_state.event_count += 1;
118
119		if window_state.window_start == 0 {
120			window_state.window_start = event_timestamp;
121		}
122
123		// Evict old events to maintain rolling window size
124		operator.evict_old_events(&mut window_state, current_timestamp);
125
126		// Always trigger rolling windows (they continuously update)
127		if window_state.events.len() >= operator.min_events {
128			if let Some((aggregated_row, is_new)) =
129				operator.apply_aggregations(txn, &window_key, &window_state.events)?
130			{
131				if is_new {
132					result.push(Diff::Insert {
133						post: Columns::from_row(&aggregated_row),
134					});
135				} else {
136					// Rolling window exists, emit Update with previous state
137					if let Some((previous_row, _)) = previous_aggregation {
138						result.push(Diff::Update {
139							pre: Columns::from_row(&previous_row),
140							post: Columns::from_row(&aggregated_row),
141						});
142					} else {
143						// Fallback to Insert if we can't get previous state
144						result.push(Diff::Insert {
145							post: Columns::from_row(&aggregated_row),
146						});
147					}
148				}
149			}
150		}
151	}
152
153	operator.save_window_state(txn, &window_key, &window_state)?;
154
155	Ok(result)
156}
157
158/// Apply changes for rolling windows
159pub fn apply_rolling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
160	let mut result = Vec::new();
161
162	for diff in change.diffs.iter() {
163		match diff {
164			Diff::Insert {
165				post,
166			} => {
167				let insert_result = process_rolling_insert(operator, txn, post)?;
168				result.extend(insert_result);
169			}
170			Diff::Update {
171				pre: _,
172				post,
173			} => {
174				let update_result = process_rolling_insert(operator, txn, post)?;
175				result.extend(update_result);
176			}
177			Diff::Remove {
178				pre: _,
179			} => {
180				// Rolling windows typically don't handle removes in streaming scenarios
181				// This would require complex retraction logic
182			}
183		}
184	}
185
186	Ok(Change::from_flow(operator.node, change.version, result))
187}