Skip to main content

reifydb_sub_flow/operator/window/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use postcard::{from_bytes, to_stdvec};
5use reifydb_core::{
6	common::WindowKind,
7	encoded::key::EncodedKey,
8	interface::change::{Change, Diff},
9	internal,
10	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
11	util::encoding::keycode::serializer::KeySerializer,
12	value::column::columns::Columns,
13};
14use reifydb_runtime::hash::Hash128;
15use reifydb_type::{
16	Result,
17	error::Error,
18	value::{blob::Blob, datetime::DateTime},
19};
20
21use super::{WindowEvent, WindowLayout, WindowOperator};
22use crate::{operator::stateful::window::WindowStateful, transaction::FlowTransaction};
23
24impl WindowOperator {
25	/// Get the session gap duration in milliseconds (only valid for Session windows)
26	fn session_gap_ms(&self) -> u64 {
27		match &self.kind {
28			WindowKind::Session {
29				gap,
30			} => gap.as_millis() as u64,
31			_ => 0,
32		}
33	}
34
35	/// Create a session-tracking key that stores the current session_id for a group
36	fn create_session_tracker_key(&self, group_hash: Hash128) -> EncodedKey {
37		let mut serializer = KeySerializer::with_capacity(32);
38		serializer.extend_bytes(b"ses:");
39		serializer.extend_u128(group_hash);
40		EncodedKey::new(serializer.finish())
41	}
42
43	/// Load the current session_id for a group. Returns (session_id, last_event_time).
44	/// If no session exists yet, returns (0, 0).
45	fn load_session_tracker(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<(u64, u64)> {
46		let tracker_key = self.create_session_tracker_key(group_hash);
47		let state_row = self.load_state(txn, &tracker_key)?;
48
49		if state_row.is_empty() || !state_row.is_defined(0) {
50			return Ok((0, 0));
51		}
52
53		let blob = self.layout.get_blob(&state_row, 0);
54		if blob.is_empty() {
55			return Ok((0, 0));
56		}
57
58		let tracker: (u64, u64) = from_bytes(blob.as_ref()).unwrap_or((0, 0));
59		Ok(tracker)
60	}
61
62	/// Save the session tracker (session_id, last_event_time) for a group
63	fn save_session_tracker(
64		&self,
65		txn: &mut FlowTransaction,
66		group_hash: Hash128,
67		session_id: u64,
68		last_event_time: u64,
69	) -> Result<()> {
70		let tracker_key = self.create_session_tracker_key(group_hash);
71		let serialized = to_stdvec(&(session_id, last_event_time))
72			.map_err(|e| Error(Box::new(internal!("Failed to serialize session tracker: {}", e))))?;
73		let mut state_row = self.layout.allocate();
74		let blob = Blob::from(serialized);
75		self.layout.set_blob(&mut state_row, 0, &blob);
76		self.save_state(txn, &tracker_key, state_row)
77	}
78
79	/// Tick-based session expiration.
80	/// Scans all operator state, finds "win:" keys with expired sessions.
81	pub fn tick_session_expiration(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
82		let mut result = Vec::new();
83		let gap_ms = self.session_gap_ms();
84		if gap_ms == 0 {
85			return Ok(result);
86		}
87
88		let all_state = txn.state_scan(self.node)?;
89		let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
90		let win_marker = b"win:";
91
92		let mut keys_to_clear = Vec::new();
93
94		for item in &all_state.items {
95			let full_key = &item.key;
96			if full_key.len() <= prefix.len() {
97				continue;
98			}
99			let inner = &full_key[prefix.len()..];
100			if !inner.starts_with(win_marker) {
101				continue;
102			}
103
104			let window_key = EncodedKey::new(inner);
105			let state = self.load_window_state(txn, &window_key)?;
106			if state.events.is_empty() || state.last_event_time == 0 {
107				continue;
108			}
109
110			if current_timestamp.saturating_sub(state.last_event_time) > gap_ms {
111				let changed_at = DateTime::from_nanos(current_timestamp);
112				if let Some(layout) = &state.window_layout
113					&& let Some((row, _)) = self.apply_aggregations(
114						txn,
115						&window_key,
116						layout,
117						&state.events,
118						changed_at,
119					)? {
120					result.push(Diff::remove(Columns::from_row(&row)));
121				}
122				keys_to_clear.push(window_key);
123			}
124		}
125
126		for key in &keys_to_clear {
127			let empty = self.create_state();
128			self.save_state(txn, key, empty)?;
129		}
130
131		Ok(result)
132	}
133}
134
135fn process_session_group_insert(
136	operator: &WindowOperator,
137	txn: &mut FlowTransaction,
138	columns: &Columns,
139	group_hash: Hash128,
140	changed_at: DateTime,
141) -> Result<Vec<Diff>> {
142	let mut result = Vec::new();
143	let row_count = columns.row_count();
144	if row_count == 0 {
145		return Ok(result);
146	}
147
148	let gap_ms = operator.session_gap_ms();
149	let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
150	let (mut session_id, mut last_event_time) = operator.load_session_tracker(txn, group_hash)?;
151
152	for (row_idx, &event_timestamp) in timestamps.iter().enumerate() {
153		let gap_exceeded = last_event_time > 0 && (event_timestamp - last_event_time) > gap_ms;
154		if gap_exceeded {
155			if let Some(diff) = close_session(operator, txn, group_hash, session_id, changed_at)? {
156				result.push(diff);
157			}
158			session_id += 1;
159		}
160
161		if let Some(diff) = append_event_to_session(
162			operator,
163			txn,
164			columns,
165			row_idx,
166			group_hash,
167			session_id,
168			event_timestamp,
169			changed_at,
170		)? {
171			result.push(diff);
172		}
173		last_event_time = event_timestamp;
174	}
175
176	operator.save_session_tracker(txn, group_hash, session_id, last_event_time)?;
177	Ok(result)
178}
179
180#[inline]
181fn close_session(
182	operator: &WindowOperator,
183	txn: &mut FlowTransaction,
184	group_hash: Hash128,
185	session_id: u64,
186	changed_at: DateTime,
187) -> Result<Option<Diff>> {
188	let pre_window_key = operator.create_window_key(group_hash, session_id);
189	let pre_state = operator.load_window_state(txn, &pre_window_key)?;
190	if pre_state.events.is_empty() {
191		return Ok(None);
192	}
193	let Some(layout) = &pre_state.window_layout else {
194		return Ok(None);
195	};
196	let Some((pre_row, _)) =
197		operator.apply_aggregations(txn, &pre_window_key, layout, &pre_state.events, changed_at)?
198	else {
199		return Ok(None);
200	};
201	Ok(Some(Diff::remove(Columns::from_row(&pre_row))))
202}
203
204#[inline]
205#[allow(clippy::too_many_arguments)]
206fn append_event_to_session(
207	operator: &WindowOperator,
208	txn: &mut FlowTransaction,
209	columns: &Columns,
210	row_idx: usize,
211	group_hash: Hash128,
212	session_id: u64,
213	event_timestamp: u64,
214	changed_at: DateTime,
215) -> Result<Option<Diff>> {
216	let window_key = operator.create_window_key(group_hash, session_id);
217	let mut window_state = operator.load_window_state(txn, &window_key)?;
218
219	let single_row_columns = columns.extract_row(row_idx);
220	let projected = operator.project_columns(&single_row_columns);
221	let row = projected.to_single_row();
222
223	if window_state.window_layout.is_none() {
224		window_state.window_layout = Some(WindowLayout::from_row(&row));
225	}
226	let layout = window_state.layout().clone();
227
228	let previous_aggregation = if !window_state.events.is_empty() {
229		operator.apply_aggregations(txn, &window_key, &layout, &window_state.events, changed_at)?
230	} else {
231		None
232	};
233
234	let event = WindowEvent::from_row(&row, event_timestamp);
235	let event_row_number = event.row_number;
236	window_state.events.push(event);
237	window_state.event_count += 1;
238	window_state.last_event_time = event_timestamp;
239	if window_state.window_start == 0 {
240		window_state.window_start = event_timestamp;
241	}
242
243	let diff = if let Some((aggregated_row, is_new)) =
244		operator.apply_aggregations(txn, &window_key, &layout, &window_state.events, changed_at)?
245	{
246		Some(WindowOperator::emit_aggregation_diff(&aggregated_row, is_new, previous_aggregation))
247	} else {
248		None
249	};
250
251	operator.save_window_state(txn, &window_key, &window_state)?;
252	operator.store_row_index(txn, group_hash, event_row_number, session_id)?;
253	Ok(diff)
254}
255
256/// Apply changes for session windows (no time-based expiration - sessions close lazily)
257pub fn apply_session_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
258	let changed_at = change.changed_at;
259	let diffs = operator.apply_window_change(txn, &change, false, |op, txn, columns| {
260		op.process_insert(txn, columns, changed_at, process_session_group_insert)
261	})?;
262	Ok(Change::from_flow(operator.node, change.version, diffs, change.changed_at))
263}