Skip to main content

reifydb_sub_flow/operator/window/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::{WindowSize, WindowSlide, WindowTimeMode, WindowType},
8	interface::catalog::flow::FlowNodeId,
9	internal,
10};
11use serde::{Deserialize, Serialize};
12
13use crate::{
14	operator::{Operator, Operators},
15	transaction::FlowTransaction,
16};
17
18pub mod rolling;
19pub mod sliding;
20pub mod tumbling;
21
22use rolling::apply_rolling_window;
23use sliding::apply_sliding_window;
24use tumbling::apply_tumbling_window;
25
/// Shared "no parameters" value borrowed by every `EvalContext` built in this module.
static EMPTY_PARAMS: Params = Params::None;
27
28use std::{ops, sync::LazyLock, time};
29
30use postcard::{from_bytes, to_stdvec};
31use reifydb_core::{
32	encoded::{
33		encoded::EncodedValues,
34		key::{EncodedKey, EncodedKeyRange},
35		schema::{Schema, SchemaField},
36	},
37	interface::change::{Change, Diff},
38	row::Row,
39	util::encoding::keycode::serializer::KeySerializer,
40	value::column::{Column, columns::Columns, data::ColumnData},
41};
42use reifydb_engine::{
43	expression::{
44		compile::{CompiledExpr, compile_expression},
45		context::{CompileContext, EvalContext},
46	},
47	vm::stack::SymbolTable,
48};
49use reifydb_function::registry::Functions;
50use reifydb_rql::expression::{Expression, name::column_name_from_expression};
51use reifydb_runtime::{
52	clock::Clock,
53	hash::{Hash128, xxh3_128},
54};
55use reifydb_type::{
56	Result,
57	error::Error,
58	fragment::Fragment,
59	params::Params,
60	util::cowvec::CowVec,
61	value::{Value, blob::Blob, identity::IdentityId, row_number::RowNumber, r#type::Type},
62};
63
64use crate::operator::stateful::{raw::RawStatefulOperator, row::RowNumberProvider, window::WindowStateful};
65
66static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
67
/// A single event stored within a window.
///
/// Holds the encoded bytes of a row together with the field names and types
/// that `WindowEvent::to_row` needs to rebuild the row's schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowEvent {
	/// Row number copied from the source row (`Row::number`) in `from_row`
	pub row_number: RowNumber,
	pub timestamp: u64, // Timestamp handed to `from_row`, in milliseconds
	/// Raw encoded values of the source row, copied byte-for-byte
	#[serde(with = "serde_bytes")]
	pub encoded_bytes: Vec<u8>,
	/// Field names of the source row's schema, in schema order
	pub layout_names: Vec<String>,
	/// Field types of the source row's schema, parallel to `layout_names`
	pub layout_types: Vec<Type>,
}
78
79impl WindowEvent {
80	pub fn from_row(row: &Row, timestamp: u64) -> Self {
81		let names: Vec<String> = row.schema.field_names().map(|s| s.to_string()).collect();
82		let types: Vec<Type> = row.schema.fields().iter().map(|f| f.constraint.get_type()).collect();
83
84		let mut stored_values = Vec::new();
85		for (i, _field) in row.schema.fields().iter().enumerate() {
86			let value = row.schema.get_value(&row.encoded, i);
87			stored_values.push(format!("{:?}", value));
88		}
89
90		Self {
91			row_number: row.number,
92			timestamp,
93			encoded_bytes: row.encoded.as_slice().to_vec(),
94			layout_names: names,
95			layout_types: types,
96		}
97	}
98
99	pub fn to_row(&self) -> Row {
100		let fields: Vec<SchemaField> = self
101			.layout_names
102			.iter()
103			.zip(self.layout_types.iter())
104			.map(|(name, ty)| SchemaField::unconstrained(name.clone(), ty.clone()))
105			.collect();
106
107		let layout = Schema::new(fields);
108		let encoded = EncodedValues(CowVec::new(self.encoded_bytes.clone()));
109
110		let row = Row {
111			number: self.row_number,
112			encoded,
113			schema: layout,
114		};
115
116		row
117	}
118}
119
/// State for a single window.
///
/// Serialized with postcard by `save_window_state` and read back by
/// `load_window_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowState {
	/// All events in this window (stored in insertion order)
	pub events: Vec<WindowEvent>,
	/// Window creation timestamp; `should_trigger_window` adds the window size
	/// to it to find the trigger time of time-based windows that have a slide
	pub window_start: u64,
	/// Count of events in window (for count-based windows); time-based windows
	/// also require it to be non-zero before they trigger
	pub event_count: u64,
	/// Whether this window has been triggered/computed
	/// NOTE(review): not read anywhere in this file — presumably maintained by
	/// the rolling/sliding/tumbling modules; confirm
	pub is_triggered: bool,
}
132
133impl Default for WindowState {
134	fn default() -> Self {
135		Self {
136			events: Vec::new(),
137			window_start: 0,
138			event_count: 0,
139			is_triggered: false,
140		}
141	}
142}
143
/// The main window operator.
///
/// Holds the window configuration, the group-by/aggregation expressions in
/// both source and compiled form, and the helpers used to persist per-window
/// state.
pub struct WindowOperator {
	/// Parent operator handle supplied to `new` (not used by the code in this file)
	pub parent: Arc<Operators>,
	/// Flow node id returned by `Operator::id`
	pub node: FlowNodeId,
	/// Time-based (processing or event time) or count-based windowing
	pub window_type: WindowType,
	/// Window size: a duration for time windows, a count for count windows
	pub size: WindowSize,
	/// `None` selects the tumbling implementation, `Rolling` the rolling one,
	/// any other value the sliding one (see `Operator::apply`)
	pub slide: Option<WindowSlide>,
	/// Group-by expressions as written; used for result column names
	pub group_by: Vec<Expression>,
	/// Aggregation expressions as written; used for result column names
	pub aggregations: Vec<Expression>,
	/// `group_by` compiled once in `new`
	pub compiled_group_by: Vec<CompiledExpr>,
	/// `aggregations` compiled once in `new`
	pub compiled_aggregations: Vec<CompiledExpr>,
	/// Layout of the persisted state rows; state is stored as a blob in field 0
	pub layout: Schema,
	/// Function registry used when compiling and evaluating expressions
	pub functions: Functions,
	/// Maps a window key to the row number of its result row
	pub row_number_provider: RowNumberProvider,
	pub min_events: usize,               // Minimum events required before window becomes visible
	pub max_window_count: Option<usize>, // Maximum number of windows to keep per group
	pub max_window_age: Option<time::Duration>, // Maximum age of windows before expiration
	/// Clock used for processing-time timestamps and expression evaluation
	pub clock: Clock,
}
163
164impl WindowOperator {
165	pub fn new(
166		parent: Arc<Operators>,
167		node: FlowNodeId,
168		window_type: WindowType,
169		size: WindowSize,
170		slide: Option<WindowSlide>,
171		group_by: Vec<Expression>,
172		aggregations: Vec<Expression>,
173		min_events: usize,
174		max_window_count: Option<usize>,
175		max_window_age: Option<time::Duration>,
176		clock: Clock,
177		functions: Functions,
178	) -> Self {
179		let symbol_table = SymbolTable::new();
180		let compile_ctx = CompileContext {
181			functions: &functions,
182			symbol_table: &symbol_table,
183		};
184
185		// Compile group_by expressions
186		let compiled_group_by: Vec<CompiledExpr> = group_by
187			.iter()
188			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile group_by expression"))
189			.collect();
190
191		// Compile aggregation expressions
192		let compiled_aggregations: Vec<CompiledExpr> = aggregations
193			.iter()
194			.map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile aggregation expression"))
195			.collect();
196
197		Self {
198			parent,
199			node,
200			window_type,
201			size,
202			slide,
203			group_by,
204			aggregations,
205			compiled_group_by,
206			compiled_aggregations,
207			layout: Schema::testing(&[Type::Blob]),
208			functions,
209			row_number_provider: RowNumberProvider::new(node),
210			min_events: min_events.max(1), // Ensure at least 1 event is required
211			max_window_count,
212			max_window_age,
213			clock,
214		}
215	}
216
	/// Get the current timestamp in milliseconds, as reported by the operator's `clock`
	pub fn current_timestamp(&self) -> u64 {
		self.clock.now_millis()
	}
221
222	/// Compute group keys for all rows in Columns
223	pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>> {
224		let row_count = columns.row_count();
225		if row_count == 0 {
226			return Ok(Vec::new());
227		}
228
229		if self.compiled_group_by.is_empty() {
230			return Ok(vec![Hash128::from(0u128); row_count]);
231		}
232
233		let exec_ctx = EvalContext {
234			target: None,
235			columns: columns.clone(),
236			row_count,
237			take: None,
238			params: &EMPTY_PARAMS,
239			symbol_table: &EMPTY_SYMBOL_TABLE,
240			is_aggregate_context: false,
241			functions: &self.functions,
242			clock: &self.clock,
243			arena: None,
244			identity: IdentityId::root(),
245		};
246
247		let mut group_columns: Vec<Column> = Vec::new();
248		for compiled_expr in &self.compiled_group_by {
249			let col = compiled_expr.execute(&exec_ctx)?;
250			group_columns.push(col);
251		}
252
253		let mut hashes = Vec::with_capacity(row_count);
254		for row_idx in 0..row_count {
255			let mut data = Vec::new();
256			for col in &group_columns {
257				let value = col.data().get_value(row_idx);
258				let value_str = value.to_string();
259				data.extend_from_slice(value_str.as_bytes());
260			}
261			hashes.push(xxh3_128(&data));
262		}
263
264		Ok(hashes)
265	}
266
267	/// Extract timestamps for all rows in Columns
268	pub fn extract_timestamps(&self, columns: &Columns) -> Result<Vec<u64>> {
269		let row_count = columns.row_count();
270		if row_count == 0 {
271			return Ok(Vec::new());
272		}
273
274		match &self.window_type {
275			WindowType::Time(time_mode) => match time_mode {
276				WindowTimeMode::Processing => {
277					let now = self.current_timestamp();
278					Ok(vec![now; row_count])
279				}
280				WindowTimeMode::EventTime(column_name) => {
281					if let Some(col) = columns.column(column_name) {
282						let mut timestamps = Vec::with_capacity(row_count);
283						for row_idx in 0..row_count {
284							let value = col.data().get_value(row_idx);
285
286							let ts = match value {
287								Value::Int8(v) => v as u64,
288								Value::Uint8(v) => v,
289								Value::Int4(v) => v as u64,
290								Value::Uint4(v) => v as u64,
291								Value::DateTime(dt) => dt.timestamp_millis() as u64,
292								_ => {
293									return Err(Error(internal!(
294										"Cannot convert {:?} to timestamp",
295										value.get_type()
296									)));
297								}
298							};
299							timestamps.push(ts);
300						}
301						Ok(timestamps)
302					} else {
303						Err(Error(internal!(
304							"Event time column '{}' not found in columns",
305							column_name
306						)))
307					}
308				}
309			},
310			WindowType::Count => {
311				let now = self.current_timestamp();
312				Ok(vec![now; row_count])
313			}
314		}
315	}
316
317	/// Create a window key for storage
318	pub fn create_window_key(&self, group_hash: Hash128, window_id: u64) -> EncodedKey {
319		let mut serializer = KeySerializer::with_capacity(32);
320		serializer.extend_bytes(b"win:");
321		serializer.extend_u128(group_hash);
322		serializer.extend_u64(window_id);
323		EncodedKey::new(serializer.finish())
324	}
325
326	/// Extract timestamp from row data
327	pub fn extract_timestamp_from_row(&self, row: &Row) -> Result<u64> {
328		match &self.window_type {
329			WindowType::Time(time_mode) => match time_mode {
330				WindowTimeMode::Processing => Ok(self.current_timestamp()),
331				WindowTimeMode::EventTime(column_name) => {
332					if let Some(timestamp_index) = row.schema.find_field_index(column_name) {
333						let timestamp_value = row.schema.get_i64(&row.encoded, timestamp_index);
334						Ok(timestamp_value as u64)
335					} else {
336						let column_names: Vec<&str> = row.schema.field_names().collect();
337						Err(Error(internal!(
338							"Event time column '{}' not found in row with columns: {:?}",
339							column_name,
340							column_names
341						)))
342					}
343				}
344			},
345			WindowType::Count => {
346				unreachable!(
347					"extract_timestamp_from_row should never be called for count-based windows"
348				)
349			}
350		}
351	}
352
353	/// Extract group values from window events (all events in a group have the same group values)
354	/// TODO: Refactor to use column-based evaluation when window operator is needed
355	pub fn extract_group_values(&self, events: &[WindowEvent]) -> Result<(Vec<Value>, Vec<String>)> {
356		if events.is_empty() || self.group_by.is_empty() {
357			return Ok((Vec::new(), Vec::new()));
358		}
359
360		// DISABLED: Window operator needs refactoring to use column-based evaluation
361
362		unimplemented!("Window operator extract_group_values needs refactoring to use column-based evaluation")
363	}
364
365	/// Convert window events to columnar format for aggregation
366	pub fn events_to_columns(&self, events: &[WindowEvent]) -> Result<Columns> {
367		if events.is_empty() {
368			return Ok(Columns::new(Vec::new()));
369		}
370
371		let first_event = &events[0];
372		let mut columns = Vec::new();
373
374		for (field_idx, (field_name, field_type)) in
375			first_event.layout_names.iter().zip(first_event.layout_types.iter()).enumerate()
376		{
377			let mut column_data = ColumnData::with_capacity(field_type.clone(), events.len());
378
379			for (_event_idx, event) in events.iter().enumerate() {
380				let row = event.to_row();
381				let value = row.schema.get_value(&row.encoded, field_idx);
382				column_data.push_value(value);
383			}
384
385			columns.push(Column {
386				name: Fragment::internal(field_name.clone()),
387				data: column_data,
388			});
389		}
390
391		Ok(Columns::new(columns))
392	}
393
394	/// Apply aggregations to all events in a window
395	pub fn apply_aggregations(
396		&self,
397		txn: &mut FlowTransaction,
398		window_key: &EncodedKey,
399		events: &[WindowEvent],
400	) -> Result<Option<(Row, bool)>> {
401		if events.is_empty() {
402			return Ok(None);
403		}
404
405		if self.aggregations.is_empty() {
406			// No aggregations configured, return first event as result
407			let (result_row_number, is_new) =
408				self.row_number_provider.get_or_create_row_number(txn, window_key)?;
409			let mut result_row = events[0].to_row();
410			result_row.number = result_row_number;
411			return Ok(Some((result_row, is_new)));
412		}
413
414		let columns = self.events_to_columns(events)?;
415
416		let exec_ctx = EvalContext {
417			target: None,
418			columns,
419			row_count: events.len(),
420			take: None,
421			params: &EMPTY_PARAMS,
422			symbol_table: &EMPTY_SYMBOL_TABLE,
423			is_aggregate_context: true, // Use aggregate functions for window aggregations
424			functions: &self.functions,
425			clock: &self.clock,
426			arena: None,
427			identity: IdentityId::root(),
428		};
429
430		let (group_values, group_names) = self.extract_group_values(events)?;
431
432		let mut result_values = Vec::new();
433		let mut result_names = Vec::new();
434		let mut result_types = Vec::new();
435
436		for (value, name) in group_values.into_iter().zip(group_names.into_iter()) {
437			result_values.push(value.clone());
438			result_names.push(name);
439			result_types.push(value.get_type());
440		}
441
442		for (i, compiled_aggregation) in self.compiled_aggregations.iter().enumerate() {
443			let agg_column = compiled_aggregation.execute(&exec_ctx)?;
444
445			let value = agg_column.data().get_value(0);
446			result_values.push(value.clone());
447			result_names.push(column_name_from_expression(&self.aggregations[i]).text().to_string());
448			result_types.push(value.get_type());
449		}
450
451		let fields: Vec<SchemaField> = result_names
452			.iter()
453			.zip(result_types.iter())
454			.map(|(name, ty)| SchemaField::unconstrained(name.clone(), ty.clone()))
455			.collect();
456		let layout = Schema::new(fields);
457		let mut encoded = layout.allocate();
458		layout.set_values(&mut encoded, &result_values);
459
460		let (result_row_number, is_new) = self.row_number_provider.get_or_create_row_number(txn, window_key)?;
461
462		let result_row = Row {
463			number: result_row_number,
464			encoded,
465			schema: layout,
466		};
467
468		Ok(Some((result_row, is_new)))
469	}
470
471	/// Process expired windows and clean up state
472	pub fn process_expired_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
473		let result = Vec::new();
474
475		if let (WindowType::Time(_), WindowSize::Duration(duration)) = (&self.window_type, &self.size) {
476			let window_size_ms = duration.as_millis() as u64;
477			let expire_before = current_timestamp.saturating_sub(window_size_ms * 2); // Keep 2 window sizes
478
479			// all group keys and clean up expired windows for each group
480			let before_key = self.create_window_key(Hash128::from(0u128), expire_before / window_size_ms);
481			let range = EncodedKeyRange::new(ops::Bound::Excluded(before_key), ops::Bound::Unbounded);
482
483			let _expired_count = self.expire_range(txn, range)?;
484		}
485
486		Ok(result)
487	}
488
489	/// Load window state from storage
490	pub fn load_window_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<WindowState> {
491		let state_row = self.load_state(txn, window_key)?;
492
493		if state_row.is_empty() || !state_row.is_defined(0) {
494			return Ok(WindowState::default());
495		}
496
497		let blob = self.layout.get_blob(&state_row, 0);
498		if blob.is_empty() {
499			return Ok(WindowState::default());
500		}
501
502		from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize WindowState: {}", e)))
503	}
504
505	/// Save window state to storage
506	pub fn save_window_state(
507		&self,
508		txn: &mut FlowTransaction,
509		window_key: &EncodedKey,
510		state: &WindowState,
511	) -> Result<()> {
512		let serialized =
513			to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize WindowState: {}", e)))?;
514
515		let mut state_row = self.layout.allocate();
516		let blob = Blob::from(serialized);
517		self.layout.set_blob(&mut state_row, 0, &blob);
518
519		self.save_state(txn, window_key, state_row)
520	}
521
522	/// Get and increment global event count for count-based windows
523	pub fn get_and_increment_global_count(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<u64> {
524		let count_key = self.create_count_key(group_hash);
525		let count_row = self.load_state(txn, &count_key)?;
526
527		let current_count = if count_row.is_empty() || !count_row.is_defined(0) {
528			0
529		} else {
530			let blob = self.layout.get_blob(&count_row, 0);
531			if blob.is_empty() {
532				0
533			} else {
534				from_bytes(blob.as_ref()).unwrap_or(0)
535			}
536		};
537
538		let new_count = current_count + 1;
539
540		let serialized =
541			to_stdvec(&new_count).map_err(|e| Error(internal!("Failed to serialize count: {}", e)))?;
542
543		let mut count_state_row = self.layout.allocate();
544		let blob = Blob::from(serialized);
545		self.layout.set_blob(&mut count_state_row, 0, &blob);
546
547		self.save_state(txn, &count_key, count_state_row)?;
548
549		Ok(current_count)
550	}
551
552	/// Create a count key for global event counting
553	pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey {
554		let mut serializer = KeySerializer::with_capacity(32);
555		serializer.extend_bytes(b"cnt:");
556		serializer.extend_u128(group_hash);
557		EncodedKey::new(serializer.finish())
558	}
559}
560
// Empty impl: no trait items are overridden, so WindowOperator uses whatever
// `RawStatefulOperator` provides by default.
impl RawStatefulOperator for WindowOperator {}
562
impl WindowStateful for WindowOperator {
	/// Returns a clone of the operator's state layout, which `new` builds
	/// with `Schema::testing(&[Type::Blob])`.
	fn layout(&self) -> Schema {
		self.layout.clone()
	}
}
568
569impl Operator for WindowOperator {
570	fn id(&self) -> FlowNodeId {
571		self.node
572	}
573
574	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
575		// We'll need to refactor the architecture to support this properly.
576
577		match &self.slide {
578			Some(WindowSlide::Rolling) => apply_rolling_window(self, txn, change),
579			Some(_) => apply_sliding_window(self, txn, change),
580			None => apply_tumbling_window(self, txn, change),
581		}
582	}
583
584	fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
585		todo!()
586	}
587}
588
589/// Additional helper methods for window triggering
590impl WindowOperator {
591	/// Check if a window should be triggered (emitted)
592	pub fn should_trigger_window(&self, state: &WindowState, current_timestamp: u64) -> bool {
593		match (&self.window_type, &self.size, &self.slide) {
594			// Tumbling windows (no slide): emit immediately when events arrive (streaming behavior)
595			(WindowType::Time(_), WindowSize::Duration(_), None) => {
596				if state.event_count > 0 {
597					return true;
598				}
599				false
600			}
601			// Sliding windows: use time-based triggering
602
603			// but allow multiple triggers as the window slides
604			(WindowType::Time(_), WindowSize::Duration(duration), Some(_)) => {
605				// This allows overlapping windows to emit results independently
606				if state.event_count > 0 {
607					let window_size_ms = duration.as_millis() as u64;
608					let trigger_time = state.window_start + window_size_ms;
609					current_timestamp >= trigger_time
610				} else {
611					false
612				}
613			}
614			// Count-based tumbling windows: trigger when count threshold is reached
615			(WindowType::Count, WindowSize::Count(count), None) => state.event_count >= *count,
616			// Count-based sliding windows: trigger when min_events threshold is met
617			(WindowType::Count, WindowSize::Count(_count), Some(_)) => {
618				state.event_count >= self.min_events as u64
619			}
620			_ => false,
621		}
622	}
623}