Skip to main content

reifydb_sub_flow/operator/window/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::{from_bytes, to_stdvec};
5use reifydb_core::{
6	common::WindowKind,
7	encoded::key::EncodedKey,
8	interface::change::{Change, Diff},
9	internal,
10	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
11	util::encoding::keycode::serializer::KeySerializer,
12	value::column::columns::Columns,
13};
14use reifydb_runtime::hash::Hash128;
15use reifydb_type::{Result, error::Error, value::blob::Blob};
16
17use super::{WindowEvent, WindowLayout, WindowOperator};
18use crate::{operator::stateful::window::WindowStateful, transaction::FlowTransaction};
19
20impl WindowOperator {
21	/// Get the session gap duration in milliseconds (only valid for Session windows)
22	fn session_gap_ms(&self) -> u64 {
23		match &self.kind {
24			WindowKind::Session {
25				gap,
26			} => gap.as_millis() as u64,
27			_ => 0,
28		}
29	}
30
31	/// Create a session-tracking key that stores the current session_id for a group
32	fn create_session_tracker_key(&self, group_hash: Hash128) -> EncodedKey {
33		let mut serializer = KeySerializer::with_capacity(32);
34		serializer.extend_bytes(b"ses:");
35		serializer.extend_u128(group_hash);
36		EncodedKey::new(serializer.finish())
37	}
38
39	/// Load the current session_id for a group. Returns (session_id, last_event_time).
40	/// If no session exists yet, returns (0, 0).
41	fn load_session_tracker(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<(u64, u64)> {
42		let tracker_key = self.create_session_tracker_key(group_hash);
43		let state_row = self.load_state(txn, &tracker_key)?;
44
45		if state_row.is_empty() || !state_row.is_defined(0) {
46			return Ok((0, 0));
47		}
48
49		let blob = self.layout.get_blob(&state_row, 0);
50		if blob.is_empty() {
51			return Ok((0, 0));
52		}
53
54		let tracker: (u64, u64) = from_bytes(blob.as_ref()).unwrap_or((0, 0));
55		Ok(tracker)
56	}
57
58	/// Save the session tracker (session_id, last_event_time) for a group
59	fn save_session_tracker(
60		&self,
61		txn: &mut FlowTransaction,
62		group_hash: Hash128,
63		session_id: u64,
64		last_event_time: u64,
65	) -> Result<()> {
66		let tracker_key = self.create_session_tracker_key(group_hash);
67		let serialized = to_stdvec(&(session_id, last_event_time))
68			.map_err(|e| Error(internal!("Failed to serialize session tracker: {}", e)))?;
69		let mut state_row = self.layout.allocate();
70		let blob = Blob::from(serialized);
71		self.layout.set_blob(&mut state_row, 0, &blob);
72		self.save_state(txn, &tracker_key, state_row)
73	}
74
75	/// Tick-based session expiration.
76	/// Scans all operator state, finds "win:" keys with expired sessions.
77	pub fn tick_session_expiration(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
78		let mut result = Vec::new();
79		let gap_ms = self.session_gap_ms();
80		if gap_ms == 0 {
81			return Ok(result);
82		}
83
84		let all_state = txn.state_scan(self.node)?;
85		let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
86		let win_marker = b"win:";
87
88		let mut keys_to_clear = Vec::new();
89
90		for item in &all_state.items {
91			let full_key = &item.key;
92			if full_key.len() <= prefix.len() {
93				continue;
94			}
95			let inner = &full_key[prefix.len()..];
96			if !inner.starts_with(win_marker) {
97				continue;
98			}
99
100			let window_key = EncodedKey::new(inner);
101			let state = self.load_window_state(txn, &window_key)?;
102			if state.events.is_empty() || state.last_event_time == 0 {
103				continue;
104			}
105
106			if current_timestamp.saturating_sub(state.last_event_time) > gap_ms {
107				if let Some(layout) = &state.window_layout {
108					if let Some((row, _)) =
109						self.apply_aggregations(txn, &window_key, layout, &state.events)?
110					{
111						result.push(Diff::Remove {
112							pre: Columns::from_row(&row),
113						});
114					}
115				}
116				keys_to_clear.push(window_key);
117			}
118		}
119
120		for key in &keys_to_clear {
121			let empty = self.create_state();
122			self.save_state(txn, key, empty)?;
123		}
124
125		Ok(result)
126	}
127}
128
129/// Process inserts for a single group in session windows
130fn process_session_group_insert(
131	operator: &WindowOperator,
132	txn: &mut FlowTransaction,
133	columns: &Columns,
134	group_hash: Hash128,
135) -> Result<Vec<Diff>> {
136	let mut result = Vec::new();
137	let row_count = columns.row_count();
138	if row_count == 0 {
139		return Ok(result);
140	}
141
142	let gap_ms = operator.session_gap_ms();
143	let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
144
145	let (mut session_id, mut last_event_time) = operator.load_session_tracker(txn, group_hash)?;
146
147	for row_idx in 0..row_count {
148		let event_timestamp = timestamps[row_idx];
149
150		// Check if the gap has been exceeded → close old session, open new
151		let gap_exceeded = last_event_time > 0 && (event_timestamp - last_event_time) > gap_ms;
152
153		if gap_exceeded {
154			// Emit Remove for the old session before starting a new one
155			let pre_window_key = operator.create_window_key(group_hash, session_id);
156			let pre_state = operator.load_window_state(txn, &pre_window_key)?;
157			if !pre_state.events.is_empty() {
158				if let Some(layout) = &pre_state.window_layout {
159					if let Some((pre_row, _)) = operator.apply_aggregations(
160						txn,
161						&pre_window_key,
162						layout,
163						&pre_state.events,
164					)? {
165						result.push(Diff::Remove {
166							pre: Columns::from_row(&pre_row),
167						});
168					}
169				}
170			}
171			session_id += 1;
172		}
173
174		let window_key = operator.create_window_key(group_hash, session_id);
175		let mut window_state = operator.load_window_state(txn, &window_key)?;
176
177		let single_row_columns = columns.extract_row(row_idx);
178		let projected = operator.project_columns(&single_row_columns);
179		let row = projected.to_single_row();
180
181		if window_state.window_layout.is_none() {
182			window_state.window_layout = Some(WindowLayout::from_row(&row));
183		}
184		let layout = window_state.layout().clone();
185
186		let previous_aggregation = if !window_state.events.is_empty() {
187			operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
188		} else {
189			None
190		};
191
192		let event = WindowEvent::from_row(&row, event_timestamp);
193		let event_row_number = event.row_number;
194		window_state.events.push(event);
195		window_state.event_count += 1;
196		window_state.last_event_time = event_timestamp;
197
198		if window_state.window_start == 0 {
199			window_state.window_start = event_timestamp;
200		}
201
202		if let Some((aggregated_row, is_new)) =
203			operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
204		{
205			result.push(WindowOperator::emit_aggregation_diff(
206				&aggregated_row,
207				is_new,
208				previous_aggregation,
209			));
210		}
211
212		operator.save_window_state(txn, &window_key, &window_state)?;
213		operator.store_row_index(txn, group_hash, event_row_number, session_id)?;
214		last_event_time = event_timestamp;
215	}
216
217	operator.save_session_tracker(txn, group_hash, session_id, last_event_time)?;
218
219	Ok(result)
220}
221
222/// Apply changes for session windows (no time-based expiration — sessions close lazily)
223pub fn apply_session_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
224	let diffs = operator.apply_window_change(txn, &change, false, |op, txn, columns| {
225		op.process_insert(txn, columns, process_session_group_insert)
226	})?;
227	Ok(Change::from_flow(operator.node, change.version, diffs))
228}