Skip to main content

reifydb_sub_flow/operator/window/
tumbling.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::{
4	common::{WindowSize, WindowType},
5	interface::change::{Change, Diff},
6	value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowOperator, WindowState};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15	/// Determine which window an event belongs to for tumbling windows
16	pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64 {
17		match (&self.window_type, &self.size) {
18			(WindowType::Time(_), WindowSize::Duration(duration)) => {
19				let window_size_ms = duration.as_millis() as u64;
20				// Tumbling window - align to window boundaries from epoch
21				let window_start = (timestamp / window_size_ms) * window_size_ms;
22				window_start / window_size_ms
23			}
24			(WindowType::Count, WindowSize::Count(count)) => {
25				// to track event counts per group
26				timestamp / *count
27			}
28			_ => {
29				// Mismatched window type and size
30				0
31			}
32		}
33	}
34
35	/// Set window start time for tumbling windows (aligned to window boundaries)
36	pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64 {
37		match &self.size {
38			WindowSize::Duration(duration) => {
39				let window_size_ms = duration.as_millis() as u64;
40				(timestamp / window_size_ms) * window_size_ms
41			}
42			_ => timestamp,
43		}
44	}
45
46	/// Check if tumbling window should be moved to a new window due to time boundaries
47	pub fn should_start_new_tumbling_window(&self, current_window_start: u64, event_timestamp: u64) -> bool {
48		match &self.size {
49			WindowSize::Duration(duration) => {
50				let window_size_ms = duration.as_millis() as u64;
51				let event_window_start = (event_timestamp / window_size_ms) * window_size_ms;
52				event_window_start != current_window_start
53			}
54			_ => false,
55		}
56	}
57
58	/// Check if a tumbling window should be expired (closed)
59	pub fn should_expire_tumbling_window(&self, state: &WindowState, current_timestamp: u64) -> bool {
60		match (&self.window_type, &self.size, &self.slide) {
61			(WindowType::Time(_), WindowSize::Duration(duration), None) => {
62				let window_size_ms = duration.as_millis() as u64;
63				let expire_time = state.window_start + window_size_ms;
64				current_timestamp >= expire_time
65			}
66			_ => false,
67		}
68	}
69}
70
71/// Process inserts for tumbling windows
72fn process_tumbling_insert(
73	operator: &WindowOperator,
74	txn: &mut FlowTransaction,
75	columns: &Columns,
76) -> Result<Vec<Diff>> {
77	let mut result = Vec::new();
78	let row_count = columns.row_count();
79	if row_count == 0 {
80		return Ok(result);
81	}
82
83	let group_hashes = operator.compute_group_keys(columns)?;
84
85	let groups = columns.partition_by_keys(&group_hashes);
86
87	for (group_hash, group_columns) in groups {
88		let group_result = process_tumbling_group_insert(operator, txn, &group_columns, group_hash)?;
89		result.extend(group_result);
90	}
91
92	Ok(result)
93}
94
95/// Process inserts for a single group in tumbling windows
96fn process_tumbling_group_insert(
97	operator: &WindowOperator,
98	txn: &mut FlowTransaction,
99	columns: &Columns,
100	group_hash: Hash128,
101) -> Result<Vec<Diff>> {
102	let mut result = Vec::new();
103	let row_count = columns.row_count();
104	if row_count == 0 {
105		return Ok(result);
106	}
107
108	let timestamps = operator.extract_timestamps(columns)?;
109
110	for row_idx in 0..row_count {
111		let timestamp = timestamps[row_idx];
112		let (event_timestamp, window_id) = match &operator.window_type {
113			WindowType::Time(_) => {
114				let window_id = operator.get_tumbling_window_id(timestamp);
115				(timestamp, window_id)
116			}
117			WindowType::Count => {
118				// window ID based on global event count
119				let event_timestamp = operator.current_timestamp();
120				let global_count = operator.get_and_increment_global_count(txn, group_hash)?;
121				let window_size = if let WindowSize::Count(count) = &operator.size {
122					*count
123				} else {
124					3 // fallback
125				};
126				let window_id = global_count / window_size;
127				(event_timestamp, window_id)
128			}
129		};
130
131		let window_key = operator.create_window_key(group_hash, window_id);
132		let mut window_state = operator.load_window_state(txn, &window_key)?;
133
134		let single_row_columns = columns.extract_row(row_idx);
135		let row = single_row_columns.to_single_row();
136
137		let event = WindowEvent::from_row(&row, event_timestamp);
138		window_state.events.push(event);
139		window_state.event_count += 1;
140
141		if window_state.window_start == 0 {
142			window_state.window_start = operator.set_tumbling_window_start(event_timestamp);
143		}
144
145		// Always emit result for count-based windows - Insert for first, Update for subsequent
146		if let Some((aggregated_row, is_new)) =
147			operator.apply_aggregations(txn, &window_key, &window_state.events)?
148		{
149			if is_new {
150				result.push(Diff::Insert {
151					post: Columns::from_row(&aggregated_row),
152				});
153			} else {
154				// Window already exists - emit Update
155
156				let previous_events = &window_state.events[..window_state.events.len() - 1];
157				if let Some((previous_aggregated, _)) =
158					operator.apply_aggregations(txn, &window_key, previous_events)?
159				{
160					result.push(Diff::Update {
161						pre: Columns::from_row(&previous_aggregated),
162						post: Columns::from_row(&aggregated_row),
163					});
164				}
165			}
166		}
167
168		operator.save_window_state(txn, &window_key, &window_state)?;
169	}
170
171	Ok(result)
172}
173
174/// Apply changes for tumbling windows
175pub fn apply_tumbling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
176	let mut result = Vec::new();
177	let current_timestamp = operator.current_timestamp();
178
179	// First, process any expired windows
180	let expired_diffs = operator.process_expired_windows(txn, current_timestamp)?;
181	result.extend(expired_diffs);
182
183	for diff in change.diffs.iter() {
184		match diff {
185			Diff::Insert {
186				post,
187			} => {
188				let insert_result = process_tumbling_insert(operator, txn, post)?;
189				result.extend(insert_result);
190			}
191			Diff::Update {
192				pre: _,
193				post,
194			} => {
195				let update_result = process_tumbling_insert(operator, txn, post)?;
196				result.extend(update_result);
197			}
198			Diff::Remove {
199				pre: _,
200			} => {
201				// Window operators typically don't handle removes in streaming scenarios
202				// This would require complex retraction logic
203			}
204		}
205	}
206
207	Ok(Change::from_flow(operator.node, change.version, result))
208}