Skip to main content

reifydb_sub_flow/operator/window/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::{CommitVersion, WindowKind, WindowSize},
8	error::diagnostic::flow::{flow_window_timestamp_column_not_found, flow_window_timestamp_column_type_mismatch},
9	interface::catalog::flow::FlowNodeId,
10	internal,
11	key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use serde::{Deserialize, Serialize};
14
15use crate::{
16	operator::{Operator, Operators},
17	transaction::FlowTransaction,
18};
19
20pub mod rolling;
21pub mod session;
22pub mod sliding;
23pub mod tumbling;
24
25use rolling::apply_rolling_window;
26use session::apply_session_window;
27use sliding::apply_sliding_window;
28use tumbling::apply_tumbling_window;
29
/// Empty parameter set handed to the `EvalSession` built by `WindowOperator::eval_session`;
/// window expressions are evaluated without bound parameters.
static EMPTY_PARAMS: Params = Params::None;
31
32use std::{ops, sync::LazyLock, time::Duration};
33
34use postcard::{from_bytes, to_stdvec};
35use reifydb_core::{
36	encoded::{
37		key::{EncodedKey, EncodedKeyRange},
38		row::EncodedRow,
39		schema::{RowSchema, RowSchemaField},
40	},
41	interface::change::{Change, Diff},
42	row::Row,
43	util::encoding::keycode::serializer::KeySerializer,
44	value::column::{Column, columns::Columns, data::ColumnData},
45};
46use reifydb_engine::{
47	expression::{
48		compile::{CompiledExpr, compile_expression},
49		context::{CompileContext, EvalSession},
50	},
51	vm::stack::SymbolTable,
52};
53use reifydb_routine::function::registry::Functions;
54use reifydb_rql::expression::{
55	Expression,
56	name::{collect_all_column_names, column_name_from_expression},
57};
58use reifydb_runtime::{
59	context::RuntimeContext,
60	hash::{Hash128, xxh3_128},
61};
62use reifydb_type::{
63	Result,
64	error::Error,
65	fragment::Fragment,
66	params::Params,
67	util::cowvec::CowVec,
68	value::{Value, blob::Blob, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
69};
70
71use crate::operator::stateful::{raw::RawStatefulOperator, row::RowNumberProvider, window::WindowStateful};
72
73static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
74
75/// RowSchema layout shared across all events in a window (stored once, not per event)
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct WindowLayout {
78	pub names: Vec<String>,
79	pub types: Vec<Type>,
80}
81
82impl WindowLayout {
83	pub fn from_row(row: &Row) -> Self {
84		Self {
85			names: row.schema.field_names().map(|s| s.to_string()).collect(),
86			types: row.schema.fields().iter().map(|f| f.constraint.get_type()).collect(),
87		}
88	}
89
90	pub fn to_schema(&self) -> RowSchema {
91		let fields: Vec<RowSchemaField> = self
92			.names
93			.iter()
94			.zip(self.types.iter())
95			.map(|(name, ty)| RowSchemaField::unconstrained(name.clone(), ty.clone()))
96			.collect();
97		RowSchema::new(fields)
98	}
99}
100
101/// A single event stored within a window
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct WindowEvent {
104	pub row_number: RowNumber,
105	pub timestamp: u64,
106	#[serde(with = "serde_bytes")]
107	pub encoded_bytes: Vec<u8>,
108}
109
110impl WindowEvent {
111	pub fn from_row(row: &Row, timestamp: u64) -> Self {
112		Self {
113			row_number: row.number,
114			timestamp,
115			encoded_bytes: row.encoded.as_slice().to_vec(),
116		}
117	}
118
119	pub fn to_row(&self, layout: &WindowLayout) -> Row {
120		let schema = layout.to_schema();
121		let encoded = EncodedRow(CowVec::new(self.encoded_bytes.clone()));
122		Row {
123			number: self.row_number,
124			encoded,
125			schema,
126		}
127	}
128}
129
130/// State for a single window
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct WindowState {
133	/// All events in this window (stored in insertion order)
134	pub events: Vec<WindowEvent>,
135	/// RowSchema layout shared by all events (set on first event)
136	pub window_layout: Option<WindowLayout>,
137	/// Window creation timestamp
138	pub window_start: u64,
139	/// Count of events in window (for count-based windows)
140	pub event_count: u64,
141	/// Timestamp of last event (for session windows)
142	pub last_event_time: u64,
143}
144
145impl WindowState {
146	/// Get the layout, panics if not set (should always be set after first event)
147	pub fn layout(&self) -> &WindowLayout {
148		self.window_layout.as_ref().expect("WindowState layout must be set before accessing")
149	}
150}
151
152impl Default for WindowState {
153	fn default() -> Self {
154		Self {
155			events: Vec::new(),
156			window_layout: None,
157			window_start: 0,
158			event_count: 0,
159			last_event_time: 0,
160		}
161	}
162}
163
/// The main window operator
///
/// Holds the window configuration, the pre-compiled group-by / aggregation
/// expressions and the helpers needed to persist per-window state.
pub struct WindowOperator {
	/// Upstream operator; `pull` is delegated to it.
	pub parent: Arc<Operators>,
	/// Flow node this operator belongs to (returned by `Operator::id`).
	pub node: FlowNodeId,
	/// Window kind; selects which window implementation `apply` and `tick` dispatch to.
	pub kind: WindowKind,
	/// Group-by expressions as written (used for result column names).
	pub group_by: Vec<Expression>,
	/// Aggregation expressions as written (used for result column names).
	pub aggregations: Vec<Expression>,
	/// Name of the DateTime column carrying event time; `None` falls back to the clock.
	pub ts: Option<String>,
	/// Compiled form of `group_by`, in the same order.
	pub compiled_group_by: Vec<CompiledExpr>,
	/// Compiled form of `aggregations`, in the same order.
	pub compiled_aggregations: Vec<CompiledExpr>,
	/// Schema of the state rows this operator stores: a single blob field at index 0.
	pub layout: RowSchema,
	/// Function registry used for compilation and evaluation.
	pub functions: Functions,
	/// Hands out result row numbers keyed by window key.
	pub row_number_provider: RowNumberProvider,
	/// Runtime context; its clock provides the processing-time timestamp.
	pub runtime_context: RuntimeContext,
	/// Column names needed by group_by + aggregations expressions.
	/// When empty, no projection is applied (all columns stored).
	pub projected_columns: Vec<String>,
}
182
183impl WindowOperator {
184	pub fn new(
185		parent: Arc<Operators>,
186		node: FlowNodeId,
187		kind: WindowKind,
188		group_by: Vec<Expression>,
189		aggregations: Vec<Expression>,
190		ts: Option<String>,
191		runtime_context: RuntimeContext,
192		functions: Functions,
193	) -> Self {
194		let symbols = SymbolTable::new();
195		let compile_ctx = CompileContext {
196			functions: &functions,
197			symbols: &symbols,
198		};
199
200		// Compile group_by expressions
201		let compiled_group_by: Vec<CompiledExpr> = group_by
202			.iter()
203			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile group_by expression"))
204			.collect();
205
206		// Compile aggregation expressions
207		let compiled_aggregations: Vec<CompiledExpr> = aggregations
208			.iter()
209			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile aggregation expression"))
210			.collect();
211
212		let mut needed = collect_all_column_names(&group_by);
213		needed.extend(collect_all_column_names(&aggregations));
214		let mut projected_columns: Vec<String> = needed.into_iter().collect();
215		projected_columns.sort();
216
217		Self {
218			parent,
219			node,
220			kind,
221			group_by,
222			aggregations,
223			ts,
224			compiled_group_by,
225			compiled_aggregations,
226			layout: RowSchema::testing(&[Type::Blob]),
227			functions,
228			row_number_provider: RowNumberProvider::new(node),
229			runtime_context,
230			projected_columns,
231		}
232	}
233
234	/// Get the current timestamp in milliseconds
235	pub fn current_timestamp(&self) -> u64 {
236		self.runtime_context.clock.now_millis()
237	}
238
239	/// Project a single-row Columns down to only the columns needed by window expressions.
240	pub fn project_columns(&self, columns: &Columns) -> Columns {
241		if self.projected_columns.is_empty() {
242			return columns.clone();
243		}
244		columns.project_by_names(&self.projected_columns)
245	}
246
247	/// Whether this is a count-based window
248	pub fn is_count_based(&self) -> bool {
249		self.kind.size().map_or(false, |m| m.is_count())
250	}
251
252	/// Get the window size as duration (if time-based)
253	pub fn size_duration(&self) -> Option<Duration> {
254		self.kind.size().and_then(|m| m.as_duration())
255	}
256
257	/// Get the window size as count (if count-based)
258	pub fn size_count(&self) -> Option<u64> {
259		self.kind.size().and_then(|m| m.as_count())
260	}
261
262	fn eval_session(&self, is_aggregate: bool) -> EvalSession<'_> {
263		EvalSession {
264			params: &EMPTY_PARAMS,
265			symbols: &EMPTY_SYMBOL_TABLE,
266			functions: &self.functions,
267			runtime_context: &self.runtime_context,
268			arena: None,
269			identity: IdentityId::root(),
270			is_aggregate_context: is_aggregate,
271		}
272	}
273
274	/// Compute group keys for all rows in Columns
275	pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>> {
276		let row_count = columns.row_count();
277		if row_count == 0 {
278			return Ok(Vec::new());
279		}
280
281		if self.compiled_group_by.is_empty() {
282			return Ok(vec![Hash128::from(0u128); row_count]);
283		}
284
285		let session = self.eval_session(false);
286		let exec_ctx = session.eval(columns.clone(), row_count);
287
288		let mut group_columns: Vec<Column> = Vec::new();
289		for compiled_expr in &self.compiled_group_by {
290			let col = compiled_expr.execute(&exec_ctx)?;
291			group_columns.push(col);
292		}
293
294		let mut hashes = Vec::with_capacity(row_count);
295		for row_idx in 0..row_count {
296			let mut data = Vec::new();
297			for col in &group_columns {
298				let value = col.data().get_value(row_idx);
299				let value_str = value.to_string();
300				data.extend_from_slice(value_str.as_bytes());
301			}
302			hashes.push(xxh3_128(&data));
303		}
304
305		Ok(hashes)
306	}
307
308	/// Resolve event timestamps for all rows.
309	/// When `ts` is configured, reads from the named DateTime column.
310	/// Otherwise falls back to processing time (current clock).
311	pub fn resolve_event_timestamps(&self, columns: &Columns, row_count: usize) -> Result<Vec<u64>> {
312		if row_count == 0 {
313			return Ok(Vec::new());
314		}
315		match &self.ts {
316			Some(ts_col) => {
317				let col = columns
318					.column(ts_col)
319					.ok_or_else(|| Error(flow_window_timestamp_column_not_found(ts_col)))?;
320				let mut timestamps = Vec::with_capacity(row_count);
321				for i in 0..row_count {
322					match col.data().get_value(i) {
323						Value::DateTime(dt) => timestamps.push(dt.timestamp_millis() as u64),
324						other => {
325							return Err(Error(flow_window_timestamp_column_type_mismatch(
326								ts_col,
327								other.get_type(),
328							)));
329						}
330					}
331				}
332				Ok(timestamps)
333			}
334			None => {
335				let now = self.current_timestamp();
336				Ok(vec![now; row_count])
337			}
338		}
339	}
340
341	/// Create a window key for storage
342	pub fn create_window_key(&self, group_hash: Hash128, window_id: u64) -> EncodedKey {
343		let mut serializer = KeySerializer::with_capacity(32);
344		serializer.extend_bytes(b"win:");
345		serializer.extend_u128(group_hash);
346		serializer.extend_u64(window_id);
347		EncodedKey::new(serializer.finish())
348	}
349
350	/// Create a row index key for mapping row_number → window_id
351	fn create_row_index_key(&self, group_hash: Hash128, row_number: RowNumber) -> EncodedKey {
352		let mut serializer = KeySerializer::with_capacity(32);
353		serializer.extend_bytes(b"idx:");
354		serializer.extend_u128(group_hash);
355		serializer.extend_u64(row_number.0);
356		EncodedKey::new(serializer.finish())
357	}
358
359	/// Store a row_number → window_ids mapping.
360	/// Appends window_id to the existing list (supports sliding windows with multiple windows per event).
361	pub fn store_row_index(
362		&self,
363		txn: &mut FlowTransaction,
364		group_hash: Hash128,
365		row_number: RowNumber,
366		window_id: u64,
367	) -> Result<()> {
368		let index_key = self.create_row_index_key(group_hash, row_number);
369		let mut window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
370		if !window_ids.contains(&window_id) {
371			window_ids.push(window_id);
372		}
373		let serialized =
374			to_stdvec(&window_ids).map_err(|e| Error(internal!("Failed to serialize row index: {}", e)))?;
375		let mut state_row = self.layout.allocate();
376		let blob = Blob::from(serialized);
377		self.layout.set_blob(&mut state_row, 0, &blob);
378		self.save_state(txn, &index_key, state_row)
379	}
380
381	/// Look up all window_ids for a given row_number
382	fn lookup_row_index(
383		&self,
384		txn: &mut FlowTransaction,
385		group_hash: Hash128,
386		row_number: RowNumber,
387	) -> Result<Vec<u64>> {
388		let index_key = self.create_row_index_key(group_hash, row_number);
389		let state_row = self.load_state(txn, &index_key)?;
390		if state_row.is_empty() || !state_row.is_defined(0) {
391			return Ok(Vec::new());
392		}
393		let blob = self.layout.get_blob(&state_row, 0);
394		if blob.is_empty() {
395			return Ok(Vec::new());
396		}
397		let window_ids: Vec<u64> = from_bytes(blob.as_ref())
398			.map_err(|e| Error(internal!("Failed to deserialize row index: {}", e)))?;
399		Ok(window_ids)
400	}
401
402	/// Replace an event across all its windows in-place (for UPDATE handling).
403	/// For sliding windows, an event may exist in multiple windows — all are updated.
404	fn replace_event_in_windows(
405		&self,
406		txn: &mut FlowTransaction,
407		group_hash: Hash128,
408		row_number: RowNumber,
409		post_row: &Row,
410		post_timestamp: u64,
411	) -> Result<Vec<Diff>> {
412		let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
413		if window_ids.is_empty() {
414			return Ok(Vec::new());
415		}
416
417		let mut result = Vec::new();
418
419		for window_id in &window_ids {
420			let window_key = self.create_window_key(group_hash, *window_id);
421			let mut window_state = self.load_window_state(txn, &window_key)?;
422
423			let event_idx = window_state.events.iter().position(|e| e.row_number == row_number);
424			if let Some(idx) = event_idx {
425				let layout = match &window_state.window_layout {
426					Some(l) => l.clone(),
427					None => continue,
428				};
429
430				let pre_aggregation =
431					self.apply_aggregations(txn, &window_key, &layout, &window_state.events)?;
432
433				window_state.events[idx] = WindowEvent::from_row(post_row, post_timestamp);
434
435				let post_aggregation =
436					self.apply_aggregations(txn, &window_key, &layout, &window_state.events)?;
437
438				self.save_window_state(txn, &window_key, &window_state)?;
439
440				if let (Some((pre_row, _)), Some((post_row, _))) = (pre_aggregation, post_aggregation) {
441					result.push(Diff::Update {
442						pre: Columns::from_row(&pre_row),
443						post: Columns::from_row(&post_row),
444					});
445				}
446			}
447		}
448
449		Ok(result)
450	}
451
452	/// Process Update diffs by replacing events in-place within their windows.
453	fn process_event_updates(&self, txn: &mut FlowTransaction, pre: &Columns, post: &Columns) -> Result<Vec<Diff>> {
454		let row_count = pre.row_count();
455		if row_count == 0 {
456			return Ok(Vec::new());
457		}
458
459		let group_hashes = self.compute_group_keys(pre)?;
460		let post_timestamps = self.resolve_event_timestamps(post, row_count)?;
461		let mut result = Vec::new();
462
463		for row_idx in 0..row_count {
464			let row_number = pre.row_numbers[row_idx];
465			let group_hash = group_hashes[row_idx];
466			let post_timestamp = post_timestamps[row_idx];
467
468			let single_row = post.extract_row(row_idx);
469			let projected = self.project_columns(&single_row);
470			let post_row = projected.to_single_row();
471
472			let diffs =
473				self.replace_event_in_windows(txn, group_hash, row_number, &post_row, post_timestamp)?;
474			result.extend(diffs);
475		}
476
477		Ok(result)
478	}
479
480	/// Remove an event from all its windows (for DELETE handling).
481	fn remove_event_from_windows(
482		&self,
483		txn: &mut FlowTransaction,
484		group_hash: Hash128,
485		row_number: RowNumber,
486	) -> Result<Vec<Diff>> {
487		let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
488		if window_ids.is_empty() {
489			return Ok(Vec::new());
490		}
491
492		let mut result = Vec::new();
493
494		for window_id in &window_ids {
495			let window_key = self.create_window_key(group_hash, *window_id);
496			let mut window_state = self.load_window_state(txn, &window_key)?;
497
498			let event_idx = window_state.events.iter().position(|e| e.row_number == row_number);
499			if let Some(idx) = event_idx {
500				let layout = match &window_state.window_layout {
501					Some(l) => l.clone(),
502					None => continue,
503				};
504
505				let pre_aggregation =
506					self.apply_aggregations(txn, &window_key, &layout, &window_state.events)?;
507
508				window_state.events.remove(idx);
509				window_state.event_count = window_state.event_count.saturating_sub(1);
510
511				if window_state.events.is_empty() {
512					self.save_window_state(txn, &window_key, &window_state)?;
513					if let Some((pre_row, _)) = pre_aggregation {
514						result.push(Diff::Remove {
515							pre: Columns::from_row(&pre_row),
516						});
517					}
518				} else {
519					let post_aggregation = self.apply_aggregations(
520						txn,
521						&window_key,
522						&layout,
523						&window_state.events,
524					)?;
525					self.save_window_state(txn, &window_key, &window_state)?;
526
527					if let (Some((pre_row, _)), Some((post_row, _))) =
528						(pre_aggregation, post_aggregation)
529					{
530						result.push(Diff::Update {
531							pre: Columns::from_row(&pre_row),
532							post: Columns::from_row(&post_row),
533						});
534					}
535				}
536			}
537		}
538
539		// Clean up the index entry
540		let index_key = self.create_row_index_key(group_hash, row_number);
541		let empty = self.layout.allocate();
542		self.save_state(txn, &index_key, empty)?;
543
544		Ok(result)
545	}
546
547	/// Process Remove diffs by removing events from their windows.
548	fn process_event_removals(&self, txn: &mut FlowTransaction, pre: &Columns) -> Result<Vec<Diff>> {
549		let row_count = pre.row_count();
550		if row_count == 0 {
551			return Ok(Vec::new());
552		}
553
554		let group_hashes = self.compute_group_keys(pre)?;
555		let mut result = Vec::new();
556
557		for row_idx in 0..row_count {
558			let row_number = pre.row_numbers[row_idx];
559			let group_hash = group_hashes[row_idx];
560
561			let diffs = self.remove_event_from_windows(txn, group_hash, row_number)?;
562			result.extend(diffs);
563		}
564
565		Ok(result)
566	}
567
568	/// Extract group values from window events (all events in a group have the same group values).
569	/// Evaluates compiled_group_by expressions on the first row of the events.
570	pub fn extract_group_values(
571		&self,
572		window_layout: &WindowLayout,
573		events: &[WindowEvent],
574	) -> Result<(Vec<Value>, Vec<String>)> {
575		if events.is_empty() || self.group_by.is_empty() {
576			return Ok((Vec::new(), Vec::new()));
577		}
578
579		let columns = self.events_to_columns(window_layout, events)?;
580		let row_count = columns.row_count();
581		if row_count == 0 {
582			return Ok((Vec::new(), Vec::new()));
583		}
584
585		let session = self.eval_session(false);
586		let exec_ctx = session.eval(columns, row_count);
587
588		let mut values = Vec::new();
589		let mut names = Vec::new();
590		for (i, compiled_expr) in self.compiled_group_by.iter().enumerate() {
591			let col = compiled_expr.execute(&exec_ctx)?;
592			values.push(col.data().get_value(0).clone());
593			names.push(column_name_from_expression(&self.group_by[i]).text().to_string());
594		}
595
596		Ok((values, names))
597	}
598
599	/// Convert window events to columnar format for aggregation
600	pub fn events_to_columns(&self, window_layout: &WindowLayout, events: &[WindowEvent]) -> Result<Columns> {
601		if events.is_empty() {
602			return Ok(Columns::new(Vec::new()));
603		}
604
605		let mut builders: Vec<ColumnData> = window_layout
606			.types
607			.iter()
608			.map(|ty| ColumnData::with_capacity(ty.clone(), events.len()))
609			.collect();
610
611		for event in events.iter() {
612			let row = event.to_row(window_layout);
613			for (idx, builder) in builders.iter_mut().enumerate() {
614				let value = row.schema.get_value(&row.encoded, idx);
615				builder.push_value(value);
616			}
617		}
618
619		let columns = window_layout
620			.names
621			.iter()
622			.zip(builders.into_iter())
623			.map(|(name, data)| Column {
624				name: Fragment::internal(name.clone()),
625				data,
626			})
627			.collect();
628
629		Ok(Columns::new(columns))
630	}
631
632	/// Apply aggregations to all events in a window
633	pub fn apply_aggregations(
634		&self,
635		txn: &mut FlowTransaction,
636		window_key: &EncodedKey,
637		window_layout: &WindowLayout,
638		events: &[WindowEvent],
639	) -> Result<Option<(Row, bool)>> {
640		if events.is_empty() {
641			return Ok(None);
642		}
643
644		if self.aggregations.is_empty() {
645			// No aggregations configured, return first event as result
646			let (result_row_number, is_new) =
647				self.row_number_provider.get_or_create_row_number(txn, window_key)?;
648			let mut result_row = events[0].to_row(window_layout);
649			result_row.number = result_row_number;
650			return Ok(Some((result_row, is_new)));
651		}
652
653		let columns = self.events_to_columns(window_layout, events)?;
654
655		let agg_session = self.eval_session(true);
656		let exec_ctx = agg_session.eval(columns, events.len());
657
658		let (group_values, group_names) = self.extract_group_values(window_layout, events)?;
659
660		let mut result_values = Vec::new();
661		let mut result_names = Vec::new();
662		let mut result_types = Vec::new();
663
664		for (value, name) in group_values.into_iter().zip(group_names.into_iter()) {
665			result_values.push(value.clone());
666			result_names.push(name);
667			result_types.push(value.get_type());
668		}
669
670		for (i, compiled_aggregation) in self.compiled_aggregations.iter().enumerate() {
671			let agg_column = compiled_aggregation.execute(&exec_ctx)?;
672
673			let value = agg_column.data().get_value(0);
674			result_values.push(value.clone());
675			result_names.push(column_name_from_expression(&self.aggregations[i]).text().to_string());
676			result_types.push(value.get_type());
677		}
678
679		let fields: Vec<RowSchemaField> = result_names
680			.iter()
681			.zip(result_types.iter())
682			.map(|(name, ty)| RowSchemaField::unconstrained(name.clone(), ty.clone()))
683			.collect();
684		let layout = RowSchema::new(fields);
685		let mut encoded = layout.allocate();
686		layout.set_values(&mut encoded, &result_values);
687
688		let (result_row_number, is_new) = self.row_number_provider.get_or_create_row_number(txn, window_key)?;
689
690		let result_row = Row {
691			number: result_row_number,
692			encoded,
693			schema: layout,
694		};
695
696		Ok(Some((result_row, is_new)))
697	}
698
699	/// Process expired windows: emit Remove diffs for each, then delete state.
700	/// Uses the group registry for per-group targeted expiration.
701	pub fn process_expired_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
702		let mut result = Vec::new();
703
704		if let Some(duration) = self.size_duration() {
705			let window_size_ms = duration.as_millis() as u64;
706			if window_size_ms > 0 {
707				let expire_before = current_timestamp.saturating_sub(window_size_ms * 2);
708				let cutoff_id = expire_before / window_size_ms;
709				if cutoff_id == 0 {
710					return Ok(result);
711				}
712
713				let groups = self.load_group_registry(txn)?;
714				for group_hash in &groups {
715					// Keycode uses inverted ordering (NOT of big-endian)
716					let low_key = self.create_window_key(*group_hash, cutoff_id);
717					let high_key = self.create_window_key(*group_hash, 0);
718					let range = EncodedKeyRange::new(
719						ops::Bound::Excluded(low_key),
720						ops::Bound::Included(high_key),
721					);
722
723					let expired_keys = self.scan_keys_in_range(txn, &range)?;
724					for key in &expired_keys {
725						let window_state = self.load_window_state(txn, key)?;
726						if !window_state.events.is_empty() {
727							if let Some(layout) = &window_state.window_layout {
728								if let Some((row, _)) = self.apply_aggregations(
729									txn,
730									key,
731									layout,
732									&window_state.events,
733								)? {
734									result.push(Diff::Remove {
735										pre: Columns::from_row(&row),
736									});
737								}
738							}
739						}
740					}
741
742					if !expired_keys.is_empty() {
743						let low_key = self.create_window_key(*group_hash, cutoff_id);
744						let high_key = self.create_window_key(*group_hash, 0);
745						let range = EncodedKeyRange::new(
746							ops::Bound::Excluded(low_key),
747							ops::Bound::Included(high_key),
748						);
749						let _ = self.expire_range(txn, range)?;
750					}
751				}
752			}
753		}
754
755		Ok(result)
756	}
757
758	/// Load window state from storage
759	pub fn load_window_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<WindowState> {
760		let state_row = self.load_state(txn, window_key)?;
761
762		if state_row.is_empty() || !state_row.is_defined(0) {
763			return Ok(WindowState::default());
764		}
765
766		let blob = self.layout.get_blob(&state_row, 0);
767		if blob.is_empty() {
768			return Ok(WindowState::default());
769		}
770
771		from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize WindowState: {}", e)))
772	}
773
774	/// Save window state to storage
775	pub fn save_window_state(
776		&self,
777		txn: &mut FlowTransaction,
778		window_key: &EncodedKey,
779		state: &WindowState,
780	) -> Result<()> {
781		let serialized =
782			to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize WindowState: {}", e)))?;
783
784		let mut state_row = self.layout.allocate();
785		let blob = Blob::from(serialized);
786		self.layout.set_blob(&mut state_row, 0, &blob);
787
788		self.save_state(txn, window_key, state_row)
789	}
790
791	/// Get and increment global event count for count-based windows
792	pub fn get_and_increment_global_count(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<u64> {
793		let count_key = self.create_count_key(group_hash);
794		let count_row = self.load_state(txn, &count_key)?;
795
796		let current_count = if count_row.is_empty() || !count_row.is_defined(0) {
797			0
798		} else {
799			let blob = self.layout.get_blob(&count_row, 0);
800			if blob.is_empty() {
801				0
802			} else {
803				from_bytes(blob.as_ref()).unwrap_or(0)
804			}
805		};
806
807		let new_count = current_count + 1;
808
809		let serialized =
810			to_stdvec(&new_count).map_err(|e| Error(internal!("Failed to serialize count: {}", e)))?;
811
812		let mut count_state_row = self.layout.allocate();
813		let blob = Blob::from(serialized);
814		self.layout.set_blob(&mut count_state_row, 0, &blob);
815
816		self.save_state(txn, &count_key, count_state_row)?;
817
818		Ok(current_count)
819	}
820
821	/// Create a count key for global event counting
822	pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey {
823		let mut serializer = KeySerializer::with_capacity(32);
824		serializer.extend_bytes(b"cnt:");
825		serializer.extend_u128(group_hash);
826		EncodedKey::new(serializer.finish())
827	}
828
829	/// Create the group registry key
830	fn create_group_registry_key(&self) -> EncodedKey {
831		EncodedKey::new(b"grp:")
832	}
833
834	/// Load the set of active group hashes from the registry.
835	pub fn load_group_registry(&self, txn: &mut FlowTransaction) -> Result<Vec<Hash128>> {
836		let key = self.create_group_registry_key();
837		let state_row = self.load_state(txn, &key)?;
838		if state_row.is_empty() || !state_row.is_defined(0) {
839			return Ok(Vec::new());
840		}
841		let blob = self.layout.get_blob(&state_row, 0);
842		if blob.is_empty() {
843			return Ok(Vec::new());
844		}
845		let groups: Vec<u128> = from_bytes(blob.as_ref()).unwrap_or_default();
846		Ok(groups.into_iter().map(Hash128::from).collect())
847	}
848
849	/// Save the group registry.
850	fn save_group_registry(&self, txn: &mut FlowTransaction, groups: &[Hash128]) -> Result<()> {
851		let key = self.create_group_registry_key();
852		let raw: Vec<u128> = groups.iter().map(|h| (*h).into()).collect();
853		let serialized =
854			to_stdvec(&raw).map_err(|e| Error(internal!("Failed to serialize group registry: {}", e)))?;
855		let mut state_row = self.layout.allocate();
856		let blob = Blob::from(serialized);
857		self.layout.set_blob(&mut state_row, 0, &blob);
858		self.save_state(txn, &key, state_row)
859	}
860
861	/// Register a group hash in the registry if not already present.
862	pub fn register_group(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<()> {
863		let mut groups = self.load_group_registry(txn)?;
864		if !groups.contains(&group_hash) {
865			groups.push(group_hash);
866			self.save_group_registry(txn, &groups)?;
867		}
868		Ok(())
869	}
870
871	/// Tick-based window expiration for tumbling/sliding windows.
872	/// Scans all operator state, finds expired "win:" windows, emits Remove and cleans up.
873	pub fn tick_expire_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
874		let mut result = Vec::new();
875		let window_size_ms = match self.size_duration() {
876			Some(d) => d.as_millis() as u64,
877			None => return Ok(result),
878		};
879		if window_size_ms == 0 {
880			return Ok(result);
881		}
882
883		// Scan all state for this operator
884		let all_state = txn.state_scan(self.node)?;
885		let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
886		let win_marker = b"win:";
887
888		let mut keys_to_remove = Vec::new();
889
890		for item in &all_state.items {
891			// Strip operator prefix to get the inner key
892			let full_key = &item.key;
893			if full_key.len() <= prefix.len() {
894				continue;
895			}
896			let inner = &full_key[prefix.len()..];
897
898			// Only process "win:" keys
899			if !inner.starts_with(win_marker) {
900				continue;
901			}
902
903			let window_key = EncodedKey::new(inner);
904			let window_state = self.load_window_state(txn, &window_key)?;
905			if window_state.events.is_empty() {
906				continue;
907			}
908
909			// Check if window is expired: newest event older than window size
910			let newest_event_time = window_state.events.iter().map(|e| e.timestamp).max().unwrap_or(0);
911			if current_timestamp.saturating_sub(newest_event_time) > window_size_ms {
912				if let Some(layout) = &window_state.window_layout {
913					if let Some((row, _)) =
914						self.apply_aggregations(txn, &window_key, layout, &window_state.events)?
915					{
916						result.push(Diff::Remove {
917							pre: Columns::from_row(&row),
918						});
919					}
920				}
921				keys_to_remove.push(window_key);
922			}
923		}
924
925		// Clean up expired windows
926		for key in &keys_to_remove {
927			let empty = self.create_state();
928			self.save_state(txn, key, empty)?;
929		}
930
931		Ok(result)
932	}
933
934	/// Shared: partition columns by group keys and call `group_fn` for each group.
935	pub fn process_insert(
936		&self,
937		txn: &mut FlowTransaction,
938		columns: &Columns,
939		group_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns, Hash128) -> Result<Vec<Diff>>,
940	) -> Result<Vec<Diff>> {
941		let row_count = columns.row_count();
942		if row_count == 0 {
943			return Ok(Vec::new());
944		}
945		let group_hashes = self.compute_group_keys(columns)?;
946		let groups = columns.partition_by_keys(&group_hashes);
947		let mut result = Vec::new();
948		for (group_hash, group_columns) in groups {
949			self.register_group(txn, group_hash)?;
950			let group_result = group_fn(self, txn, &group_columns, group_hash)?;
951			result.extend(group_result);
952		}
953		Ok(result)
954	}
955
956	/// Shared: iterate change diffs and process inserts/updates via `process_fn`.
957	/// Optionally runs expiration first (all kinds except rolling).
958	pub fn apply_window_change(
959		&self,
960		txn: &mut FlowTransaction,
961		change: &Change,
962		expire: bool,
963		process_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>,
964	) -> Result<Vec<Diff>> {
965		let mut result = Vec::new();
966		if expire {
967			let current_timestamp = self.current_timestamp();
968			let expired_diffs = self.process_expired_windows(txn, current_timestamp)?;
969			result.extend(expired_diffs);
970		}
971		for diff in change.diffs.iter() {
972			match diff {
973				Diff::Insert {
974					post,
975				} => {
976					result.extend(process_fn(self, txn, post)?);
977				}
978				Diff::Update {
979					pre,
980					post,
981				} => {
982					result.extend(self.process_event_updates(txn, pre, post)?);
983				}
984				Diff::Remove {
985					pre,
986				} => {
987					result.extend(self.process_event_removals(txn, pre)?);
988				}
989			}
990		}
991		Ok(result)
992	}
993
994	/// Shared: emit an Insert or Update diff for an aggregation result.
995	/// `previous_aggregation` is the pre-update state (if the window already existed).
996	pub fn emit_aggregation_diff(
997		aggregated_row: &Row,
998		is_new: bool,
999		previous_aggregation: Option<(Row, bool)>,
1000	) -> Diff {
1001		if is_new {
1002			Diff::Insert {
1003				post: Columns::from_row(aggregated_row),
1004			}
1005		} else if let Some((previous_row, _)) = previous_aggregation {
1006			Diff::Update {
1007				pre: Columns::from_row(&previous_row),
1008				post: Columns::from_row(aggregated_row),
1009			}
1010		} else {
1011			Diff::Insert {
1012				post: Columns::from_row(aggregated_row),
1013			}
1014		}
1015	}
1016}
1017
// No methods are overridden: the operator relies entirely on the trait's provided items.
impl RawStatefulOperator for WindowOperator {}
1019
impl WindowStateful for WindowOperator {
	/// Schema of the state rows: a clone of the operator's single-blob `layout`.
	fn layout(&self) -> RowSchema {
		self.layout.clone()
	}
}
1025
1026impl Operator for WindowOperator {
1027	fn id(&self) -> FlowNodeId {
1028		self.node
1029	}
1030
1031	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
1032		match &self.kind {
1033			WindowKind::Tumbling {
1034				..
1035			} => apply_tumbling_window(self, txn, change),
1036			WindowKind::Sliding {
1037				..
1038			} => apply_sliding_window(self, txn, change),
1039			WindowKind::Rolling {
1040				..
1041			} => apply_rolling_window(self, txn, change),
1042			WindowKind::Session {
1043				..
1044			} => apply_session_window(self, txn, change),
1045		}
1046	}
1047
1048	fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
1049		let current_timestamp = (timestamp.to_nanos() / 1_000_000) as u64;
1050		let diffs = match &self.kind {
1051			WindowKind::Tumbling {
1052				..
1053			}
1054			| WindowKind::Sliding {
1055				..
1056			} => self.tick_expire_windows(txn, current_timestamp)?,
1057			WindowKind::Rolling {
1058				size: WindowSize::Duration(_),
1059			} => self.tick_rolling_eviction(txn, current_timestamp)?,
1060			WindowKind::Session {
1061				..
1062			} => self.tick_session_expiration(txn, current_timestamp)?,
1063			_ => vec![],
1064		};
1065
1066		if diffs.is_empty() {
1067			Ok(None)
1068		} else {
1069			Ok(Some(Change::from_flow(self.node, CommitVersion(0), diffs)))
1070		}
1071	}
1072
1073	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
1074		self.parent.pull(txn, rows)
1075	}
1076}