Skip to main content

reifydb_sub_flow/operator/window/
tumbling.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3use reifydb_core::{
4	common::{WindowKind, WindowSize},
5	interface::change::{Change, Diff},
6	value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowLayout, WindowOperator};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15	/// Determine which window an event belongs to for tumbling windows
16	pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64 {
17		match &self.kind {
18			WindowKind::Tumbling {
19				size: WindowSize::Duration(duration),
20			} => {
21				let window_size_ms = duration.as_millis() as u64;
22				timestamp / window_size_ms
23			}
24			WindowKind::Tumbling {
25				size: WindowSize::Count(count),
26			} => timestamp / *count,
27			_ => 0,
28		}
29	}
30
31	/// Set window start time for tumbling windows (aligned to window boundaries)
32	pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64 {
33		if let Some(duration) = self.size_duration() {
34			let window_size_ms = duration.as_millis() as u64;
35			(timestamp / window_size_ms) * window_size_ms
36		} else {
37			timestamp
38		}
39	}
40}
41
42/// Process inserts for a single group in tumbling windows
43fn process_tumbling_group_insert(
44	operator: &WindowOperator,
45	txn: &mut FlowTransaction,
46	columns: &Columns,
47	group_hash: Hash128,
48) -> Result<Vec<Diff>> {
49	let mut result = Vec::new();
50	let row_count = columns.row_count();
51	if row_count == 0 {
52		return Ok(result);
53	}
54
55	let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
56
57	for row_idx in 0..row_count {
58		let timestamp = timestamps[row_idx];
59		let (event_timestamp, window_id) = if operator.is_count_based() {
60			let event_timestamp = operator.current_timestamp();
61			let global_count = operator.get_and_increment_global_count(txn, group_hash)?;
62			let window_size = operator.size_count().unwrap_or(3);
63			(event_timestamp, global_count / window_size)
64		} else {
65			(timestamp, operator.get_tumbling_window_id(timestamp))
66		};
67
68		let window_key = operator.create_window_key(group_hash, window_id);
69		let mut window_state = operator.load_window_state(txn, &window_key)?;
70
71		let single_row_columns = columns.extract_row(row_idx);
72		let projected = operator.project_columns(&single_row_columns);
73		let row = projected.to_single_row();
74
75		if window_state.window_layout.is_none() {
76			window_state.window_layout = Some(WindowLayout::from_row(&row));
77		}
78		let layout = window_state.layout().clone();
79
80		let previous_aggregation = if !window_state.events.is_empty() {
81			operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
82		} else {
83			None
84		};
85
86		let event = WindowEvent::from_row(&row, event_timestamp);
87		let event_row_number = event.row_number;
88		window_state.events.push(event);
89		window_state.event_count += 1;
90		window_state.last_event_time = event_timestamp;
91
92		if window_state.window_start == 0 {
93			window_state.window_start = operator.set_tumbling_window_start(event_timestamp);
94		}
95
96		if let Some((aggregated_row, is_new)) =
97			operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
98		{
99			result.push(WindowOperator::emit_aggregation_diff(
100				&aggregated_row,
101				is_new,
102				previous_aggregation,
103			));
104		}
105
106		operator.save_window_state(txn, &window_key, &window_state)?;
107		operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
108	}
109
110	Ok(result)
111}
112
113/// Apply changes for tumbling windows
114pub fn apply_tumbling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
115	let diffs = operator.apply_window_change(txn, &change, true, |op, txn, columns| {
116		op.process_insert(txn, columns, process_tumbling_group_insert)
117	})?;
118	Ok(Change::from_flow(operator.node, change.version, diffs))
119}