Skip to main content

reifydb_sub_flow/operator/window/
sliding.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::{WindowKind, WindowSize},
6	interface::change::{Change, Diff},
7	value::column::columns::Columns,
8};
9use reifydb_runtime::hash::Hash128;
10use reifydb_type::{Result, value::datetime::DateTime};
11
12use super::{WindowEvent, WindowLayout, WindowOperator};
13use crate::transaction::FlowTransaction;
14
15impl WindowOperator {
16	/// Determine which windows an event belongs to for sliding windows
17	pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64> {
18		match &self.kind {
19			WindowKind::Sliding {
20				size: WindowSize::Duration(duration),
21				slide: WindowSize::Duration(slide_duration),
22			} => {
23				let window_size_ms = duration.as_millis() as u64;
24				let slide_ms = slide_duration.as_millis() as u64;
25				let timestamp = timestamp_or_row_index;
26
27				if slide_ms >= window_size_ms {
28					vec![timestamp / slide_ms]
29				} else {
30					let min_window_id = if timestamp >= window_size_ms {
31						(timestamp - window_size_ms + 1) / slide_ms
32					} else {
33						0
34					};
35					let max_window_id = timestamp / slide_ms;
36					(min_window_id..=max_window_id)
37						.filter(|&wid| {
38							let start = wid * slide_ms;
39							timestamp >= start && timestamp < start + window_size_ms
40						})
41						.collect()
42				}
43			}
44			WindowKind::Sliding {
45				size: WindowSize::Count(count),
46				slide: WindowSize::Count(slide_count),
47			} => {
48				let row_number = timestamp_or_row_index + 1; // 1-based
49				let min_window = if row_number > *count {
50					(row_number - *count) / *slide_count
51				} else {
52					0
53				};
54				let max_window = (row_number - 1) / *slide_count;
55				(min_window..=max_window)
56					.filter(|&wid| {
57						let start_row = wid * *slide_count + 1;
58						let end_row = start_row + *count - 1;
59						row_number >= start_row && row_number <= end_row
60					})
61					.collect()
62			}
63			_ => vec![0],
64		}
65	}
66
67	/// Set window start time for sliding windows (aligned to slide boundaries)
68	pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64 {
69		match &self.kind {
70			WindowKind::Sliding {
71				slide: WindowSize::Duration(slide_duration),
72				..
73			} => {
74				let slide_ms = slide_duration.as_millis() as u64;
75				window_id * slide_ms
76			}
77			_ => timestamp,
78		}
79	}
80}
81
82/// Process inserts for a single group in sliding windows
83fn process_sliding_group_insert(
84	operator: &WindowOperator,
85	txn: &mut FlowTransaction,
86	columns: &Columns,
87	group_hash: Hash128,
88	changed_at: DateTime,
89) -> Result<Vec<Diff>> {
90	let mut result = Vec::new();
91	let row_count = columns.row_count();
92	if row_count == 0 {
93		return Ok(result);
94	}
95
96	let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
97
98	for (row_idx, &timestamp) in timestamps.iter().enumerate() {
99		let (event_timestamp, window_ids) = if operator.is_count_based() {
100			let event_timestamp = operator.current_timestamp();
101			let group_count = operator.get_and_increment_global_count(txn, group_hash)?;
102			(event_timestamp, operator.get_sliding_window_ids(group_count))
103		} else {
104			(timestamp, operator.get_sliding_window_ids(timestamp))
105		};
106
107		let single_row_columns = columns.extract_row(row_idx);
108		let projected = operator.project_columns(&single_row_columns);
109		let row = projected.to_single_row();
110		let row_layout = WindowLayout::from_row(&row);
111
112		for window_id in window_ids {
113			let window_key = operator.create_window_key(group_hash, window_id);
114			let mut window_state = operator.load_window_state(txn, &window_key)?;
115
116			if window_state.window_layout.is_none() {
117				window_state.window_layout = Some(row_layout.clone());
118			}
119			let layout = window_state.layout().clone();
120
121			let previous_aggregation = if !window_state.events.is_empty() {
122				operator.apply_aggregations(
123					txn,
124					&window_key,
125					&layout,
126					&window_state.events,
127					changed_at,
128				)?
129			} else {
130				None
131			};
132
133			let event = WindowEvent::from_row(&row, event_timestamp);
134			let event_row_number = event.row_number;
135			window_state.events.push(event);
136			window_state.event_count += 1;
137			window_state.last_event_time = event_timestamp;
138
139			if window_state.window_start == 0 {
140				window_state.window_start =
141					operator.set_sliding_window_start(event_timestamp, window_id);
142			}
143
144			if let Some((aggregated_row, is_new)) = operator.apply_aggregations(
145				txn,
146				&window_key,
147				&layout,
148				&window_state.events,
149				changed_at,
150			)? {
151				result.push(WindowOperator::emit_aggregation_diff(
152					&aggregated_row,
153					is_new,
154					previous_aggregation,
155				));
156			}
157
158			operator.save_window_state(txn, &window_key, &window_state)?;
159			operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
160		}
161	}
162
163	Ok(result)
164}
165
166/// Apply changes for sliding windows
167pub fn apply_sliding_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
168	let changed_at = change.changed_at;
169	let diffs = operator.apply_window_change(txn, &change, true, |op, txn, columns| {
170		op.process_insert(txn, columns, changed_at, process_sliding_group_insert)
171	})?;
172	Ok(Change::from_flow(operator.node, change.version, diffs, change.changed_at))
173}