reifydb_sub_flow/operator/window/
sliding.rs1use reifydb_core::{
4 common::{WindowSize, WindowSlide, WindowType},
5 interface::change::{Change, Diff},
6 value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowOperator};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15 pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64> {
19 match (&self.window_type, &self.size, &self.slide) {
20 (
21 WindowType::Time(_),
22 WindowSize::Duration(duration),
23 Some(WindowSlide::Duration(slide_duration)),
24 ) => {
25 let window_size_ms = duration.as_millis() as u64;
26 let slide_ms = slide_duration.as_millis() as u64;
27
28 let timestamp = timestamp_or_row_index;
29
30 if slide_ms >= window_size_ms {
31 let window_id = timestamp / slide_ms;
33 vec![window_id]
34 } else {
35 let mut windows = Vec::new();
37
38 let min_window_id = if timestamp >= window_size_ms {
44 (timestamp - window_size_ms + 1) / slide_ms
45 } else {
46 0
47 };
48 let max_window_id = timestamp / slide_ms;
49
50 for window_id in min_window_id..=max_window_id {
51 let window_start = window_id * slide_ms;
52 let window_end = window_start + window_size_ms;
53
54 if timestamp >= window_start && timestamp < window_end {
55 windows.push(window_id);
56 }
57 }
58 windows
59 }
60 }
61 (WindowType::Time(_), WindowSize::Duration(duration), Some(WindowSlide::Count(_))) => {
62 let window_size_ms = duration.as_millis() as u64;
64 let timestamp = timestamp_or_row_index;
65 let base_window = timestamp / window_size_ms;
66 vec![base_window]
67 }
68 (WindowType::Count, WindowSize::Count(count), Some(WindowSlide::Count(slide_count))) => {
69 let global_count = timestamp_or_row_index; let mut windows = Vec::new();
77
78 let row_number = global_count + 1; let min_window = if row_number > *count {
88 (row_number - *count) / *slide_count
89 } else {
90 0
91 };
92 let max_window = (row_number - 1) / *slide_count;
93
94 for window_id in min_window..=max_window {
95 let window_start_row = window_id * *slide_count + 1; let window_end_row = window_start_row + *count - 1; let belongs_to_window =
100 row_number >= window_start_row && row_number <= window_end_row;
101
102 if belongs_to_window {
103 windows.push(window_id);
104 }
105 }
106
107 windows
108 }
109 _ => {
110 vec![0]
112 }
113 }
114 }
115
116 pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64 {
118 match (&self.window_type, &self.size, &self.slide) {
119 (WindowType::Time(_), WindowSize::Duration(_), Some(WindowSlide::Duration(slide_duration))) => {
120 let slide_ms = slide_duration.as_millis() as u64;
121 let window_start = window_id * slide_ms;
122 window_start
123 }
124 _ => {
125 timestamp
127 }
128 }
129 }
130}
131
132fn process_sliding_insert(
134 operator: &WindowOperator,
135 txn: &mut FlowTransaction,
136 columns: &Columns,
137) -> Result<Vec<Diff>> {
138 let mut result = Vec::new();
139 let row_count = columns.row_count();
140 if row_count == 0 {
141 return Ok(result);
142 }
143
144 let group_hashes = operator.compute_group_keys(columns)?;
145
146 let groups = columns.partition_by_keys(&group_hashes);
147
148 for (group_hash, group_columns) in groups {
149 let group_result = process_sliding_group_insert(operator, txn, &group_columns, group_hash)?;
150 result.extend(group_result);
151 }
152
153 Ok(result)
154}
155
156fn process_sliding_group_insert(
158 operator: &WindowOperator,
159 txn: &mut FlowTransaction,
160 columns: &Columns,
161 group_hash: Hash128,
162) -> Result<Vec<Diff>> {
163 let mut result = Vec::new();
164 let row_count = columns.row_count();
165 if row_count == 0 {
166 return Ok(result);
167 }
168
169 let timestamps = operator.extract_timestamps(columns)?;
170
171 for row_idx in 0..row_count {
172 let timestamp = timestamps[row_idx];
173 let (event_timestamp, window_ids) = match &operator.window_type {
174 WindowType::Time(_) => {
175 let window_ids = operator.get_sliding_window_ids(timestamp);
176 (timestamp, window_ids)
177 }
178 WindowType::Count => {
179 let event_timestamp = operator.current_timestamp();
181 let group_count = operator.get_and_increment_global_count(txn, group_hash)?;
182 let window_ids = operator.get_sliding_window_ids(group_count);
183 (event_timestamp, window_ids)
184 }
185 };
186
187 let single_row_columns = columns.extract_row(row_idx);
188 let row = single_row_columns.to_single_row();
189
190 for window_id in window_ids {
191 let window_key = operator.create_window_key(group_hash, window_id);
192 let mut window_state = operator.load_window_state(txn, &window_key)?;
193
194 let previous_aggregation = if window_state.events.len() >= operator.min_events {
195 operator.apply_aggregations(txn, &window_key, &window_state.events)?
196 } else {
197 None
198 };
199
200 let event = WindowEvent::from_row(&row, event_timestamp);
201 window_state.events.push(event);
202 window_state.event_count += 1;
203
204 if window_state.window_start == 0 {
205 window_state.window_start =
206 operator.set_sliding_window_start(event_timestamp, window_id);
207 }
208
209 let trigger_check_time = operator.current_timestamp();
210 let should_trigger = operator.should_trigger_window(&window_state, trigger_check_time);
211
212 if should_trigger {
213 if let Some((aggregated_row, is_new)) =
214 operator.apply_aggregations(txn, &window_key, &window_state.events)?
215 {
216 if is_new {
217 result.push(Diff::Insert {
218 post: Columns::from_row(&aggregated_row),
219 });
220 } else {
221 if let Some((previous_row, _)) = previous_aggregation {
223 result.push(Diff::Update {
224 pre: Columns::from_row(&previous_row),
225 post: Columns::from_row(&aggregated_row),
226 });
227 } else {
228 result.push(Diff::Insert {
230 post: Columns::from_row(&aggregated_row),
231 });
232 }
233 }
234 }
235 }
236
237 operator.save_window_state(txn, &window_key, &window_state)?;
238 }
239 }
240
241 Ok(result)
242}
243
244pub fn apply_sliding_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
246 let mut result = Vec::new();
247 let current_timestamp = operator.current_timestamp();
248
249 let expired_diffs = operator.process_expired_windows(txn, current_timestamp)?;
251 result.extend(expired_diffs);
252
253 for diff in change.diffs.iter() {
254 match diff {
255 Diff::Insert {
256 post,
257 } => {
258 let insert_result = process_sliding_insert(operator, txn, post)?;
259 result.extend(insert_result);
260 }
261 Diff::Update {
262 pre: _,
263 post,
264 } => {
265 let update_result = process_sliding_insert(operator, txn, post)?;
266 result.extend(update_result);
267 }
268 Diff::Remove {
269 pre: _,
270 } => {
271 }
274 }
275 }
276
277 Ok(Change::from_flow(operator.node, change.version, result))
278}