Skip to main content

reifydb_sub_flow/operator/window/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::{CommitVersion, WindowKind, WindowSize},
8	error::diagnostic::flow::{flow_window_timestamp_column_not_found, flow_window_timestamp_column_type_mismatch},
9	interface::catalog::flow::FlowNodeId,
10	internal,
11	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use serde::{Deserialize, Serialize};
14
15use crate::{
16	operator::{Operator, Operators},
17	transaction::FlowTransaction,
18};
19
20pub mod rolling;
21pub mod session;
22pub mod sliding;
23pub mod tumbling;
24
25use rolling::apply_rolling_window;
26use session::apply_session_window;
27use sliding::apply_sliding_window;
28use tumbling::apply_tumbling_window;
29
/// Empty parameter set referenced by every `EvalContext` built in `eval_session`,
/// so window expressions are evaluated without bound query parameters.
static EMPTY_PARAMS: Params = Params::None;
31
32use std::{ops, sync::LazyLock, time::Duration};
33
34use postcard::{from_bytes, to_stdvec};
35use reifydb_core::{
36	encoded::{
37		key::{EncodedKey, EncodedKeyRange},
38		row::EncodedRow,
39		shape::{RowShape, RowShapeField},
40	},
41	interface::change::{Change, Diff},
42	row::Row,
43	util::encoding::keycode::serializer::KeySerializer,
44	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
45};
46use reifydb_engine::{
47	expression::{
48		compile::{CompiledExpr, compile_expression},
49		context::{CompileContext, EvalContext},
50	},
51	vm::stack::SymbolTable,
52};
53use reifydb_routine::routine::registry::Routines;
54use reifydb_rql::expression::{
55	Expression,
56	name::{collect_all_column_names, display_label},
57};
58use reifydb_runtime::{
59	context::RuntimeContext,
60	hash::{Hash128, xxh3_128},
61};
62use reifydb_type::{
63	Result,
64	error::Error,
65	fragment::Fragment,
66	params::Params,
67	util::cowvec::CowVec,
68	value::{Value, blob::Blob, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
69};
70
71use crate::operator::stateful::{raw::RawStatefulOperator, row::RowNumberProvider, window::WindowStateful};
72
73#[inline]
74fn build_aggregation_shape(names: &[String], types: &[Type]) -> RowShape {
75	let fields: Vec<RowShapeField> = names
76		.iter()
77		.zip(types.iter())
78		.map(|(name, ty)| RowShapeField::unconstrained(name.clone(), ty.clone()))
79		.collect();
80	RowShape::new(fields)
81}
82
/// Lazily created, empty symbol table referenced by every `EvalContext` built in
/// `eval_session`; expressions evaluated through it see no VM symbols.
static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
84
85/// RowShape layout shared across all events in a window (stored once, not per event)
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct WindowLayout {
88	pub names: Vec<String>,
89	pub types: Vec<Type>,
90}
91
92impl WindowLayout {
93	pub fn from_row(row: &Row) -> Self {
94		Self {
95			names: row.shape.field_names().map(|s| s.to_string()).collect(),
96			types: row.shape.fields().iter().map(|f| f.constraint.get_type()).collect(),
97		}
98	}
99
100	pub fn to_shape(&self) -> RowShape {
101		let fields: Vec<RowShapeField> = self
102			.names
103			.iter()
104			.zip(self.types.iter())
105			.map(|(name, ty)| RowShapeField::unconstrained(name.clone(), ty.clone()))
106			.collect();
107		RowShape::new(fields)
108	}
109}
110
111/// A single event stored within a window
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct WindowEvent {
114	pub row_number: RowNumber,
115	pub timestamp: u64,
116	#[serde(with = "serde_bytes")]
117	pub encoded_bytes: Vec<u8>,
118}
119
120impl WindowEvent {
121	pub fn from_row(row: &Row, timestamp: u64) -> Self {
122		Self {
123			row_number: row.number,
124			timestamp,
125			encoded_bytes: row.encoded.as_slice().to_vec(),
126		}
127	}
128
129	pub fn to_row(&self, layout: &WindowLayout) -> Row {
130		let shape = layout.to_shape();
131		let encoded = EncodedRow(CowVec::new(self.encoded_bytes.clone()));
132		Row {
133			number: self.row_number,
134			encoded,
135			shape,
136		}
137	}
138}
139
140/// State for a single window
141#[derive(Debug, Clone, Serialize, Deserialize, Default)]
142pub struct WindowState {
143	/// All events in this window (stored in insertion order)
144	pub events: Vec<WindowEvent>,
145	/// RowShape layout shared by all events (set on first event)
146	pub window_layout: Option<WindowLayout>,
147	/// Window creation timestamp
148	pub window_start: u64,
149	/// Count of events in window (for count-based windows)
150	pub event_count: u64,
151	/// Timestamp of last event (for session windows)
152	pub last_event_time: u64,
153}
154
155impl WindowState {
156	/// Get the layout, panics if not set (should always be set after first event)
157	pub fn layout(&self) -> &WindowLayout {
158		self.window_layout.as_ref().expect("WindowState layout must be set before accessing")
159	}
160}
161
/// Configuration for constructing a WindowOperator.
pub struct WindowConfig {
	/// Parent operator, stored unchanged on the built operator.
	pub parent: Arc<Operators>,
	/// This operator's flow node id; also used to create its `RowNumberProvider`.
	pub node: FlowNodeId,
	/// Window kind; its size decides between time-based and count-based behavior.
	pub kind: WindowKind,
	/// Expressions whose per-row values select the window group of each row.
	pub group_by: Vec<Expression>,
	/// Aggregate expressions evaluated over a window's events. When empty, the
	/// window's first event row is emitted instead of an aggregate row.
	pub aggregations: Vec<Expression>,
	/// Name of the DateTime column supplying event time; `None` means processing
	/// time (the runtime clock) is used.
	pub ts: Option<String>,
	/// Runtime services; its clock supplies processing time.
	pub runtime_context: RuntimeContext,
	/// Routine registry handed to expression evaluation.
	pub routines: Routines,
}
173
/// The main window operator: assigns incoming rows to per-group windows, keeps
/// each window's events in operator state, and emits window output rows as diffs.
pub struct WindowOperator {
	/// Parent operator.
	pub parent: Arc<Operators>,
	/// Flow node id of this operator (scopes its state keys).
	pub node: FlowNodeId,
	/// Window kind, including its size (time- or count-based).
	pub kind: WindowKind,
	/// Group-by expressions in source form (used for output column labels).
	pub group_by: Vec<Expression>,
	/// Aggregation expressions in source form (used for output column labels).
	pub aggregations: Vec<Expression>,
	/// Optional DateTime column supplying event time; `None` uses processing time.
	pub ts: Option<String>,
	/// Compiled `group_by`, index-aligned with it.
	pub compiled_group_by: Vec<CompiledExpr>,
	/// Compiled `aggregations`, index-aligned with it.
	pub compiled_aggregations: Vec<CompiledExpr>,
	/// Single-blob shape used to persist every state entry (window state, row
	/// index, counts, group registry).
	pub layout: RowShape,
	/// Routine registry handed to expression evaluation.
	pub routines: Routines,
	/// Hands out the output row number associated with each window key.
	pub row_number_provider: RowNumberProvider,
	/// Runtime services; its clock supplies processing time.
	pub runtime_context: RuntimeContext,
	/// Column names needed by group_by + aggregations expressions.
	/// When empty, no projection is applied (all columns stored).
	pub projected_columns: Vec<String>,
}
192
193impl WindowOperator {
194	pub fn new(config: WindowConfig) -> Self {
195		let symbols = SymbolTable::new();
196		let compile_ctx = CompileContext {
197			symbols: &symbols,
198		};
199
200		// Compile group_by expressions
201		let compiled_group_by: Vec<CompiledExpr> = config
202			.group_by
203			.iter()
204			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile group_by expression"))
205			.collect();
206
207		// Compile aggregation expressions
208		let compiled_aggregations: Vec<CompiledExpr> = config
209			.aggregations
210			.iter()
211			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile aggregation expression"))
212			.collect();
213
214		let mut needed = collect_all_column_names(&config.group_by);
215		needed.extend(collect_all_column_names(&config.aggregations));
216		let mut projected_columns: Vec<String> = needed.into_iter().collect();
217		projected_columns.sort();
218
219		Self {
220			parent: config.parent,
221			node: config.node,
222			kind: config.kind,
223			group_by: config.group_by,
224			aggregations: config.aggregations,
225			ts: config.ts,
226			compiled_group_by,
227			compiled_aggregations,
228			layout: RowShape::testing(&[Type::Blob]),
229			routines: config.routines,
230			row_number_provider: RowNumberProvider::new(config.node),
231			runtime_context: config.runtime_context,
232			projected_columns,
233		}
234	}
235
/// Current processing time in milliseconds, read from the runtime clock
/// (`now_millis`).
pub fn current_timestamp(&self) -> u64 {
	self.runtime_context.clock.now_millis()
}
240
241	/// Project a single-row Columns down to only the columns needed by window expressions.
242	pub fn project_columns(&self, columns: &Columns) -> Columns {
243		if self.projected_columns.is_empty() {
244			return columns.clone();
245		}
246		columns.project_by_names(&self.projected_columns)
247	}
248
249	/// Whether this is a count-based window
250	pub fn is_count_based(&self) -> bool {
251		self.kind.size().is_some_and(|m| m.is_count())
252	}
253
254	/// Get the window size as duration (if time-based)
255	pub fn size_duration(&self) -> Option<Duration> {
256		self.kind.size().and_then(|m| m.as_duration())
257	}
258
259	/// Get the window size as count (if count-based)
260	pub fn size_count(&self) -> Option<u64> {
261		self.kind.size().and_then(|m| m.as_count())
262	}
263
/// Build the base evaluation context for window expressions.
///
/// Uses the empty parameter set and symbol table, this operator's routines and
/// runtime context, the root identity, and an empty column set with
/// `row_count: 1`. Callers attach the real columns via `with_eval`.
/// `is_aggregate` is forwarded as `is_aggregate_context`.
fn eval_session(&self, is_aggregate: bool) -> EvalContext<'_> {
	EvalContext {
		params: &EMPTY_PARAMS,
		symbols: &EMPTY_SYMBOL_TABLE,
		routines: &self.routines,
		runtime_context: &self.runtime_context,
		arena: None,
		identity: IdentityId::root(),
		is_aggregate_context: is_aggregate,
		columns: Columns::empty(),
		row_count: 1,
		target: None,
		take: None,
	}
}
279
280	/// Compute group keys for all rows in Columns
281	pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>> {
282		let row_count = columns.row_count();
283		if row_count == 0 {
284			return Ok(Vec::new());
285		}
286
287		if self.compiled_group_by.is_empty() {
288			return Ok(vec![Hash128::from(0u128); row_count]);
289		}
290
291		let session = self.eval_session(false);
292		let exec_ctx = session.with_eval(columns.clone(), row_count);
293
294		let mut group_columns: Vec<ColumnWithName> = Vec::new();
295		for compiled_expr in &self.compiled_group_by {
296			let col = compiled_expr.execute(&exec_ctx)?;
297			group_columns.push(col);
298		}
299
300		let mut hashes = Vec::with_capacity(row_count);
301		for row_idx in 0..row_count {
302			let mut data = Vec::new();
303			for col in &group_columns {
304				let value = col.data().get_value(row_idx);
305				let value_str = value.to_string();
306				data.extend_from_slice(value_str.as_bytes());
307			}
308			hashes.push(xxh3_128(&data));
309		}
310
311		Ok(hashes)
312	}
313
314	/// Resolve event timestamps for all rows.
315	/// When `ts` is configured, reads from the named DateTime column.
316	/// Otherwise falls back to processing time (current clock).
317	pub fn resolve_event_timestamps(&self, columns: &Columns, row_count: usize) -> Result<Vec<u64>> {
318		if row_count == 0 {
319			return Ok(Vec::new());
320		}
321		match &self.ts {
322			Some(ts_col) => {
323				let col = columns.column(ts_col).ok_or_else(|| {
324					Error(Box::new(flow_window_timestamp_column_not_found(ts_col)))
325				})?;
326				let mut timestamps = Vec::with_capacity(row_count);
327				for i in 0..row_count {
328					match col.data().get_value(i) {
329						Value::DateTime(dt) => timestamps.push(dt.timestamp_millis() as u64),
330						other => {
331							return Err(Error(Box::new(
332								flow_window_timestamp_column_type_mismatch(
333									ts_col,
334									other.get_type(),
335								),
336							)));
337						}
338					}
339				}
340				Ok(timestamps)
341			}
342			None => {
343				let now = self.current_timestamp();
344				Ok(vec![now; row_count])
345			}
346		}
347	}
348
/// Create a window key for storage.
///
/// Layout (keycode-serialized): `"win:"` ++ `group_hash` (u128) ++ `window_id`
/// (u64). The `"win:"` marker is what `scan_window_keys` filters on. Keycode
/// integers use inverted ordering (NOT of big-endian), so larger window ids
/// produce smaller keys. Do not change this layout: persisted state is
/// addressed by it.
pub fn create_window_key(&self, group_hash: Hash128, window_id: u64) -> EncodedKey {
	let mut serializer = KeySerializer::with_capacity(32);
	serializer.extend_bytes(b"win:");
	serializer.extend_u128(group_hash);
	serializer.extend_u64(window_id);
	EncodedKey::new(serializer.finish())
}
357
/// Create a row index key for mapping row_number → window_id.
///
/// Layout (keycode-serialized): `"idx:"` ++ `group_hash` (u128) ++ the row
/// number's raw `u64`. The value stored under it is the event's window-id list.
fn create_row_index_key(&self, group_hash: Hash128, row_number: RowNumber) -> EncodedKey {
	let mut serializer = KeySerializer::with_capacity(32);
	serializer.extend_bytes(b"idx:");
	serializer.extend_u128(group_hash);
	serializer.extend_u64(row_number.0);
	EncodedKey::new(serializer.finish())
}
366
367	/// Store a row_number → window_ids mapping.
368	/// Appends window_id to the existing list (supports sliding windows with multiple windows per event).
369	pub fn store_row_index(
370		&self,
371		txn: &mut FlowTransaction,
372		group_hash: Hash128,
373		row_number: RowNumber,
374		window_id: u64,
375	) -> Result<()> {
376		let index_key = self.create_row_index_key(group_hash, row_number);
377		let mut window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
378		if !window_ids.contains(&window_id) {
379			window_ids.push(window_id);
380		}
381		let serialized = to_stdvec(&window_ids)
382			.map_err(|e| Error(Box::new(internal!("Failed to serialize row index: {}", e))))?;
383		let mut state_row = self.layout.allocate();
384		let blob = Blob::from(serialized);
385		self.layout.set_blob(&mut state_row, 0, &blob);
386		self.save_state(txn, &index_key, state_row)
387	}
388
389	/// Look up all window_ids for a given row_number
390	fn lookup_row_index(
391		&self,
392		txn: &mut FlowTransaction,
393		group_hash: Hash128,
394		row_number: RowNumber,
395	) -> Result<Vec<u64>> {
396		let index_key = self.create_row_index_key(group_hash, row_number);
397		let state_row = self.load_state(txn, &index_key)?;
398		if state_row.is_empty() || !state_row.is_defined(0) {
399			return Ok(Vec::new());
400		}
401		let blob = self.layout.get_blob(&state_row, 0);
402		if blob.is_empty() {
403			return Ok(Vec::new());
404		}
405		let window_ids: Vec<u64> = from_bytes(blob.as_ref())
406			.map_err(|e| Error(Box::new(internal!("Failed to deserialize row index: {}", e))))?;
407		Ok(window_ids)
408	}
409
410	/// Replace an event across all its windows in-place (for UPDATE handling).
411	/// For sliding windows, an event may exist in multiple windows - all are updated.
412	fn replace_event_in_windows(
413		&self,
414		txn: &mut FlowTransaction,
415		group_hash: Hash128,
416		row_number: RowNumber,
417		post_row: &Row,
418		post_timestamp: u64,
419	) -> Result<Vec<Diff>> {
420		let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
421		if window_ids.is_empty() {
422			return Ok(Vec::new());
423		}
424
425		let mut result = Vec::new();
426
427		for window_id in &window_ids {
428			let window_key = self.create_window_key(group_hash, *window_id);
429			let mut window_state = self.load_window_state(txn, &window_key)?;
430
431			let event_idx = window_state.events.iter().position(|e| e.row_number == row_number);
432			if let Some(idx) = event_idx {
433				let layout = match &window_state.window_layout {
434					Some(l) => l.clone(),
435					None => continue,
436				};
437
438				let changed_at = DateTime::from_nanos(post_timestamp);
439				let pre_aggregation = self.apply_aggregations(
440					txn,
441					&window_key,
442					&layout,
443					&window_state.events,
444					changed_at,
445				)?;
446
447				window_state.events[idx] = WindowEvent::from_row(post_row, post_timestamp);
448
449				let post_aggregation = self.apply_aggregations(
450					txn,
451					&window_key,
452					&layout,
453					&window_state.events,
454					changed_at,
455				)?;
456
457				self.save_window_state(txn, &window_key, &window_state)?;
458
459				if let (Some((pre_row, _)), Some((post_row, _))) = (pre_aggregation, post_aggregation) {
460					result.push(Diff::update(
461						Columns::from_row(&pre_row),
462						Columns::from_row(&post_row),
463					));
464				}
465			}
466		}
467
468		Ok(result)
469	}
470
471	/// Process Update diffs by replacing events in-place within their windows.
472	fn process_event_updates(&self, txn: &mut FlowTransaction, pre: &Columns, post: &Columns) -> Result<Vec<Diff>> {
473		let row_count = pre.row_count();
474		if row_count == 0 {
475			return Ok(Vec::new());
476		}
477
478		let group_hashes = self.compute_group_keys(pre)?;
479		let post_timestamps = self.resolve_event_timestamps(post, row_count)?;
480		let mut result = Vec::new();
481
482		for row_idx in 0..row_count {
483			let row_number = pre.row_numbers[row_idx];
484			let group_hash = group_hashes[row_idx];
485			let post_timestamp = post_timestamps[row_idx];
486
487			let single_row = post.extract_row(row_idx);
488			let projected = self.project_columns(&single_row);
489			let post_row = projected.to_single_row();
490
491			let diffs =
492				self.replace_event_in_windows(txn, group_hash, row_number, &post_row, post_timestamp)?;
493			result.extend(diffs);
494		}
495
496		Ok(result)
497	}
498
499	fn remove_event_from_windows(
500		&self,
501		txn: &mut FlowTransaction,
502		group_hash: Hash128,
503		row_number: RowNumber,
504	) -> Result<Vec<Diff>> {
505		let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
506		if window_ids.is_empty() {
507			return Ok(Vec::new());
508		}
509
510		let mut result = Vec::new();
511		for window_id in &window_ids {
512			let window_key = self.create_window_key(group_hash, *window_id);
513			if let Some(diff) = self.remove_event_from_one_window(txn, &window_key, row_number)? {
514				result.push(diff);
515			}
516		}
517
518		let index_key = self.create_row_index_key(group_hash, row_number);
519		let empty = self.layout.allocate();
520		self.save_state(txn, &index_key, empty)?;
521
522		Ok(result)
523	}
524
525	#[inline]
526	fn remove_event_from_one_window(
527		&self,
528		txn: &mut FlowTransaction,
529		window_key: &EncodedKey,
530		row_number: RowNumber,
531	) -> Result<Option<Diff>> {
532		let mut window_state = self.load_window_state(txn, window_key)?;
533		let Some(idx) = window_state.events.iter().position(|e| e.row_number == row_number) else {
534			return Ok(None);
535		};
536		let Some(layout) = window_state.window_layout.clone() else {
537			return Ok(None);
538		};
539
540		let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
541		let pre_aggregation =
542			self.apply_aggregations(txn, window_key, &layout, &window_state.events, changed_at)?;
543
544		window_state.events.remove(idx);
545		window_state.event_count = window_state.event_count.saturating_sub(1);
546
547		if window_state.events.is_empty() {
548			self.save_window_state(txn, window_key, &window_state)?;
549			return Ok(pre_aggregation.map(|(pre_row, _)| Diff::remove(Columns::from_row(&pre_row))));
550		}
551
552		let post_aggregation =
553			self.apply_aggregations(txn, window_key, &layout, &window_state.events, changed_at)?;
554		self.save_window_state(txn, window_key, &window_state)?;
555
556		Ok(match (pre_aggregation, post_aggregation) {
557			(Some((pre_row, _)), Some((post_row, _))) => {
558				Some(Diff::update(Columns::from_row(&pre_row), Columns::from_row(&post_row)))
559			}
560			_ => None,
561		})
562	}
563
564	/// Process Remove diffs by removing events from their windows.
565	fn process_event_removals(&self, txn: &mut FlowTransaction, pre: &Columns) -> Result<Vec<Diff>> {
566		let row_count = pre.row_count();
567		if row_count == 0 {
568			return Ok(Vec::new());
569		}
570
571		let group_hashes = self.compute_group_keys(pre)?;
572		let mut result = Vec::new();
573
574		for (&row_number, &group_hash) in pre.row_numbers.iter().zip(group_hashes.iter()) {
575			let diffs = self.remove_event_from_windows(txn, group_hash, row_number)?;
576			result.extend(diffs);
577		}
578
579		Ok(result)
580	}
581
582	/// Extract group values from window events (all events in a group have the same group values).
583	/// Evaluates compiled_group_by expressions on the first row of the events.
584	pub fn extract_group_values(
585		&self,
586		window_layout: &WindowLayout,
587		events: &[WindowEvent],
588	) -> Result<(Vec<Value>, Vec<String>)> {
589		if events.is_empty() || self.group_by.is_empty() {
590			return Ok((Vec::new(), Vec::new()));
591		}
592
593		let columns = self.events_to_columns(window_layout, events)?;
594		let row_count = columns.row_count();
595		if row_count == 0 {
596			return Ok((Vec::new(), Vec::new()));
597		}
598
599		let session = self.eval_session(false);
600		let exec_ctx = session.with_eval(columns, row_count);
601
602		let mut values = Vec::new();
603		let mut names = Vec::new();
604		for (i, compiled_expr) in self.compiled_group_by.iter().enumerate() {
605			let col = compiled_expr.execute(&exec_ctx)?;
606			values.push(col.data().get_value(0).clone());
607			names.push(display_label(&self.group_by[i]).text().to_string());
608		}
609
610		Ok((values, names))
611	}
612
613	/// Convert window events to columnar format for aggregation
614	pub fn events_to_columns(&self, window_layout: &WindowLayout, events: &[WindowEvent]) -> Result<Columns> {
615		if events.is_empty() {
616			return Ok(Columns::new(Vec::new()));
617		}
618
619		let mut builders: Vec<ColumnBuffer> = window_layout
620			.types
621			.iter()
622			.map(|ty| ColumnBuffer::with_capacity(ty.clone(), events.len()))
623			.collect();
624
625		for event in events.iter() {
626			let row = event.to_row(window_layout);
627			for (idx, builder) in builders.iter_mut().enumerate() {
628				let value = row.shape.get_value(&row.encoded, idx);
629				builder.push_value(value);
630			}
631		}
632
633		let columns = window_layout
634			.names
635			.iter()
636			.zip(builders)
637			.map(|(name, data)| ColumnWithName {
638				name: Fragment::internal(name.clone()),
639				data,
640			})
641			.collect();
642
643		Ok(Columns::new(columns))
644	}
645
646	pub fn apply_aggregations(
647		&self,
648		txn: &mut FlowTransaction,
649		window_key: &EncodedKey,
650		window_layout: &WindowLayout,
651		events: &[WindowEvent],
652		changed_at: DateTime,
653	) -> Result<Option<(Row, bool)>> {
654		if events.is_empty() {
655			return Ok(None);
656		}
657
658		if self.aggregations.is_empty() {
659			let (result_row_number, is_new) =
660				self.row_number_provider.get_or_create_row_number(txn, window_key)?;
661			let mut result_row = events[0].to_row(window_layout);
662			result_row.number = result_row_number;
663			return Ok(Some((result_row, is_new)));
664		}
665
666		let (result_values, result_names, result_types) =
667			self.compute_aggregation_outputs(window_layout, events)?;
668
669		let layout = build_aggregation_shape(&result_names, &result_types);
670		let mut encoded = layout.allocate();
671		layout.set_values(&mut encoded, &result_values);
672
673		let ts_nanos = changed_at.to_nanos();
674		encoded.set_timestamps(ts_nanos, ts_nanos);
675
676		let (result_row_number, is_new) = self.row_number_provider.get_or_create_row_number(txn, window_key)?;
677		Ok(Some((
678			Row {
679				number: result_row_number,
680				encoded,
681				shape: layout,
682			},
683			is_new,
684		)))
685	}
686
687	#[inline]
688	fn compute_aggregation_outputs(
689		&self,
690		window_layout: &WindowLayout,
691		events: &[WindowEvent],
692	) -> Result<(Vec<Value>, Vec<String>, Vec<Type>)> {
693		let columns = self.events_to_columns(window_layout, events)?;
694		let agg_session = self.eval_session(true);
695		let exec_ctx = agg_session.with_eval(columns, events.len());
696		let (group_values, group_names) = self.extract_group_values(window_layout, events)?;
697
698		let mut values = Vec::new();
699		let mut names = Vec::new();
700		let mut types = Vec::new();
701
702		for (value, name) in group_values.into_iter().zip(group_names.into_iter()) {
703			types.push(value.get_type());
704			values.push(value);
705			names.push(name);
706		}
707
708		for (i, compiled_aggregation) in self.compiled_aggregations.iter().enumerate() {
709			let agg_column = compiled_aggregation.execute(&exec_ctx)?;
710			let value = agg_column.data().get_value(0);
711			types.push(value.get_type());
712			values.push(value);
713			names.push(display_label(&self.aggregations[i]).text().to_string());
714		}
715
716		Ok((values, names, types))
717	}
718
719	/// Process expired windows: emit Remove diffs for each, then delete state.
720	/// Uses the group registry for per-group targeted expiration.
721	pub fn process_expired_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
722		let mut result = Vec::new();
723
724		if let Some(duration) = self.size_duration() {
725			let window_size_ms = duration.as_millis() as u64;
726			if window_size_ms > 0 {
727				let expire_before = current_timestamp.saturating_sub(window_size_ms * 2);
728				let cutoff_id = expire_before / window_size_ms;
729				if cutoff_id == 0 {
730					return Ok(result);
731				}
732
733				let groups = self.load_group_registry(txn)?;
734				for group_hash in &groups {
735					// Keycode uses inverted ordering (NOT of big-endian)
736					let low_key = self.create_window_key(*group_hash, cutoff_id);
737					let high_key = self.create_window_key(*group_hash, 0);
738					let range = EncodedKeyRange::new(
739						ops::Bound::Excluded(low_key),
740						ops::Bound::Included(high_key),
741					);
742
743					let expired_keys = self.scan_keys_in_range(txn, &range)?;
744					let changed_at = DateTime::from_nanos(current_timestamp);
745					for key in &expired_keys {
746						let window_state = self.load_window_state(txn, key)?;
747						if !window_state.events.is_empty()
748							&& let Some(layout) = &window_state.window_layout && let Some((
749							row,
750							_,
751						)) = self
752							.apply_aggregations(
753								txn,
754								key,
755								layout,
756								&window_state.events,
757								changed_at,
758							)? {
759							result.push(Diff::remove(Columns::from_row(&row)));
760						}
761					}
762
763					if !expired_keys.is_empty() {
764						let low_key = self.create_window_key(*group_hash, cutoff_id);
765						let high_key = self.create_window_key(*group_hash, 0);
766						let range = EncodedKeyRange::new(
767							ops::Bound::Excluded(low_key),
768							ops::Bound::Included(high_key),
769						);
770						let _ = self.expire_range(txn, range)?;
771					}
772				}
773			}
774		}
775
776		Ok(result)
777	}
778
779	/// Load window state from storage
780	pub fn load_window_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<WindowState> {
781		let state_row = self.load_state(txn, window_key)?;
782
783		if state_row.is_empty() || !state_row.is_defined(0) {
784			return Ok(WindowState::default());
785		}
786
787		let blob = self.layout.get_blob(&state_row, 0);
788		if blob.is_empty() {
789			return Ok(WindowState::default());
790		}
791
792		from_bytes(blob.as_ref())
793			.map_err(|e| Error(Box::new(internal!("Failed to deserialize WindowState: {}", e))))
794	}
795
796	/// Save window state to storage
797	pub fn save_window_state(
798		&self,
799		txn: &mut FlowTransaction,
800		window_key: &EncodedKey,
801		state: &WindowState,
802	) -> Result<()> {
803		let serialized = to_stdvec(state)
804			.map_err(|e| Error(Box::new(internal!("Failed to serialize WindowState: {}", e))))?;
805
806		let mut state_row = self.layout.allocate();
807		let blob = Blob::from(serialized);
808		self.layout.set_blob(&mut state_row, 0, &blob);
809
810		self.save_state(txn, window_key, state_row)
811	}
812
813	/// Get and increment global event count for count-based windows
814	pub fn get_and_increment_global_count(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<u64> {
815		let count_key = self.create_count_key(group_hash);
816		let count_row = self.load_state(txn, &count_key)?;
817
818		let current_count = if count_row.is_empty() || !count_row.is_defined(0) {
819			0
820		} else {
821			let blob = self.layout.get_blob(&count_row, 0);
822			if blob.is_empty() {
823				0
824			} else {
825				from_bytes(blob.as_ref()).unwrap_or(0)
826			}
827		};
828
829		let new_count = current_count + 1;
830
831		let serialized = to_stdvec(&new_count)
832			.map_err(|e| Error(Box::new(internal!("Failed to serialize count: {}", e))))?;
833
834		let mut count_state_row = self.layout.allocate();
835		let blob = Blob::from(serialized);
836		self.layout.set_blob(&mut count_state_row, 0, &blob);
837
838		self.save_state(txn, &count_key, count_state_row)?;
839
840		Ok(current_count)
841	}
842
/// Create a count key for global event counting.
///
/// Layout (keycode-serialized): `"cnt:"` ++ `group_hash` (u128); one counter per group.
pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey {
	let mut serializer = KeySerializer::with_capacity(32);
	serializer.extend_bytes(b"cnt:");
	serializer.extend_u128(group_hash);
	EncodedKey::new(serializer.finish())
}
850
/// Create the group registry key.
///
/// A fixed raw key `b"grp:"` (not keycode-serialized); a single registry entry
/// per operator holds the list of known group hashes.
fn create_group_registry_key(&self) -> EncodedKey {
	EncodedKey::new(b"grp:")
}
855
856	/// Load the set of active group hashes from the registry.
857	pub fn load_group_registry(&self, txn: &mut FlowTransaction) -> Result<Vec<Hash128>> {
858		let key = self.create_group_registry_key();
859		let state_row = self.load_state(txn, &key)?;
860		if state_row.is_empty() || !state_row.is_defined(0) {
861			return Ok(Vec::new());
862		}
863		let blob = self.layout.get_blob(&state_row, 0);
864		if blob.is_empty() {
865			return Ok(Vec::new());
866		}
867		let groups: Vec<u128> = from_bytes(blob.as_ref()).unwrap_or_default();
868		Ok(groups.into_iter().map(Hash128::from).collect())
869	}
870
871	/// Save the group registry.
872	fn save_group_registry(&self, txn: &mut FlowTransaction, groups: &[Hash128]) -> Result<()> {
873		let key = self.create_group_registry_key();
874		let raw: Vec<u128> = groups.iter().map(|h| (*h).into()).collect();
875		let serialized = to_stdvec(&raw)
876			.map_err(|e| Error(Box::new(internal!("Failed to serialize group registry: {}", e))))?;
877		let mut state_row = self.layout.allocate();
878		let blob = Blob::from(serialized);
879		self.layout.set_blob(&mut state_row, 0, &blob);
880		self.save_state(txn, &key, state_row)
881	}
882
883	/// Register a group hash in the registry if not already present.
884	pub fn register_group(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<()> {
885		let mut groups = self.load_group_registry(txn)?;
886		if !groups.contains(&group_hash) {
887			groups.push(group_hash);
888			self.save_group_registry(txn, &groups)?;
889		}
890		Ok(())
891	}
892
893	pub fn tick_expire_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
894		let mut result = Vec::new();
895		let window_size_ms = match self.size_duration() {
896			Some(d) => d.as_millis() as u64,
897			None => return Ok(result),
898		};
899		if window_size_ms == 0 {
900			return Ok(result);
901		}
902
903		let mut keys_to_remove = Vec::new();
904		for window_key in self.scan_window_keys(txn)? {
905			if let Some(diff) =
906				self.expire_window_if_due(txn, &window_key, current_timestamp, window_size_ms)?
907			{
908				result.push(diff);
909				keys_to_remove.push(window_key);
910			}
911		}
912
913		for key in &keys_to_remove {
914			let empty = self.create_state();
915			self.save_state(txn, key, empty)?;
916		}
917
918		Ok(result)
919	}
920
921	#[inline]
922	fn scan_window_keys(&self, txn: &mut FlowTransaction) -> Result<Vec<EncodedKey>> {
923		let all_state = txn.state_scan(self.node)?;
924		let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
925		let win_marker = b"win:";
926
927		let mut keys = Vec::new();
928		for item in &all_state.items {
929			let full_key = &item.key;
930			if full_key.len() <= prefix.len() {
931				continue;
932			}
933			let inner = &full_key[prefix.len()..];
934			if !inner.starts_with(win_marker) {
935				continue;
936			}
937			keys.push(EncodedKey::new(inner));
938		}
939		Ok(keys)
940	}
941
942	#[inline]
943	fn expire_window_if_due(
944		&self,
945		txn: &mut FlowTransaction,
946		window_key: &EncodedKey,
947		current_timestamp: u64,
948		window_size_ms: u64,
949	) -> Result<Option<Diff>> {
950		let window_state = self.load_window_state(txn, window_key)?;
951		if window_state.events.is_empty() {
952			return Ok(None);
953		}
954
955		let newest_event_time = window_state.events.iter().map(|e| e.timestamp).max().unwrap_or(0);
956		if current_timestamp.saturating_sub(newest_event_time) <= window_size_ms {
957			return Ok(None);
958		}
959
960		let changed_at = DateTime::from_nanos(current_timestamp);
961		if let Some(layout) = &window_state.window_layout
962			&& let Some((row, _)) =
963				self.apply_aggregations(txn, window_key, layout, &window_state.events, changed_at)?
964		{
965			return Ok(Some(Diff::remove(Columns::from_row(&row))));
966		}
967		Ok(None)
968	}
969
970	/// Shared: partition columns by group keys and call `group_fn` for each group.
971	pub fn process_insert(
972		&self,
973		txn: &mut FlowTransaction,
974		columns: &Columns,
975		changed_at: DateTime,
976		group_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns, Hash128, DateTime) -> Result<Vec<Diff>>,
977	) -> Result<Vec<Diff>> {
978		let row_count = columns.row_count();
979		if row_count == 0 {
980			return Ok(Vec::new());
981		}
982		let group_hashes = self.compute_group_keys(columns)?;
983		let groups = columns.partition_by_keys(&group_hashes);
984		let mut result = Vec::new();
985		for (group_hash, group_columns) in groups {
986			self.register_group(txn, group_hash)?;
987			let group_result = group_fn(self, txn, &group_columns, group_hash, changed_at)?;
988			result.extend(group_result);
989		}
990		Ok(result)
991	}
992
993	/// Shared: iterate change diffs and process inserts/updates via `process_fn`.
994	/// Optionally runs expiration first (all kinds except rolling).
995	pub fn apply_window_change(
996		&self,
997		txn: &mut FlowTransaction,
998		change: &Change,
999		expire: bool,
1000		process_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>,
1001	) -> Result<Vec<Diff>> {
1002		let mut result = Vec::new();
1003		if expire {
1004			let current_timestamp = self.current_timestamp();
1005			let expired_diffs = self.process_expired_windows(txn, current_timestamp)?;
1006			result.extend(expired_diffs);
1007		}
1008		for diff in change.diffs.iter() {
1009			match diff {
1010				Diff::Insert {
1011					post,
1012				} => result.extend(process_fn(self, txn, post)?),
1013				Diff::Update {
1014					pre,
1015					post,
1016				} => result.extend(self.apply_window_update_diff(txn, pre, post, &process_fn)?),
1017				Diff::Remove {
1018					pre,
1019				} => result.extend(self.process_event_removals(txn, pre)?),
1020			}
1021		}
1022		Ok(result)
1023	}
1024
1025	#[inline]
1026	fn apply_window_update_diff(
1027		&self,
1028		txn: &mut FlowTransaction,
1029		pre: &Columns,
1030		post: &Columns,
1031		process_fn: &impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>,
1032	) -> Result<Vec<Diff>> {
1033		let group_hashes = self.compute_group_keys(pre)?;
1034		let mut update_indices: Vec<usize> = Vec::new();
1035		let mut insert_indices: Vec<usize> = Vec::new();
1036		for (row_idx, &group_hash) in group_hashes.iter().enumerate() {
1037			let row_number = pre.row_numbers[row_idx];
1038			if self.lookup_row_index(txn, group_hash, row_number)?.is_empty() {
1039				insert_indices.push(row_idx);
1040			} else {
1041				update_indices.push(row_idx);
1042			}
1043		}
1044
1045		let mut result = Vec::new();
1046		if !update_indices.is_empty() {
1047			let pre_subset = pre.extract_by_indices(&update_indices);
1048			let post_subset = post.extract_by_indices(&update_indices);
1049			result.extend(self.process_event_updates(txn, &pre_subset, &post_subset)?);
1050		}
1051		if !insert_indices.is_empty() {
1052			let post_subset = post.extract_by_indices(&insert_indices);
1053			result.extend(process_fn(self, txn, &post_subset)?);
1054		}
1055		Ok(result)
1056	}
1057
1058	/// Shared: emit an Insert or Update diff for an aggregation result.
1059	/// `previous_aggregation` is the pre-update state (if the window already existed).
1060	pub fn emit_aggregation_diff(
1061		aggregated_row: &Row,
1062		is_new: bool,
1063		previous_aggregation: Option<(Row, bool)>,
1064	) -> Diff {
1065		if is_new {
1066			Diff::insert(Columns::from_row(aggregated_row))
1067		} else if let Some((previous_row, _)) = previous_aggregation {
1068			Diff::update(Columns::from_row(&previous_row), Columns::from_row(aggregated_row))
1069		} else {
1070			Diff::insert(Columns::from_row(aggregated_row))
1071		}
1072	}
1073}
1074
/// Opts `WindowOperator` into `RawStatefulOperator`. The impl body is empty, so it relies
/// entirely on the trait's provided methods.
impl RawStatefulOperator for WindowOperator {}
1076
1077impl WindowStateful for WindowOperator {
1078	fn layout(&self) -> RowShape {
1079		self.layout.clone()
1080	}
1081}
1082
1083impl Operator for WindowOperator {
1084	fn id(&self) -> FlowNodeId {
1085		self.node
1086	}
1087
1088	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
1089		match &self.kind {
1090			WindowKind::Tumbling {
1091				..
1092			} => apply_tumbling_window(self, txn, change),
1093			WindowKind::Sliding {
1094				..
1095			} => apply_sliding_window(self, txn, change),
1096			WindowKind::Rolling {
1097				..
1098			} => apply_rolling_window(self, txn, change),
1099			WindowKind::Session {
1100				..
1101			} => apply_session_window(self, txn, change),
1102		}
1103	}
1104
1105	fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
1106		let current_timestamp = timestamp.to_nanos() / 1_000_000;
1107		let diffs = match &self.kind {
1108			WindowKind::Tumbling {
1109				..
1110			}
1111			| WindowKind::Sliding {
1112				..
1113			} => self.tick_expire_windows(txn, current_timestamp)?,
1114			WindowKind::Rolling {
1115				size: WindowSize::Duration(_),
1116			} => self.tick_rolling_eviction(txn, current_timestamp)?,
1117			WindowKind::Session {
1118				..
1119			} => self.tick_session_expiration(txn, current_timestamp)?,
1120			_ => vec![],
1121		};
1122
1123		if diffs.is_empty() {
1124			Ok(None)
1125		} else {
1126			Ok(Some(Change::from_flow(
1127				self.node,
1128				CommitVersion(0),
1129				diffs,
1130				DateTime::from_nanos(self.runtime_context.clock.now_nanos()),
1131			)))
1132		}
1133	}
1134
1135	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
1136		self.parent.pull(txn, rows)
1137	}
1138}