1use std::sync::Arc;
5
6use reifydb_core::{
7 common::{WindowSize, WindowSlide, WindowTimeMode, WindowType},
8 interface::catalog::flow::FlowNodeId,
9 internal,
10};
11use serde::{Deserialize, Serialize};
12
13use crate::{
14 operator::{Operator, Operators},
15 transaction::FlowTransaction,
16};
17
18pub mod rolling;
19pub mod sliding;
20pub mod tumbling;
21
22use rolling::apply_rolling_window;
23use sliding::apply_sliding_window;
24use tumbling::apply_tumbling_window;
25
/// Empty parameter set passed as `params` to the `EvalContext`s built by the window operator.
static EMPTY_PARAMS: Params = Params::None;
27
28use std::{ops, sync::LazyLock, time};
29
30use postcard::{from_bytes, to_stdvec};
31use reifydb_core::{
32 encoded::{
33 encoded::EncodedValues,
34 key::{EncodedKey, EncodedKeyRange},
35 schema::{Schema, SchemaField},
36 },
37 interface::change::{Change, Diff},
38 row::Row,
39 util::encoding::keycode::serializer::KeySerializer,
40 value::column::{Column, columns::Columns, data::ColumnData},
41};
42use reifydb_engine::{
43 expression::{
44 compile::{CompiledExpr, compile_expression},
45 context::{CompileContext, EvalContext},
46 },
47 vm::stack::SymbolTable,
48};
49use reifydb_function::registry::Functions;
50use reifydb_rql::expression::{Expression, name::column_name_from_expression};
51use reifydb_runtime::{
52 clock::Clock,
53 hash::{Hash128, xxh3_128},
54};
55use reifydb_type::{
56 Result,
57 error::Error,
58 fragment::Fragment,
59 params::Params,
60 util::cowvec::CowVec,
61 value::{Value, blob::Blob, identity::IdentityId, row_number::RowNumber, r#type::Type},
62};
63
64use crate::operator::stateful::{raw::RawStatefulOperator, row::RowNumberProvider, window::WindowStateful};
65
66static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct WindowEvent {
71 pub row_number: RowNumber,
72 pub timestamp: u64, #[serde(with = "serde_bytes")]
74 pub encoded_bytes: Vec<u8>,
75 pub layout_names: Vec<String>,
76 pub layout_types: Vec<Type>,
77}
78
79impl WindowEvent {
80 pub fn from_row(row: &Row, timestamp: u64) -> Self {
81 let names: Vec<String> = row.schema.field_names().map(|s| s.to_string()).collect();
82 let types: Vec<Type> = row.schema.fields().iter().map(|f| f.constraint.get_type()).collect();
83
84 let mut stored_values = Vec::new();
85 for (i, _field) in row.schema.fields().iter().enumerate() {
86 let value = row.schema.get_value(&row.encoded, i);
87 stored_values.push(format!("{:?}", value));
88 }
89
90 Self {
91 row_number: row.number,
92 timestamp,
93 encoded_bytes: row.encoded.as_slice().to_vec(),
94 layout_names: names,
95 layout_types: types,
96 }
97 }
98
99 pub fn to_row(&self) -> Row {
100 let fields: Vec<SchemaField> = self
101 .layout_names
102 .iter()
103 .zip(self.layout_types.iter())
104 .map(|(name, ty)| SchemaField::unconstrained(name.clone(), ty.clone()))
105 .collect();
106
107 let layout = Schema::new(fields);
108 let encoded = EncodedValues(CowVec::new(self.encoded_bytes.clone()));
109
110 let row = Row {
111 number: self.row_number,
112 encoded,
113 schema: layout,
114 };
115
116 row
117 }
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct WindowState {
123 pub events: Vec<WindowEvent>,
125 pub window_start: u64,
127 pub event_count: u64,
129 pub is_triggered: bool,
131}
132
133impl Default for WindowState {
134 fn default() -> Self {
135 Self {
136 events: Vec::new(),
137 window_start: 0,
138 event_count: 0,
139 is_triggered: false,
140 }
141 }
142}
143
144pub struct WindowOperator {
146 pub parent: Arc<Operators>,
147 pub node: FlowNodeId,
148 pub window_type: WindowType,
149 pub size: WindowSize,
150 pub slide: Option<WindowSlide>,
151 pub group_by: Vec<Expression>,
152 pub aggregations: Vec<Expression>,
153 pub compiled_group_by: Vec<CompiledExpr>,
154 pub compiled_aggregations: Vec<CompiledExpr>,
155 pub layout: Schema,
156 pub functions: Functions,
157 pub row_number_provider: RowNumberProvider,
158 pub min_events: usize, pub max_window_count: Option<usize>, pub max_window_age: Option<time::Duration>, pub clock: Clock,
162}
163
164impl WindowOperator {
165 pub fn new(
166 parent: Arc<Operators>,
167 node: FlowNodeId,
168 window_type: WindowType,
169 size: WindowSize,
170 slide: Option<WindowSlide>,
171 group_by: Vec<Expression>,
172 aggregations: Vec<Expression>,
173 min_events: usize,
174 max_window_count: Option<usize>,
175 max_window_age: Option<time::Duration>,
176 clock: Clock,
177 functions: Functions,
178 ) -> Self {
179 let symbol_table = SymbolTable::new();
180 let compile_ctx = CompileContext {
181 functions: &functions,
182 symbol_table: &symbol_table,
183 };
184
185 let compiled_group_by: Vec<CompiledExpr> = group_by
187 .iter()
188 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile group_by expression"))
189 .collect();
190
191 let compiled_aggregations: Vec<CompiledExpr> = aggregations
193 .iter()
194 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile aggregation expression"))
195 .collect();
196
197 Self {
198 parent,
199 node,
200 window_type,
201 size,
202 slide,
203 group_by,
204 aggregations,
205 compiled_group_by,
206 compiled_aggregations,
207 layout: Schema::testing(&[Type::Blob]),
208 functions,
209 row_number_provider: RowNumberProvider::new(node),
210 min_events: min_events.max(1), max_window_count,
212 max_window_age,
213 clock,
214 }
215 }
216
217 pub fn current_timestamp(&self) -> u64 {
219 self.clock.now_millis()
220 }
221
222 pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>> {
224 let row_count = columns.row_count();
225 if row_count == 0 {
226 return Ok(Vec::new());
227 }
228
229 if self.compiled_group_by.is_empty() {
230 return Ok(vec![Hash128::from(0u128); row_count]);
231 }
232
233 let exec_ctx = EvalContext {
234 target: None,
235 columns: columns.clone(),
236 row_count,
237 take: None,
238 params: &EMPTY_PARAMS,
239 symbol_table: &EMPTY_SYMBOL_TABLE,
240 is_aggregate_context: false,
241 functions: &self.functions,
242 clock: &self.clock,
243 arena: None,
244 identity: IdentityId::root(),
245 };
246
247 let mut group_columns: Vec<Column> = Vec::new();
248 for compiled_expr in &self.compiled_group_by {
249 let col = compiled_expr.execute(&exec_ctx)?;
250 group_columns.push(col);
251 }
252
253 let mut hashes = Vec::with_capacity(row_count);
254 for row_idx in 0..row_count {
255 let mut data = Vec::new();
256 for col in &group_columns {
257 let value = col.data().get_value(row_idx);
258 let value_str = value.to_string();
259 data.extend_from_slice(value_str.as_bytes());
260 }
261 hashes.push(xxh3_128(&data));
262 }
263
264 Ok(hashes)
265 }
266
267 pub fn extract_timestamps(&self, columns: &Columns) -> Result<Vec<u64>> {
269 let row_count = columns.row_count();
270 if row_count == 0 {
271 return Ok(Vec::new());
272 }
273
274 match &self.window_type {
275 WindowType::Time(time_mode) => match time_mode {
276 WindowTimeMode::Processing => {
277 let now = self.current_timestamp();
278 Ok(vec![now; row_count])
279 }
280 WindowTimeMode::EventTime(column_name) => {
281 if let Some(col) = columns.column(column_name) {
282 let mut timestamps = Vec::with_capacity(row_count);
283 for row_idx in 0..row_count {
284 let value = col.data().get_value(row_idx);
285
286 let ts = match value {
287 Value::Int8(v) => v as u64,
288 Value::Uint8(v) => v,
289 Value::Int4(v) => v as u64,
290 Value::Uint4(v) => v as u64,
291 Value::DateTime(dt) => dt.timestamp_millis() as u64,
292 _ => {
293 return Err(Error(internal!(
294 "Cannot convert {:?} to timestamp",
295 value.get_type()
296 )));
297 }
298 };
299 timestamps.push(ts);
300 }
301 Ok(timestamps)
302 } else {
303 Err(Error(internal!(
304 "Event time column '{}' not found in columns",
305 column_name
306 )))
307 }
308 }
309 },
310 WindowType::Count => {
311 let now = self.current_timestamp();
312 Ok(vec![now; row_count])
313 }
314 }
315 }
316
317 pub fn create_window_key(&self, group_hash: Hash128, window_id: u64) -> EncodedKey {
319 let mut serializer = KeySerializer::with_capacity(32);
320 serializer.extend_bytes(b"win:");
321 serializer.extend_u128(group_hash);
322 serializer.extend_u64(window_id);
323 EncodedKey::new(serializer.finish())
324 }
325
326 pub fn extract_timestamp_from_row(&self, row: &Row) -> Result<u64> {
328 match &self.window_type {
329 WindowType::Time(time_mode) => match time_mode {
330 WindowTimeMode::Processing => Ok(self.current_timestamp()),
331 WindowTimeMode::EventTime(column_name) => {
332 if let Some(timestamp_index) = row.schema.find_field_index(column_name) {
333 let timestamp_value = row.schema.get_i64(&row.encoded, timestamp_index);
334 Ok(timestamp_value as u64)
335 } else {
336 let column_names: Vec<&str> = row.schema.field_names().collect();
337 Err(Error(internal!(
338 "Event time column '{}' not found in row with columns: {:?}",
339 column_name,
340 column_names
341 )))
342 }
343 }
344 },
345 WindowType::Count => {
346 unreachable!(
347 "extract_timestamp_from_row should never be called for count-based windows"
348 )
349 }
350 }
351 }
352
353 pub fn extract_group_values(&self, events: &[WindowEvent]) -> Result<(Vec<Value>, Vec<String>)> {
356 if events.is_empty() || self.group_by.is_empty() {
357 return Ok((Vec::new(), Vec::new()));
358 }
359
360 unimplemented!("Window operator extract_group_values needs refactoring to use column-based evaluation")
363 }
364
365 pub fn events_to_columns(&self, events: &[WindowEvent]) -> Result<Columns> {
367 if events.is_empty() {
368 return Ok(Columns::new(Vec::new()));
369 }
370
371 let first_event = &events[0];
372 let mut columns = Vec::new();
373
374 for (field_idx, (field_name, field_type)) in
375 first_event.layout_names.iter().zip(first_event.layout_types.iter()).enumerate()
376 {
377 let mut column_data = ColumnData::with_capacity(field_type.clone(), events.len());
378
379 for (_event_idx, event) in events.iter().enumerate() {
380 let row = event.to_row();
381 let value = row.schema.get_value(&row.encoded, field_idx);
382 column_data.push_value(value);
383 }
384
385 columns.push(Column {
386 name: Fragment::internal(field_name.clone()),
387 data: column_data,
388 });
389 }
390
391 Ok(Columns::new(columns))
392 }
393
394 pub fn apply_aggregations(
396 &self,
397 txn: &mut FlowTransaction,
398 window_key: &EncodedKey,
399 events: &[WindowEvent],
400 ) -> Result<Option<(Row, bool)>> {
401 if events.is_empty() {
402 return Ok(None);
403 }
404
405 if self.aggregations.is_empty() {
406 let (result_row_number, is_new) =
408 self.row_number_provider.get_or_create_row_number(txn, window_key)?;
409 let mut result_row = events[0].to_row();
410 result_row.number = result_row_number;
411 return Ok(Some((result_row, is_new)));
412 }
413
414 let columns = self.events_to_columns(events)?;
415
416 let exec_ctx = EvalContext {
417 target: None,
418 columns,
419 row_count: events.len(),
420 take: None,
421 params: &EMPTY_PARAMS,
422 symbol_table: &EMPTY_SYMBOL_TABLE,
423 is_aggregate_context: true, functions: &self.functions,
425 clock: &self.clock,
426 arena: None,
427 identity: IdentityId::root(),
428 };
429
430 let (group_values, group_names) = self.extract_group_values(events)?;
431
432 let mut result_values = Vec::new();
433 let mut result_names = Vec::new();
434 let mut result_types = Vec::new();
435
436 for (value, name) in group_values.into_iter().zip(group_names.into_iter()) {
437 result_values.push(value.clone());
438 result_names.push(name);
439 result_types.push(value.get_type());
440 }
441
442 for (i, compiled_aggregation) in self.compiled_aggregations.iter().enumerate() {
443 let agg_column = compiled_aggregation.execute(&exec_ctx)?;
444
445 let value = agg_column.data().get_value(0);
446 result_values.push(value.clone());
447 result_names.push(column_name_from_expression(&self.aggregations[i]).text().to_string());
448 result_types.push(value.get_type());
449 }
450
451 let fields: Vec<SchemaField> = result_names
452 .iter()
453 .zip(result_types.iter())
454 .map(|(name, ty)| SchemaField::unconstrained(name.clone(), ty.clone()))
455 .collect();
456 let layout = Schema::new(fields);
457 let mut encoded = layout.allocate();
458 layout.set_values(&mut encoded, &result_values);
459
460 let (result_row_number, is_new) = self.row_number_provider.get_or_create_row_number(txn, window_key)?;
461
462 let result_row = Row {
463 number: result_row_number,
464 encoded,
465 schema: layout,
466 };
467
468 Ok(Some((result_row, is_new)))
469 }
470
471 pub fn process_expired_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
473 let result = Vec::new();
474
475 if let (WindowType::Time(_), WindowSize::Duration(duration)) = (&self.window_type, &self.size) {
476 let window_size_ms = duration.as_millis() as u64;
477 let expire_before = current_timestamp.saturating_sub(window_size_ms * 2); let before_key = self.create_window_key(Hash128::from(0u128), expire_before / window_size_ms);
481 let range = EncodedKeyRange::new(ops::Bound::Excluded(before_key), ops::Bound::Unbounded);
482
483 let _expired_count = self.expire_range(txn, range)?;
484 }
485
486 Ok(result)
487 }
488
489 pub fn load_window_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<WindowState> {
491 let state_row = self.load_state(txn, window_key)?;
492
493 if state_row.is_empty() || !state_row.is_defined(0) {
494 return Ok(WindowState::default());
495 }
496
497 let blob = self.layout.get_blob(&state_row, 0);
498 if blob.is_empty() {
499 return Ok(WindowState::default());
500 }
501
502 from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize WindowState: {}", e)))
503 }
504
505 pub fn save_window_state(
507 &self,
508 txn: &mut FlowTransaction,
509 window_key: &EncodedKey,
510 state: &WindowState,
511 ) -> Result<()> {
512 let serialized =
513 to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize WindowState: {}", e)))?;
514
515 let mut state_row = self.layout.allocate();
516 let blob = Blob::from(serialized);
517 self.layout.set_blob(&mut state_row, 0, &blob);
518
519 self.save_state(txn, window_key, state_row)
520 }
521
522 pub fn get_and_increment_global_count(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<u64> {
524 let count_key = self.create_count_key(group_hash);
525 let count_row = self.load_state(txn, &count_key)?;
526
527 let current_count = if count_row.is_empty() || !count_row.is_defined(0) {
528 0
529 } else {
530 let blob = self.layout.get_blob(&count_row, 0);
531 if blob.is_empty() {
532 0
533 } else {
534 from_bytes(blob.as_ref()).unwrap_or(0)
535 }
536 };
537
538 let new_count = current_count + 1;
539
540 let serialized =
541 to_stdvec(&new_count).map_err(|e| Error(internal!("Failed to serialize count: {}", e)))?;
542
543 let mut count_state_row = self.layout.allocate();
544 let blob = Blob::from(serialized);
545 self.layout.set_blob(&mut count_state_row, 0, &blob);
546
547 self.save_state(txn, &count_key, count_state_row)?;
548
549 Ok(current_count)
550 }
551
552 pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey {
554 let mut serializer = KeySerializer::with_capacity(32);
555 serializer.extend_bytes(b"cnt:");
556 serializer.extend_u128(group_hash);
557 EncodedKey::new(serializer.finish())
558 }
559}
560
// Marker-style implementation: no trait items are overridden here.
// NOTE(review): presumably this is what supplies the raw state helpers the operator
// calls (`load_state`, `save_state`, `expire_range`) — confirm against the trait definitions.
impl RawStatefulOperator for WindowOperator {}
562
563impl WindowStateful for WindowOperator {
564 fn layout(&self) -> Schema {
565 self.layout.clone()
566 }
567}
568
569impl Operator for WindowOperator {
570 fn id(&self) -> FlowNodeId {
571 self.node
572 }
573
574 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
575 match &self.slide {
578 Some(WindowSlide::Rolling) => apply_rolling_window(self, txn, change),
579 Some(_) => apply_sliding_window(self, txn, change),
580 None => apply_tumbling_window(self, txn, change),
581 }
582 }
583
584 fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
585 todo!()
586 }
587}
588
589impl WindowOperator {
591 pub fn should_trigger_window(&self, state: &WindowState, current_timestamp: u64) -> bool {
593 match (&self.window_type, &self.size, &self.slide) {
594 (WindowType::Time(_), WindowSize::Duration(_), None) => {
596 if state.event_count > 0 {
597 return true;
598 }
599 false
600 }
601 (WindowType::Time(_), WindowSize::Duration(duration), Some(_)) => {
605 if state.event_count > 0 {
607 let window_size_ms = duration.as_millis() as u64;
608 let trigger_time = state.window_start + window_size_ms;
609 current_timestamp >= trigger_time
610 } else {
611 false
612 }
613 }
614 (WindowType::Count, WindowSize::Count(count), None) => state.event_count >= *count,
616 (WindowType::Count, WindowSize::Count(_count), Some(_)) => {
618 state.event_count >= self.min_events as u64
619 }
620 _ => false,
621 }
622 }
623}