reifydb_sub_flow/operator/window/
rolling.rs1use reifydb_core::{
5 common::{WindowKind, WindowSize},
6 encoded::key::EncodedKey,
7 interface::change::{Change, Diff},
8 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
9 value::column::columns::Columns,
10};
11use reifydb_runtime::hash::Hash128;
12use reifydb_type::{Result, value::datetime::DateTime};
13
14use super::{WindowEvent, WindowLayout, WindowOperator, WindowState};
15use crate::transaction::FlowTransaction;
16
17impl WindowOperator {
18 pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64) {
20 match &self.kind {
21 WindowKind::Rolling {
22 size: WindowSize::Duration(duration),
23 } => {
24 let window_size_ms = duration.as_millis() as u64;
25 let cutoff_time = current_timestamp.saturating_sub(window_size_ms);
26 let original_len = state.events.len();
27 state.events.retain(|event| event.timestamp > cutoff_time);
28 let evicted_count = original_len - state.events.len();
29 state.event_count = state.event_count.saturating_sub(evicted_count as u64);
30 }
31 WindowKind::Rolling {
32 size: WindowSize::Count(count),
33 } => {
34 if state.events.len() > *count as usize {
35 let excess = state.events.len() - *count as usize;
36 state.events.drain(0..excess);
37 state.event_count = *count;
38 }
39 }
40 _ => {}
41 }
42 }
43
44 pub fn tick_rolling_eviction(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
47 let mut result = Vec::new();
48
49 let all_state = txn.state_scan(self.node)?;
50 let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
51 let win_marker = b"win:";
52
53 for item in &all_state.items {
54 let full_key = &item.key;
55 if full_key.len() <= prefix.len() {
56 continue;
57 }
58 let inner = &full_key[prefix.len()..];
59 if !inner.starts_with(win_marker) {
60 continue;
61 }
62
63 let window_key = EncodedKey::new(inner);
64 let mut state = self.load_window_state(txn, &window_key)?;
65 if state.events.is_empty() {
66 continue;
67 }
68 let layout = match &state.window_layout {
69 Some(l) => l.clone(),
70 None => continue,
71 };
72
73 let changed_at = DateTime::from_nanos(current_timestamp);
74 let pre_agg = self.apply_aggregations(txn, &window_key, &layout, &state.events, changed_at)?;
75 let pre_count = state.events.len();
76 self.evict_old_events(&mut state, current_timestamp);
77
78 if state.events.len() < pre_count {
79 if state.events.is_empty() {
80 self.save_window_state(txn, &window_key, &state)?;
81 if let Some((row, _)) = pre_agg {
82 result.push(Diff::remove(Columns::from_row(&row)));
83 }
84 } else {
85 let post_agg = self.apply_aggregations(
86 txn,
87 &window_key,
88 &layout,
89 &state.events,
90 changed_at,
91 )?;
92 self.save_window_state(txn, &window_key, &state)?;
93 if let (Some((pre_row, _)), Some((post_row, _))) = (pre_agg, post_agg) {
94 result.push(Diff::update(
95 Columns::from_row(&pre_row),
96 Columns::from_row(&post_row),
97 ));
98 }
99 }
100 }
101 }
102
103 Ok(result)
104 }
105}
106
107fn process_rolling_group_insert(
110 operator: &WindowOperator,
111 txn: &mut FlowTransaction,
112 columns: &Columns,
113 group_hash: Hash128,
114 changed_at: DateTime,
115) -> Result<Vec<Diff>> {
116 let mut result = Vec::new();
117 let row_count = columns.row_count();
118 if row_count == 0 {
119 return Ok(result);
120 }
121
122 let current_timestamp = operator.current_timestamp();
123 let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
124
125 let window_id = 0u64;
126 let window_key = operator.create_window_key(group_hash, window_id);
127 let mut window_state = operator.load_window_state(txn, &window_key)?;
128
129 for (row_idx, &event_timestamp) in timestamps.iter().enumerate() {
130 let single_row_columns = columns.extract_row(row_idx);
131 let projected = operator.project_columns(&single_row_columns);
132 let row = projected.to_single_row();
133
134 if window_state.window_layout.is_none() {
135 window_state.window_layout = Some(WindowLayout::from_row(&row));
136 }
137 let layout = window_state.layout().clone();
138
139 let previous_aggregation = if !window_state.events.is_empty() {
140 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events, changed_at)?
141 } else {
142 None
143 };
144
145 let event = WindowEvent::from_row(&row, event_timestamp);
146 let event_row_number = event.row_number;
147 window_state.events.push(event);
148 window_state.event_count += 1;
149 window_state.last_event_time = event_timestamp;
150
151 if window_state.window_start == 0 {
152 window_state.window_start = event_timestamp;
153 }
154
155 operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
156
157 let eviction_ts = if operator.ts.is_some() {
158 event_timestamp
159 } else {
160 current_timestamp
161 };
162 operator.evict_old_events(&mut window_state, eviction_ts);
163
164 if !window_state.events.is_empty()
165 && let Some((aggregated_row, is_new)) = operator.apply_aggregations(
166 txn,
167 &window_key,
168 &layout,
169 &window_state.events,
170 changed_at,
171 )? {
172 result.push(WindowOperator::emit_aggregation_diff(
173 &aggregated_row,
174 is_new,
175 previous_aggregation,
176 ));
177 }
178 }
179
180 operator.save_window_state(txn, &window_key, &window_state)?;
181
182 Ok(result)
183}
184
185pub fn apply_rolling_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
187 let changed_at = change.changed_at;
188 let diffs = operator.apply_window_change(txn, &change, false, |op, txn, columns| {
189 op.process_insert(txn, columns, changed_at, process_rolling_group_insert)
190 })?;
191 Ok(Change::from_flow(operator.node, change.version, diffs, change.changed_at))
192}