reifydb_sub_flow/operator/window/
session.rs1use postcard::{from_bytes, to_stdvec};
5use reifydb_core::{
6 common::WindowKind,
7 encoded::key::EncodedKey,
8 interface::change::{Change, Diff},
9 internal,
10 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
11 util::encoding::keycode::serializer::KeySerializer,
12 value::column::columns::Columns,
13};
14use reifydb_runtime::hash::Hash128;
15use reifydb_type::{
16 Result,
17 error::Error,
18 value::{blob::Blob, datetime::DateTime},
19};
20
21use super::{WindowEvent, WindowLayout, WindowOperator};
22use crate::{operator::stateful::window::WindowStateful, transaction::FlowTransaction};
23
24impl WindowOperator {
25 fn session_gap_ms(&self) -> u64 {
27 match &self.kind {
28 WindowKind::Session {
29 gap,
30 } => gap.as_millis() as u64,
31 _ => 0,
32 }
33 }
34
35 fn create_session_tracker_key(&self, group_hash: Hash128) -> EncodedKey {
37 let mut serializer = KeySerializer::with_capacity(32);
38 serializer.extend_bytes(b"ses:");
39 serializer.extend_u128(group_hash);
40 EncodedKey::new(serializer.finish())
41 }
42
43 fn load_session_tracker(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<(u64, u64)> {
46 let tracker_key = self.create_session_tracker_key(group_hash);
47 let state_row = self.load_state(txn, &tracker_key)?;
48
49 if state_row.is_empty() || !state_row.is_defined(0) {
50 return Ok((0, 0));
51 }
52
53 let blob = self.layout.get_blob(&state_row, 0);
54 if blob.is_empty() {
55 return Ok((0, 0));
56 }
57
58 let tracker: (u64, u64) = from_bytes(blob.as_ref()).unwrap_or((0, 0));
59 Ok(tracker)
60 }
61
62 fn save_session_tracker(
64 &self,
65 txn: &mut FlowTransaction,
66 group_hash: Hash128,
67 session_id: u64,
68 last_event_time: u64,
69 ) -> Result<()> {
70 let tracker_key = self.create_session_tracker_key(group_hash);
71 let serialized = to_stdvec(&(session_id, last_event_time))
72 .map_err(|e| Error(Box::new(internal!("Failed to serialize session tracker: {}", e))))?;
73 let mut state_row = self.layout.allocate();
74 let blob = Blob::from(serialized);
75 self.layout.set_blob(&mut state_row, 0, &blob);
76 self.save_state(txn, &tracker_key, state_row)
77 }
78
79 pub fn tick_session_expiration(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
82 let mut result = Vec::new();
83 let gap_ms = self.session_gap_ms();
84 if gap_ms == 0 {
85 return Ok(result);
86 }
87
88 let all_state = txn.state_scan(self.node)?;
89 let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
90 let win_marker = b"win:";
91
92 let mut keys_to_clear = Vec::new();
93
94 for item in &all_state.items {
95 let full_key = &item.key;
96 if full_key.len() <= prefix.len() {
97 continue;
98 }
99 let inner = &full_key[prefix.len()..];
100 if !inner.starts_with(win_marker) {
101 continue;
102 }
103
104 let window_key = EncodedKey::new(inner);
105 let state = self.load_window_state(txn, &window_key)?;
106 if state.events.is_empty() || state.last_event_time == 0 {
107 continue;
108 }
109
110 if current_timestamp.saturating_sub(state.last_event_time) > gap_ms {
111 let changed_at = DateTime::from_nanos(current_timestamp);
112 if let Some(layout) = &state.window_layout
113 && let Some((row, _)) = self.apply_aggregations(
114 txn,
115 &window_key,
116 layout,
117 &state.events,
118 changed_at,
119 )? {
120 result.push(Diff::remove(Columns::from_row(&row)));
121 }
122 keys_to_clear.push(window_key);
123 }
124 }
125
126 for key in &keys_to_clear {
127 let empty = self.create_state();
128 self.save_state(txn, key, empty)?;
129 }
130
131 Ok(result)
132 }
133}
134
135fn process_session_group_insert(
136 operator: &WindowOperator,
137 txn: &mut FlowTransaction,
138 columns: &Columns,
139 group_hash: Hash128,
140 changed_at: DateTime,
141) -> Result<Vec<Diff>> {
142 let mut result = Vec::new();
143 let row_count = columns.row_count();
144 if row_count == 0 {
145 return Ok(result);
146 }
147
148 let gap_ms = operator.session_gap_ms();
149 let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
150 let (mut session_id, mut last_event_time) = operator.load_session_tracker(txn, group_hash)?;
151
152 for (row_idx, &event_timestamp) in timestamps.iter().enumerate() {
153 let gap_exceeded = last_event_time > 0 && (event_timestamp - last_event_time) > gap_ms;
154 if gap_exceeded {
155 if let Some(diff) = close_session(operator, txn, group_hash, session_id, changed_at)? {
156 result.push(diff);
157 }
158 session_id += 1;
159 }
160
161 if let Some(diff) = append_event_to_session(
162 operator,
163 txn,
164 columns,
165 row_idx,
166 group_hash,
167 session_id,
168 event_timestamp,
169 changed_at,
170 )? {
171 result.push(diff);
172 }
173 last_event_time = event_timestamp;
174 }
175
176 operator.save_session_tracker(txn, group_hash, session_id, last_event_time)?;
177 Ok(result)
178}
179
180#[inline]
181fn close_session(
182 operator: &WindowOperator,
183 txn: &mut FlowTransaction,
184 group_hash: Hash128,
185 session_id: u64,
186 changed_at: DateTime,
187) -> Result<Option<Diff>> {
188 let pre_window_key = operator.create_window_key(group_hash, session_id);
189 let pre_state = operator.load_window_state(txn, &pre_window_key)?;
190 if pre_state.events.is_empty() {
191 return Ok(None);
192 }
193 let Some(layout) = &pre_state.window_layout else {
194 return Ok(None);
195 };
196 let Some((pre_row, _)) =
197 operator.apply_aggregations(txn, &pre_window_key, layout, &pre_state.events, changed_at)?
198 else {
199 return Ok(None);
200 };
201 Ok(Some(Diff::remove(Columns::from_row(&pre_row))))
202}
203
204#[inline]
205#[allow(clippy::too_many_arguments)]
206fn append_event_to_session(
207 operator: &WindowOperator,
208 txn: &mut FlowTransaction,
209 columns: &Columns,
210 row_idx: usize,
211 group_hash: Hash128,
212 session_id: u64,
213 event_timestamp: u64,
214 changed_at: DateTime,
215) -> Result<Option<Diff>> {
216 let window_key = operator.create_window_key(group_hash, session_id);
217 let mut window_state = operator.load_window_state(txn, &window_key)?;
218
219 let single_row_columns = columns.extract_row(row_idx);
220 let projected = operator.project_columns(&single_row_columns);
221 let row = projected.to_single_row();
222
223 if window_state.window_layout.is_none() {
224 window_state.window_layout = Some(WindowLayout::from_row(&row));
225 }
226 let layout = window_state.layout().clone();
227
228 let previous_aggregation = if !window_state.events.is_empty() {
229 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events, changed_at)?
230 } else {
231 None
232 };
233
234 let event = WindowEvent::from_row(&row, event_timestamp);
235 let event_row_number = event.row_number;
236 window_state.events.push(event);
237 window_state.event_count += 1;
238 window_state.last_event_time = event_timestamp;
239 if window_state.window_start == 0 {
240 window_state.window_start = event_timestamp;
241 }
242
243 let diff = if let Some((aggregated_row, is_new)) =
244 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events, changed_at)?
245 {
246 Some(WindowOperator::emit_aggregation_diff(&aggregated_row, is_new, previous_aggregation))
247 } else {
248 None
249 };
250
251 operator.save_window_state(txn, &window_key, &window_state)?;
252 operator.store_row_index(txn, group_hash, event_row_number, session_id)?;
253 Ok(diff)
254}
255
256pub fn apply_session_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
258 let changed_at = change.changed_at;
259 let diffs = operator.apply_window_change(txn, &change, false, |op, txn, columns| {
260 op.process_insert(txn, columns, changed_at, process_session_group_insert)
261 })?;
262 Ok(Change::from_flow(operator.node, change.version, diffs, change.changed_at))
263}