Skip to main content

reifydb_sub_flow/operator/window/
sliding.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::{WindowKind, WindowSize},
6	interface::change::{Change, Diff},
7	value::column::columns::Columns,
8};
9use reifydb_runtime::hash::Hash128;
10use reifydb_type::Result;
11
12use super::{WindowEvent, WindowLayout, WindowOperator};
13use crate::transaction::FlowTransaction;
14
15impl WindowOperator {
16	/// Determine which windows an event belongs to for sliding windows
17	pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64> {
18		match &self.kind {
19			WindowKind::Sliding {
20				size: WindowSize::Duration(duration),
21				slide: WindowSize::Duration(slide_duration),
22			} => {
23				let window_size_ms = duration.as_millis() as u64;
24				let slide_ms = slide_duration.as_millis() as u64;
25				let timestamp = timestamp_or_row_index;
26
27				if slide_ms >= window_size_ms {
28					vec![timestamp / slide_ms]
29				} else {
30					let min_window_id = if timestamp >= window_size_ms {
31						(timestamp - window_size_ms + 1) / slide_ms
32					} else {
33						0
34					};
35					let max_window_id = timestamp / slide_ms;
36					(min_window_id..=max_window_id)
37						.filter(|&wid| {
38							let start = wid * slide_ms;
39							timestamp >= start && timestamp < start + window_size_ms
40						})
41						.collect()
42				}
43			}
44			WindowKind::Sliding {
45				size: WindowSize::Count(count),
46				slide: WindowSize::Count(slide_count),
47			} => {
48				let row_number = timestamp_or_row_index + 1; // 1-based
49				let min_window = if row_number > *count {
50					(row_number - *count) / *slide_count
51				} else {
52					0
53				};
54				let max_window = (row_number - 1) / *slide_count;
55				(min_window..=max_window)
56					.filter(|&wid| {
57						let start_row = wid * *slide_count + 1;
58						let end_row = start_row + *count - 1;
59						row_number >= start_row && row_number <= end_row
60					})
61					.collect()
62			}
63			_ => vec![0],
64		}
65	}
66
67	/// Set window start time for sliding windows (aligned to slide boundaries)
68	pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64 {
69		match &self.kind {
70			WindowKind::Sliding {
71				slide: WindowSize::Duration(slide_duration),
72				..
73			} => {
74				let slide_ms = slide_duration.as_millis() as u64;
75				window_id * slide_ms
76			}
77			_ => timestamp,
78		}
79	}
80}
81
82/// Process inserts for a single group in sliding windows
83fn process_sliding_group_insert(
84	operator: &WindowOperator,
85	txn: &mut FlowTransaction,
86	columns: &Columns,
87	group_hash: Hash128,
88) -> Result<Vec<Diff>> {
89	let mut result = Vec::new();
90	let row_count = columns.row_count();
91	if row_count == 0 {
92		return Ok(result);
93	}
94
95	let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
96
97	for row_idx in 0..row_count {
98		let timestamp = timestamps[row_idx];
99		let (event_timestamp, window_ids) = if operator.is_count_based() {
100			let event_timestamp = operator.current_timestamp();
101			let group_count = operator.get_and_increment_global_count(txn, group_hash)?;
102			(event_timestamp, operator.get_sliding_window_ids(group_count))
103		} else {
104			(timestamp, operator.get_sliding_window_ids(timestamp))
105		};
106
107		let single_row_columns = columns.extract_row(row_idx);
108		let projected = operator.project_columns(&single_row_columns);
109		let row = projected.to_single_row();
110		let row_layout = WindowLayout::from_row(&row);
111
112		for window_id in window_ids {
113			let window_key = operator.create_window_key(group_hash, window_id);
114			let mut window_state = operator.load_window_state(txn, &window_key)?;
115
116			if window_state.window_layout.is_none() {
117				window_state.window_layout = Some(row_layout.clone());
118			}
119			let layout = window_state.layout().clone();
120
121			let previous_aggregation = if !window_state.events.is_empty() {
122				operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
123			} else {
124				None
125			};
126
127			let event = WindowEvent::from_row(&row, event_timestamp);
128			let event_row_number = event.row_number;
129			window_state.events.push(event);
130			window_state.event_count += 1;
131			window_state.last_event_time = event_timestamp;
132
133			if window_state.window_start == 0 {
134				window_state.window_start =
135					operator.set_sliding_window_start(event_timestamp, window_id);
136			}
137
138			if let Some((aggregated_row, is_new)) =
139				operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
140			{
141				result.push(WindowOperator::emit_aggregation_diff(
142					&aggregated_row,
143					is_new,
144					previous_aggregation,
145				));
146			}
147
148			operator.save_window_state(txn, &window_key, &window_state)?;
149			operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
150		}
151	}
152
153	Ok(result)
154}
155
156/// Apply changes for sliding windows
157pub fn apply_sliding_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
158	let diffs = operator.apply_window_change(txn, &change, true, |op, txn, columns| {
159		op.process_insert(txn, columns, process_sliding_group_insert)
160	})?;
161	Ok(Change::from_flow(operator.node, change.version, diffs))
162}