reifydb_sub_flow/operator/window/
rolling.rs1use reifydb_core::{
4 common::{WindowSize, WindowType},
5 interface::change::{Change, Diff},
6 value::column::columns::Columns,
7};
8use reifydb_runtime::hash::Hash128;
9use reifydb_type::Result;
10
11use super::{WindowEvent, WindowOperator, WindowState};
12use crate::transaction::FlowTransaction;
13
14impl WindowOperator {
15 pub fn should_evict_rolling_window(&self, state: &WindowState, current_timestamp: u64) -> bool {
17 match (&self.window_type, &self.size) {
18 (WindowType::Time(_), WindowSize::Duration(duration)) => {
19 if state.events.is_empty() {
20 return false;
21 }
22 let window_size_ms = duration.as_millis() as u64;
23 let oldest_timestamp = state.events[0].timestamp;
24 current_timestamp - oldest_timestamp > window_size_ms
25 }
26 (WindowType::Count, WindowSize::Count(count)) => state.event_count > *count,
27 _ => false,
28 }
29 }
30
31 pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64) {
33 match (&self.window_type, &self.size) {
34 (WindowType::Time(_), WindowSize::Duration(duration)) => {
35 let window_size_ms = duration.as_millis() as u64;
36 let cutoff_time = current_timestamp - window_size_ms;
37
38 let original_len = state.events.len();
39 state.events.retain(|event| event.timestamp > cutoff_time);
40 let evicted_count = original_len - state.events.len();
41 state.event_count = state.event_count.saturating_sub(evicted_count as u64);
42 }
43 (WindowType::Count, WindowSize::Count(count)) => {
44 if state.events.len() > *count as usize {
46 let excess = state.events.len() - *count as usize;
47 state.events.drain(0..excess);
48 state.event_count = *count;
49 }
50 }
51 _ => {}
52 }
53 }
54}
55
56fn process_rolling_insert(
58 operator: &WindowOperator,
59 txn: &mut FlowTransaction,
60 columns: &Columns,
61) -> Result<Vec<Diff>> {
62 let mut result = Vec::new();
63 let row_count = columns.row_count();
64 if row_count == 0 {
65 return Ok(result);
66 }
67
68 let current_timestamp = operator.current_timestamp();
69
70 let group_hashes = operator.compute_group_keys(columns)?;
71
72 let groups = columns.partition_by_keys(&group_hashes);
73
74 for (group_hash, group_columns) in groups {
75 let group_result =
76 process_rolling_group_insert(operator, txn, &group_columns, group_hash, current_timestamp)?;
77 result.extend(group_result);
78 }
79
80 Ok(result)
81}
82
83fn process_rolling_group_insert(
85 operator: &WindowOperator,
86 txn: &mut FlowTransaction,
87 columns: &Columns,
88 group_hash: Hash128,
89 current_timestamp: u64,
90) -> Result<Vec<Diff>> {
91 let mut result = Vec::new();
92 let row_count = columns.row_count();
93 if row_count == 0 {
94 return Ok(result);
95 }
96
97 let timestamps = operator.extract_timestamps(columns)?;
98
99 let window_id = 0u64;
100 let window_key = operator.create_window_key(group_hash, window_id);
101 let mut window_state = operator.load_window_state(txn, &window_key)?;
102
103 for row_idx in 0..row_count {
104 let event_timestamp = timestamps[row_idx];
105
106 let previous_aggregation = if window_state.events.len() >= operator.min_events {
107 operator.apply_aggregations(txn, &window_key, &window_state.events)?
108 } else {
109 None
110 };
111
112 let single_row_columns = columns.extract_row(row_idx);
113 let row = single_row_columns.to_single_row();
114
115 let event = WindowEvent::from_row(&row, event_timestamp);
116 window_state.events.push(event);
117 window_state.event_count += 1;
118
119 if window_state.window_start == 0 {
120 window_state.window_start = event_timestamp;
121 }
122
123 operator.evict_old_events(&mut window_state, current_timestamp);
125
126 if window_state.events.len() >= operator.min_events {
128 if let Some((aggregated_row, is_new)) =
129 operator.apply_aggregations(txn, &window_key, &window_state.events)?
130 {
131 if is_new {
132 result.push(Diff::Insert {
133 post: Columns::from_row(&aggregated_row),
134 });
135 } else {
136 if let Some((previous_row, _)) = previous_aggregation {
138 result.push(Diff::Update {
139 pre: Columns::from_row(&previous_row),
140 post: Columns::from_row(&aggregated_row),
141 });
142 } else {
143 result.push(Diff::Insert {
145 post: Columns::from_row(&aggregated_row),
146 });
147 }
148 }
149 }
150 }
151 }
152
153 operator.save_window_state(txn, &window_key, &window_state)?;
154
155 Ok(result)
156}
157
158pub fn apply_rolling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
160 let mut result = Vec::new();
161
162 for diff in change.diffs.iter() {
163 match diff {
164 Diff::Insert {
165 post,
166 } => {
167 let insert_result = process_rolling_insert(operator, txn, post)?;
168 result.extend(insert_result);
169 }
170 Diff::Update {
171 pre: _,
172 post,
173 } => {
174 let update_result = process_rolling_insert(operator, txn, post)?;
175 result.extend(update_result);
176 }
177 Diff::Remove {
178 pre: _,
179 } => {
180 }
183 }
184 }
185
186 Ok(Change::from_flow(operator.node, change.version, result))
187}