reifydb_sub_flow/operator/window/
session.rs1use postcard::{from_bytes, to_stdvec};
5use reifydb_core::{
6 common::WindowKind,
7 encoded::key::EncodedKey,
8 interface::change::{Change, Diff},
9 internal,
10 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
11 util::encoding::keycode::serializer::KeySerializer,
12 value::column::columns::Columns,
13};
14use reifydb_runtime::hash::Hash128;
15use reifydb_type::{Result, error::Error, value::blob::Blob};
16
17use super::{WindowEvent, WindowLayout, WindowOperator};
18use crate::{operator::stateful::window::WindowStateful, transaction::FlowTransaction};
19
20impl WindowOperator {
21 fn session_gap_ms(&self) -> u64 {
23 match &self.kind {
24 WindowKind::Session {
25 gap,
26 } => gap.as_millis() as u64,
27 _ => 0,
28 }
29 }
30
31 fn create_session_tracker_key(&self, group_hash: Hash128) -> EncodedKey {
33 let mut serializer = KeySerializer::with_capacity(32);
34 serializer.extend_bytes(b"ses:");
35 serializer.extend_u128(group_hash);
36 EncodedKey::new(serializer.finish())
37 }
38
39 fn load_session_tracker(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<(u64, u64)> {
42 let tracker_key = self.create_session_tracker_key(group_hash);
43 let state_row = self.load_state(txn, &tracker_key)?;
44
45 if state_row.is_empty() || !state_row.is_defined(0) {
46 return Ok((0, 0));
47 }
48
49 let blob = self.layout.get_blob(&state_row, 0);
50 if blob.is_empty() {
51 return Ok((0, 0));
52 }
53
54 let tracker: (u64, u64) = from_bytes(blob.as_ref()).unwrap_or((0, 0));
55 Ok(tracker)
56 }
57
58 fn save_session_tracker(
60 &self,
61 txn: &mut FlowTransaction,
62 group_hash: Hash128,
63 session_id: u64,
64 last_event_time: u64,
65 ) -> Result<()> {
66 let tracker_key = self.create_session_tracker_key(group_hash);
67 let serialized = to_stdvec(&(session_id, last_event_time))
68 .map_err(|e| Error(internal!("Failed to serialize session tracker: {}", e)))?;
69 let mut state_row = self.layout.allocate();
70 let blob = Blob::from(serialized);
71 self.layout.set_blob(&mut state_row, 0, &blob);
72 self.save_state(txn, &tracker_key, state_row)
73 }
74
75 pub fn tick_session_expiration(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
78 let mut result = Vec::new();
79 let gap_ms = self.session_gap_ms();
80 if gap_ms == 0 {
81 return Ok(result);
82 }
83
84 let all_state = txn.state_scan(self.node)?;
85 let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
86 let win_marker = b"win:";
87
88 let mut keys_to_clear = Vec::new();
89
90 for item in &all_state.items {
91 let full_key = &item.key;
92 if full_key.len() <= prefix.len() {
93 continue;
94 }
95 let inner = &full_key[prefix.len()..];
96 if !inner.starts_with(win_marker) {
97 continue;
98 }
99
100 let window_key = EncodedKey::new(inner);
101 let state = self.load_window_state(txn, &window_key)?;
102 if state.events.is_empty() || state.last_event_time == 0 {
103 continue;
104 }
105
106 if current_timestamp.saturating_sub(state.last_event_time) > gap_ms {
107 if let Some(layout) = &state.window_layout {
108 if let Some((row, _)) =
109 self.apply_aggregations(txn, &window_key, layout, &state.events)?
110 {
111 result.push(Diff::Remove {
112 pre: Columns::from_row(&row),
113 });
114 }
115 }
116 keys_to_clear.push(window_key);
117 }
118 }
119
120 for key in &keys_to_clear {
121 let empty = self.create_state();
122 self.save_state(txn, key, empty)?;
123 }
124
125 Ok(result)
126 }
127}
128
129fn process_session_group_insert(
131 operator: &WindowOperator,
132 txn: &mut FlowTransaction,
133 columns: &Columns,
134 group_hash: Hash128,
135) -> Result<Vec<Diff>> {
136 let mut result = Vec::new();
137 let row_count = columns.row_count();
138 if row_count == 0 {
139 return Ok(result);
140 }
141
142 let gap_ms = operator.session_gap_ms();
143 let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
144
145 let (mut session_id, mut last_event_time) = operator.load_session_tracker(txn, group_hash)?;
146
147 for row_idx in 0..row_count {
148 let event_timestamp = timestamps[row_idx];
149
150 let gap_exceeded = last_event_time > 0 && (event_timestamp - last_event_time) > gap_ms;
152
153 if gap_exceeded {
154 let pre_window_key = operator.create_window_key(group_hash, session_id);
156 let pre_state = operator.load_window_state(txn, &pre_window_key)?;
157 if !pre_state.events.is_empty() {
158 if let Some(layout) = &pre_state.window_layout {
159 if let Some((pre_row, _)) = operator.apply_aggregations(
160 txn,
161 &pre_window_key,
162 layout,
163 &pre_state.events,
164 )? {
165 result.push(Diff::Remove {
166 pre: Columns::from_row(&pre_row),
167 });
168 }
169 }
170 }
171 session_id += 1;
172 }
173
174 let window_key = operator.create_window_key(group_hash, session_id);
175 let mut window_state = operator.load_window_state(txn, &window_key)?;
176
177 let single_row_columns = columns.extract_row(row_idx);
178 let projected = operator.project_columns(&single_row_columns);
179 let row = projected.to_single_row();
180
181 if window_state.window_layout.is_none() {
182 window_state.window_layout = Some(WindowLayout::from_row(&row));
183 }
184 let layout = window_state.layout().clone();
185
186 let previous_aggregation = if !window_state.events.is_empty() {
187 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
188 } else {
189 None
190 };
191
192 let event = WindowEvent::from_row(&row, event_timestamp);
193 let event_row_number = event.row_number;
194 window_state.events.push(event);
195 window_state.event_count += 1;
196 window_state.last_event_time = event_timestamp;
197
198 if window_state.window_start == 0 {
199 window_state.window_start = event_timestamp;
200 }
201
202 if let Some((aggregated_row, is_new)) =
203 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
204 {
205 result.push(WindowOperator::emit_aggregation_diff(
206 &aggregated_row,
207 is_new,
208 previous_aggregation,
209 ));
210 }
211
212 operator.save_window_state(txn, &window_key, &window_state)?;
213 operator.store_row_index(txn, group_hash, event_row_number, session_id)?;
214 last_event_time = event_timestamp;
215 }
216
217 operator.save_session_tracker(txn, group_hash, session_id, last_event_time)?;
218
219 Ok(result)
220}
221
222pub fn apply_session_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
224 let diffs = operator.apply_window_change(txn, &change, false, |op, txn, columns| {
225 op.process_insert(txn, columns, process_session_group_insert)
226 })?;
227 Ok(Change::from_flow(operator.node, change.version, diffs))
228}