reifydb_sub_flow/operator/window/
tumbling.rs1use reifydb_core::{
4 common::{WindowKind, WindowSize},
5 interface::change::{Change, Diff},
6 value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowLayout, WindowOperator};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15 pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64 {
17 match &self.kind {
18 WindowKind::Tumbling {
19 size: WindowSize::Duration(duration),
20 } => {
21 let window_size_ms = duration.as_millis() as u64;
22 timestamp / window_size_ms
23 }
24 WindowKind::Tumbling {
25 size: WindowSize::Count(count),
26 } => timestamp / *count,
27 _ => 0,
28 }
29 }
30
31 pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64 {
33 if let Some(duration) = self.size_duration() {
34 let window_size_ms = duration.as_millis() as u64;
35 (timestamp / window_size_ms) * window_size_ms
36 } else {
37 timestamp
38 }
39 }
40}
41
42fn process_tumbling_group_insert(
44 operator: &WindowOperator,
45 txn: &mut FlowTransaction,
46 columns: &Columns,
47 group_hash: Hash128,
48) -> Result<Vec<Diff>> {
49 let mut result = Vec::new();
50 let row_count = columns.row_count();
51 if row_count == 0 {
52 return Ok(result);
53 }
54
55 let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
56
57 for row_idx in 0..row_count {
58 let timestamp = timestamps[row_idx];
59 let (event_timestamp, window_id) = if operator.is_count_based() {
60 let event_timestamp = operator.current_timestamp();
61 let global_count = operator.get_and_increment_global_count(txn, group_hash)?;
62 let window_size = operator.size_count().unwrap_or(3);
63 (event_timestamp, global_count / window_size)
64 } else {
65 (timestamp, operator.get_tumbling_window_id(timestamp))
66 };
67
68 let window_key = operator.create_window_key(group_hash, window_id);
69 let mut window_state = operator.load_window_state(txn, &window_key)?;
70
71 let single_row_columns = columns.extract_row(row_idx);
72 let projected = operator.project_columns(&single_row_columns);
73 let row = projected.to_single_row();
74
75 if window_state.window_layout.is_none() {
76 window_state.window_layout = Some(WindowLayout::from_row(&row));
77 }
78 let layout = window_state.layout().clone();
79
80 let previous_aggregation = if !window_state.events.is_empty() {
81 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
82 } else {
83 None
84 };
85
86 let event = WindowEvent::from_row(&row, event_timestamp);
87 let event_row_number = event.row_number;
88 window_state.events.push(event);
89 window_state.event_count += 1;
90 window_state.last_event_time = event_timestamp;
91
92 if window_state.window_start == 0 {
93 window_state.window_start = operator.set_tumbling_window_start(event_timestamp);
94 }
95
96 if let Some((aggregated_row, is_new)) =
97 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
98 {
99 result.push(WindowOperator::emit_aggregation_diff(
100 &aggregated_row,
101 is_new,
102 previous_aggregation,
103 ));
104 }
105
106 operator.save_window_state(txn, &window_key, &window_state)?;
107 operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
108 }
109
110 Ok(result)
111}
112
113pub fn apply_tumbling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
115 let diffs = operator.apply_window_change(txn, &change, true, |op, txn, columns| {
116 op.process_insert(txn, columns, process_tumbling_group_insert)
117 })?;
118 Ok(Change::from_flow(operator.node, change.version, diffs))
119}