Skip to main content

reifydb_sub_flow/operator/window/
sliding.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::{
4	common::{WindowSize, WindowSlide, WindowType},
5	interface::change::{Change, Diff},
6	value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowOperator};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15	/// Determine which windows an event belongs to for sliding windows
16	/// For time-based windows, pass the event timestamp
17	/// For count-based windows, pass the row index (0-based)
18	pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64> {
19		match (&self.window_type, &self.size, &self.slide) {
20			(
21				WindowType::Time(_),
22				WindowSize::Duration(duration),
23				Some(WindowSlide::Duration(slide_duration)),
24			) => {
25				let window_size_ms = duration.as_millis() as u64;
26				let slide_ms = slide_duration.as_millis() as u64;
27
28				let timestamp = timestamp_or_row_index;
29
30				if slide_ms >= window_size_ms {
31					// Non-overlapping windows - each event belongs to exactly one window
32					let window_id = timestamp / slide_ms;
33					vec![window_id]
34				} else {
35					// Overlapping windows - event belongs to multiple windows
36					let mut windows = Vec::new();
37
38					// A window with ID w starts at w * slide_ms and ends at w * slide_ms +
39					// window_size_ms So timestamp T is in window w if: w * slide_ms <= T < w *
40					// slide_ms + window_size_ms Rearranging: (T - window_size_ms + 1) /
41					// slide_ms <= w <= T / slide_ms
42
43					let min_window_id = if timestamp >= window_size_ms {
44						(timestamp - window_size_ms + 1) / slide_ms
45					} else {
46						0
47					};
48					let max_window_id = timestamp / slide_ms;
49
50					for window_id in min_window_id..=max_window_id {
51						let window_start = window_id * slide_ms;
52						let window_end = window_start + window_size_ms;
53
54						if timestamp >= window_start && timestamp < window_end {
55							windows.push(window_id);
56						}
57					}
58					windows
59				}
60			}
61			(WindowType::Time(_), WindowSize::Duration(duration), Some(WindowSlide::Count(_))) => {
62				// Time windows with count-based slide not supported yet
63				let window_size_ms = duration.as_millis() as u64;
64				let timestamp = timestamp_or_row_index;
65				let base_window = timestamp / window_size_ms;
66				vec![base_window]
67			}
68			(WindowType::Count, WindowSize::Count(count), Some(WindowSlide::Count(slide_count))) => {
69				// Count-based sliding windows
70
71				// Window 0: rows 1,2,3 (global_count 0,1,2)
72				// Window 1: rows 3,4,5 (global_count 2,3,4)
73				// Window 2: rows 5,6,7 (global_count 4,5,6)
74
75				let global_count = timestamp_or_row_index; // 0-based global count from get_and_increment_global_count
76				let mut windows = Vec::new();
77
78				let row_number = global_count + 1; // 1-based row number as expected by test
79
80				// A row N (1-based) belongs to window W if:
81				// W * slide_count + 1 <= N <= W * slide_count + count
82				// Rearranging: (N - count) / slide_count <= W <= (N - 1) / slide_count
83
84				// Mathematical definition: row N belongs to window W if:
85				// W * slide_count <= N-1 < W * slide_count + count (using 0-based indexing)
86				// Converting to 1-based: W * slide_count + 1 <= N <= W * slide_count + count
87				let min_window = if row_number > *count {
88					(row_number - *count) / *slide_count
89				} else {
90					0
91				};
92				let max_window = (row_number - 1) / *slide_count;
93
94				for window_id in min_window..=max_window {
95					let window_start_row = window_id * *slide_count + 1; // 1-based
96					let window_end_row = window_start_row + *count - 1; // 1-based, inclusive
97
98					// Standard sliding window membership check
99					let belongs_to_window =
100						row_number >= window_start_row && row_number <= window_end_row;
101
102					if belongs_to_window {
103						windows.push(window_id);
104					}
105				}
106
107				windows
108			}
109			_ => {
110				// Fallback for unsupported combinations
111				vec![0]
112			}
113		}
114	}
115
116	/// Set window start time for sliding windows (aligned to slide boundaries)
117	pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64 {
118		match (&self.window_type, &self.size, &self.slide) {
119			(WindowType::Time(_), WindowSize::Duration(_), Some(WindowSlide::Duration(slide_duration))) => {
120				let slide_ms = slide_duration.as_millis() as u64;
121				let window_start = window_id * slide_ms;
122				window_start
123			}
124			_ => {
125				// Fallback: use timestamp as-is
126				timestamp
127			}
128		}
129	}
130}
131
132/// Process inserts for sliding windows
133fn process_sliding_insert(
134	operator: &WindowOperator,
135	txn: &mut FlowTransaction,
136	columns: &Columns,
137) -> Result<Vec<Diff>> {
138	let mut result = Vec::new();
139	let row_count = columns.row_count();
140	if row_count == 0 {
141		return Ok(result);
142	}
143
144	let group_hashes = operator.compute_group_keys(columns)?;
145
146	let groups = columns.partition_by_keys(&group_hashes);
147
148	for (group_hash, group_columns) in groups {
149		let group_result = process_sliding_group_insert(operator, txn, &group_columns, group_hash)?;
150		result.extend(group_result);
151	}
152
153	Ok(result)
154}
155
156/// Process inserts for a single group in sliding windows
157fn process_sliding_group_insert(
158	operator: &WindowOperator,
159	txn: &mut FlowTransaction,
160	columns: &Columns,
161	group_hash: Hash128,
162) -> Result<Vec<Diff>> {
163	let mut result = Vec::new();
164	let row_count = columns.row_count();
165	if row_count == 0 {
166		return Ok(result);
167	}
168
169	let timestamps = operator.extract_timestamps(columns)?;
170
171	for row_idx in 0..row_count {
172		let timestamp = timestamps[row_idx];
173		let (event_timestamp, window_ids) = match &operator.window_type {
174			WindowType::Time(_) => {
175				let window_ids = operator.get_sliding_window_ids(timestamp);
176				(timestamp, window_ids)
177			}
178			WindowType::Count => {
179				// proper sliding window IDs based on event index
180				let event_timestamp = operator.current_timestamp();
181				let group_count = operator.get_and_increment_global_count(txn, group_hash)?;
182				let window_ids = operator.get_sliding_window_ids(group_count);
183				(event_timestamp, window_ids)
184			}
185		};
186
187		let single_row_columns = columns.extract_row(row_idx);
188		let row = single_row_columns.to_single_row();
189
190		for window_id in window_ids {
191			let window_key = operator.create_window_key(group_hash, window_id);
192			let mut window_state = operator.load_window_state(txn, &window_key)?;
193
194			let previous_aggregation = if window_state.events.len() >= operator.min_events {
195				operator.apply_aggregations(txn, &window_key, &window_state.events)?
196			} else {
197				None
198			};
199
200			let event = WindowEvent::from_row(&row, event_timestamp);
201			window_state.events.push(event);
202			window_state.event_count += 1;
203
204			if window_state.window_start == 0 {
205				window_state.window_start =
206					operator.set_sliding_window_start(event_timestamp, window_id);
207			}
208
209			let trigger_check_time = operator.current_timestamp();
210			let should_trigger = operator.should_trigger_window(&window_state, trigger_check_time);
211
212			if should_trigger {
213				if let Some((aggregated_row, is_new)) =
214					operator.apply_aggregations(txn, &window_key, &window_state.events)?
215				{
216					if is_new {
217						result.push(Diff::Insert {
218							post: Columns::from_row(&aggregated_row),
219						});
220					} else {
221						// Window exists, need to emit Update with previous state
222						if let Some((previous_row, _)) = previous_aggregation {
223							result.push(Diff::Update {
224								pre: Columns::from_row(&previous_row),
225								post: Columns::from_row(&aggregated_row),
226							});
227						} else {
228							// Fallback to Insert if we can't get previous state
229							result.push(Diff::Insert {
230								post: Columns::from_row(&aggregated_row),
231							});
232						}
233					}
234				}
235			}
236
237			operator.save_window_state(txn, &window_key, &window_state)?;
238		}
239	}
240
241	Ok(result)
242}
243
244/// Apply changes for sliding windows
245pub fn apply_sliding_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
246	let mut result = Vec::new();
247	let current_timestamp = operator.current_timestamp();
248
249	// First, process any expired windows
250	let expired_diffs = operator.process_expired_windows(txn, current_timestamp)?;
251	result.extend(expired_diffs);
252
253	for diff in change.diffs.iter() {
254		match diff {
255			Diff::Insert {
256				post,
257			} => {
258				let insert_result = process_sliding_insert(operator, txn, post)?;
259				result.extend(insert_result);
260			}
261			Diff::Update {
262				pre: _,
263				post,
264			} => {
265				let update_result = process_sliding_insert(operator, txn, post)?;
266				result.extend(update_result);
267			}
268			Diff::Remove {
269				pre: _,
270			} => {
271				// Window operators typically don't handle removes in streaming scenarios
272				// This would require complex retraction logic
273			}
274		}
275	}
276
277	Ok(Change::from_flow(operator.node, change.version, result))
278}