reifydb_sub_flow/operator/window/
tumbling.rs1use reifydb_core::{
4 common::{WindowSize, WindowType},
5 interface::change::{Change, Diff},
6 value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowOperator, WindowState};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15 pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64 {
17 match (&self.window_type, &self.size) {
18 (WindowType::Time(_), WindowSize::Duration(duration)) => {
19 let window_size_ms = duration.as_millis() as u64;
20 let window_start = (timestamp / window_size_ms) * window_size_ms;
22 window_start / window_size_ms
23 }
24 (WindowType::Count, WindowSize::Count(count)) => {
25 timestamp / *count
27 }
28 _ => {
29 0
31 }
32 }
33 }
34
35 pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64 {
37 match &self.size {
38 WindowSize::Duration(duration) => {
39 let window_size_ms = duration.as_millis() as u64;
40 (timestamp / window_size_ms) * window_size_ms
41 }
42 _ => timestamp,
43 }
44 }
45
46 pub fn should_start_new_tumbling_window(&self, current_window_start: u64, event_timestamp: u64) -> bool {
48 match &self.size {
49 WindowSize::Duration(duration) => {
50 let window_size_ms = duration.as_millis() as u64;
51 let event_window_start = (event_timestamp / window_size_ms) * window_size_ms;
52 event_window_start != current_window_start
53 }
54 _ => false,
55 }
56 }
57
58 pub fn should_expire_tumbling_window(&self, state: &WindowState, current_timestamp: u64) -> bool {
60 match (&self.window_type, &self.size, &self.slide) {
61 (WindowType::Time(_), WindowSize::Duration(duration), None) => {
62 let window_size_ms = duration.as_millis() as u64;
63 let expire_time = state.window_start + window_size_ms;
64 current_timestamp >= expire_time
65 }
66 _ => false,
67 }
68 }
69}
70
71fn process_tumbling_insert(
73 operator: &WindowOperator,
74 txn: &mut FlowTransaction,
75 columns: &Columns,
76) -> Result<Vec<Diff>> {
77 let mut result = Vec::new();
78 let row_count = columns.row_count();
79 if row_count == 0 {
80 return Ok(result);
81 }
82
83 let group_hashes = operator.compute_group_keys(columns)?;
84
85 let groups = columns.partition_by_keys(&group_hashes);
86
87 for (group_hash, group_columns) in groups {
88 let group_result = process_tumbling_group_insert(operator, txn, &group_columns, group_hash)?;
89 result.extend(group_result);
90 }
91
92 Ok(result)
93}
94
95fn process_tumbling_group_insert(
97 operator: &WindowOperator,
98 txn: &mut FlowTransaction,
99 columns: &Columns,
100 group_hash: Hash128,
101) -> Result<Vec<Diff>> {
102 let mut result = Vec::new();
103 let row_count = columns.row_count();
104 if row_count == 0 {
105 return Ok(result);
106 }
107
108 let timestamps = operator.extract_timestamps(columns)?;
109
110 for row_idx in 0..row_count {
111 let timestamp = timestamps[row_idx];
112 let (event_timestamp, window_id) = match &operator.window_type {
113 WindowType::Time(_) => {
114 let window_id = operator.get_tumbling_window_id(timestamp);
115 (timestamp, window_id)
116 }
117 WindowType::Count => {
118 let event_timestamp = operator.current_timestamp();
120 let global_count = operator.get_and_increment_global_count(txn, group_hash)?;
121 let window_size = if let WindowSize::Count(count) = &operator.size {
122 *count
123 } else {
124 3 };
126 let window_id = global_count / window_size;
127 (event_timestamp, window_id)
128 }
129 };
130
131 let window_key = operator.create_window_key(group_hash, window_id);
132 let mut window_state = operator.load_window_state(txn, &window_key)?;
133
134 let single_row_columns = columns.extract_row(row_idx);
135 let row = single_row_columns.to_single_row();
136
137 let event = WindowEvent::from_row(&row, event_timestamp);
138 window_state.events.push(event);
139 window_state.event_count += 1;
140
141 if window_state.window_start == 0 {
142 window_state.window_start = operator.set_tumbling_window_start(event_timestamp);
143 }
144
145 if let Some((aggregated_row, is_new)) =
147 operator.apply_aggregations(txn, &window_key, &window_state.events)?
148 {
149 if is_new {
150 result.push(Diff::Insert {
151 post: Columns::from_row(&aggregated_row),
152 });
153 } else {
154 let previous_events = &window_state.events[..window_state.events.len() - 1];
157 if let Some((previous_aggregated, _)) =
158 operator.apply_aggregations(txn, &window_key, previous_events)?
159 {
160 result.push(Diff::Update {
161 pre: Columns::from_row(&previous_aggregated),
162 post: Columns::from_row(&aggregated_row),
163 });
164 }
165 }
166 }
167
168 operator.save_window_state(txn, &window_key, &window_state)?;
169 }
170
171 Ok(result)
172}
173
174pub fn apply_tumbling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
176 let mut result = Vec::new();
177 let current_timestamp = operator.current_timestamp();
178
179 let expired_diffs = operator.process_expired_windows(txn, current_timestamp)?;
181 result.extend(expired_diffs);
182
183 for diff in change.diffs.iter() {
184 match diff {
185 Diff::Insert {
186 post,
187 } => {
188 let insert_result = process_tumbling_insert(operator, txn, post)?;
189 result.extend(insert_result);
190 }
191 Diff::Update {
192 pre: _,
193 post,
194 } => {
195 let update_result = process_tumbling_insert(operator, txn, post)?;
196 result.extend(update_result);
197 }
198 Diff::Remove {
199 pre: _,
200 } => {
201 }
204 }
205 }
206
207 Ok(Change::from_flow(operator.node, change.version, result))
208}