1use std::sync::Arc;
5
6use reifydb_core::{
7 common::{CommitVersion, WindowKind, WindowSize},
8 error::diagnostic::flow::{flow_window_timestamp_column_not_found, flow_window_timestamp_column_type_mismatch},
9 interface::catalog::flow::FlowNodeId,
10 internal,
11 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use serde::{Deserialize, Serialize};
14
15use crate::{
16 operator::{Operator, Operators},
17 transaction::FlowTransaction,
18};
19
20pub mod rolling;
21pub mod session;
22pub mod sliding;
23pub mod tumbling;
24
25use rolling::apply_rolling_window;
26use session::apply_session_window;
27use sliding::apply_sliding_window;
28use tumbling::apply_tumbling_window;
29
/// Shared `Params::None` value borrowed by every evaluation context built in
/// `WindowOperator::eval_session`.
static EMPTY_PARAMS: Params = Params::None;
31
32use std::{ops, sync::LazyLock, time::Duration};
33
34use postcard::{from_bytes, to_stdvec};
35use reifydb_core::{
36 encoded::{
37 key::{EncodedKey, EncodedKeyRange},
38 row::EncodedRow,
39 shape::{RowShape, RowShapeField},
40 },
41 interface::change::{Change, Diff},
42 row::Row,
43 util::encoding::keycode::serializer::KeySerializer,
44 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
45};
46use reifydb_engine::{
47 expression::{
48 compile::{CompiledExpr, compile_expression},
49 context::{CompileContext, EvalContext},
50 },
51 vm::stack::SymbolTable,
52};
53use reifydb_routine::routine::registry::Routines;
54use reifydb_rql::expression::{
55 Expression,
56 name::{collect_all_column_names, display_label},
57};
58use reifydb_runtime::{
59 context::RuntimeContext,
60 hash::{Hash128, xxh3_128},
61};
62use reifydb_type::{
63 Result,
64 error::Error,
65 fragment::Fragment,
66 params::Params,
67 util::cowvec::CowVec,
68 value::{Value, blob::Blob, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
69};
70
71use crate::operator::stateful::{raw::RawStatefulOperator, row::RowNumberProvider, window::WindowStateful};
72
73#[inline]
74fn build_aggregation_shape(names: &[String], types: &[Type]) -> RowShape {
75 let fields: Vec<RowShapeField> = names
76 .iter()
77 .zip(types.iter())
78 .map(|(name, ty)| RowShapeField::unconstrained(name.clone(), ty.clone()))
79 .collect();
80 RowShape::new(fields)
81}
82
/// Lazily created empty symbol table borrowed by every evaluation context
/// built in `WindowOperator::eval_session`.
static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
84
/// Serializable description of a row's shape: parallel lists of field names
/// and field types.
///
/// Stored inside [`WindowState`] so persisted event bytes can be decoded back
/// into rows (see `WindowEvent::to_row`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowLayout {
    /// Field names, in field order.
    pub names: Vec<String>,
    /// Field types; `types[i]` belongs to `names[i]`.
    pub types: Vec<Type>,
}
91
92impl WindowLayout {
93 pub fn from_row(row: &Row) -> Self {
94 Self {
95 names: row.shape.field_names().map(|s| s.to_string()).collect(),
96 types: row.shape.fields().iter().map(|f| f.constraint.get_type()).collect(),
97 }
98 }
99
100 pub fn to_shape(&self) -> RowShape {
101 let fields: Vec<RowShapeField> = self
102 .names
103 .iter()
104 .zip(self.types.iter())
105 .map(|(name, ty)| RowShapeField::unconstrained(name.clone(), ty.clone()))
106 .collect();
107 RowShape::new(fields)
108 }
109}
110
/// One input row buffered inside a window, in serializable form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowEvent {
    /// Row number of the source row; used to locate the event again on
    /// update and removal.
    pub row_number: RowNumber,
    /// Event time. Within this file the values come from
    /// `resolve_event_timestamps`, which yields milliseconds.
    pub timestamp: u64,
    /// Raw encoded bytes of the row; decoded with the window's
    /// [`WindowLayout`].
    #[serde(with = "serde_bytes")]
    pub encoded_bytes: Vec<u8>,
}
119
120impl WindowEvent {
121 pub fn from_row(row: &Row, timestamp: u64) -> Self {
122 Self {
123 row_number: row.number,
124 timestamp,
125 encoded_bytes: row.encoded.as_slice().to_vec(),
126 }
127 }
128
129 pub fn to_row(&self, layout: &WindowLayout) -> Row {
130 let shape = layout.to_shape();
131 let encoded = EncodedRow(CowVec::new(self.encoded_bytes.clone()));
132 Row {
133 number: self.row_number,
134 encoded,
135 shape,
136 }
137 }
138}
139
/// Persisted state of a single window. Serialized with postcard into a blob
/// (see `WindowOperator::save_window_state`); `Default` is what is returned
/// for a window that has no stored state yet.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WindowState {
    /// Events currently buffered in the window.
    pub events: Vec<WindowEvent>,
    /// Layout needed to decode `events`; `None` until it has been set.
    pub window_layout: Option<WindowLayout>,
    /// NOTE(review): not read in this file — presumably the window's start
    /// bound, maintained by the window-kind modules; confirm there.
    pub window_start: u64,
    /// Event counter; decremented (saturating) when an event is removed.
    pub event_count: u64,
    /// NOTE(review): not read in this file — presumably the time of the most
    /// recent event, maintained by the window-kind modules; confirm there.
    pub last_event_time: u64,
}
154
impl WindowState {
    /// Returns the window layout.
    ///
    /// # Panics
    ///
    /// Panics if `window_layout` is still `None`.
    pub fn layout(&self) -> &WindowLayout {
        self.window_layout.as_ref().expect("WindowState layout must be set before accessing")
    }
}
161
/// Construction parameters for [`WindowOperator::new`]; every field is moved
/// into the operator.
pub struct WindowConfig {
    /// Upstream operator(s); `pull` is delegated to it.
    pub parent: Arc<Operators>,
    /// Flow node this operator belongs to.
    pub node: FlowNodeId,
    /// Window kind (tumbling, sliding, rolling or session).
    pub kind: WindowKind,
    /// Expressions whose values define the group of each row.
    pub group_by: Vec<Expression>,
    /// Aggregation expressions evaluated over a window's events.
    pub aggregations: Vec<Expression>,
    /// Name of the event-time column; when `None` the operator's clock is
    /// used instead.
    pub ts: Option<String>,
    /// Runtime context; supplies the clock.
    pub runtime_context: RuntimeContext,
    /// Routine registry handed to expression evaluation.
    pub routines: Routines,
}
173
/// Stateful flow operator that buffers events into windows per group and
/// emits aggregated rows as diffs.
pub struct WindowOperator {
    /// Upstream operator(s); `pull` is delegated to it.
    pub parent: Arc<Operators>,
    /// Flow node id; returned by `Operator::id` and used to scope state.
    pub node: FlowNodeId,
    /// Window kind; selects the `apply` and `tick` implementation.
    pub kind: WindowKind,
    /// Group-by expressions (source form, used for output column labels).
    pub group_by: Vec<Expression>,
    /// Aggregation expressions (source form, used for output column labels).
    pub aggregations: Vec<Expression>,
    /// Name of the event-time column; `None` means "use the clock".
    pub ts: Option<String>,
    /// Compiled form of `group_by`, in the same order.
    pub compiled_group_by: Vec<CompiledExpr>,
    /// Compiled form of `aggregations`, in the same order.
    pub compiled_aggregations: Vec<CompiledExpr>,
    /// Shape of every persisted state row: a single blob field.
    pub layout: RowShape,
    /// Routine registry handed to expression evaluation.
    pub routines: Routines,
    /// Maps window keys to output row numbers.
    pub row_number_provider: RowNumberProvider,
    /// Runtime context; supplies the clock.
    pub runtime_context: RuntimeContext,
    /// Sorted names of all columns referenced by `group_by` and
    /// `aggregations`; used by `project_columns`.
    pub projected_columns: Vec<String>,
}
192
193impl WindowOperator {
194 pub fn new(config: WindowConfig) -> Self {
195 let symbols = SymbolTable::new();
196 let compile_ctx = CompileContext {
197 symbols: &symbols,
198 };
199
200 let compiled_group_by: Vec<CompiledExpr> = config
202 .group_by
203 .iter()
204 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile group_by expression"))
205 .collect();
206
207 let compiled_aggregations: Vec<CompiledExpr> = config
209 .aggregations
210 .iter()
211 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile aggregation expression"))
212 .collect();
213
214 let mut needed = collect_all_column_names(&config.group_by);
215 needed.extend(collect_all_column_names(&config.aggregations));
216 let mut projected_columns: Vec<String> = needed.into_iter().collect();
217 projected_columns.sort();
218
219 Self {
220 parent: config.parent,
221 node: config.node,
222 kind: config.kind,
223 group_by: config.group_by,
224 aggregations: config.aggregations,
225 ts: config.ts,
226 compiled_group_by,
227 compiled_aggregations,
228 layout: RowShape::testing(&[Type::Blob]),
229 routines: config.routines,
230 row_number_provider: RowNumberProvider::new(config.node),
231 runtime_context: config.runtime_context,
232 projected_columns,
233 }
234 }
235
    /// Returns the current time of the operator's clock in milliseconds.
    pub fn current_timestamp(&self) -> u64 {
        self.runtime_context.clock.now_millis()
    }
240
241 pub fn project_columns(&self, columns: &Columns) -> Columns {
243 if self.projected_columns.is_empty() {
244 return columns.clone();
245 }
246 columns.project_by_names(&self.projected_columns)
247 }
248
    /// Returns `true` when the window kind has a size and that size is a count.
    pub fn is_count_based(&self) -> bool {
        self.kind.size().is_some_and(|m| m.is_count())
    }
253
    /// Returns the window size as a duration, or `None` when the window kind
    /// has no size or the size is not expressible as a duration.
    pub fn size_duration(&self) -> Option<Duration> {
        self.kind.size().and_then(|m| m.as_duration())
    }
258
    /// Returns the window size as an event count, or `None` when the window
    /// kind has no size or the size is not expressible as a count.
    pub fn size_count(&self) -> Option<u64> {
        self.kind.size().and_then(|m| m.as_count())
    }
263
    /// Builds the base evaluation context used for group-by and aggregation
    /// expressions: no params, an empty symbol table, root identity and empty
    /// columns. Callers attach the actual columns via `with_eval`.
    ///
    /// `is_aggregate` is forwarded as `is_aggregate_context`.
    fn eval_session(&self, is_aggregate: bool) -> EvalContext<'_> {
        EvalContext {
            params: &EMPTY_PARAMS,
            symbols: &EMPTY_SYMBOL_TABLE,
            routines: &self.routines,
            runtime_context: &self.runtime_context,
            arena: None,
            identity: IdentityId::root(),
            is_aggregate_context: is_aggregate,
            columns: Columns::empty(),
            row_count: 1,
            target: None,
            take: None,
        }
    }
279
280 pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>> {
282 let row_count = columns.row_count();
283 if row_count == 0 {
284 return Ok(Vec::new());
285 }
286
287 if self.compiled_group_by.is_empty() {
288 return Ok(vec![Hash128::from(0u128); row_count]);
289 }
290
291 let session = self.eval_session(false);
292 let exec_ctx = session.with_eval(columns.clone(), row_count);
293
294 let mut group_columns: Vec<ColumnWithName> = Vec::new();
295 for compiled_expr in &self.compiled_group_by {
296 let col = compiled_expr.execute(&exec_ctx)?;
297 group_columns.push(col);
298 }
299
300 let mut hashes = Vec::with_capacity(row_count);
301 for row_idx in 0..row_count {
302 let mut data = Vec::new();
303 for col in &group_columns {
304 let value = col.data().get_value(row_idx);
305 let value_str = value.to_string();
306 data.extend_from_slice(value_str.as_bytes());
307 }
308 hashes.push(xxh3_128(&data));
309 }
310
311 Ok(hashes)
312 }
313
314 pub fn resolve_event_timestamps(&self, columns: &Columns, row_count: usize) -> Result<Vec<u64>> {
318 if row_count == 0 {
319 return Ok(Vec::new());
320 }
321 match &self.ts {
322 Some(ts_col) => {
323 let col = columns.column(ts_col).ok_or_else(|| {
324 Error(Box::new(flow_window_timestamp_column_not_found(ts_col)))
325 })?;
326 let mut timestamps = Vec::with_capacity(row_count);
327 for i in 0..row_count {
328 match col.data().get_value(i) {
329 Value::DateTime(dt) => timestamps.push(dt.timestamp_millis() as u64),
330 other => {
331 return Err(Error(Box::new(
332 flow_window_timestamp_column_type_mismatch(
333 ts_col,
334 other.get_type(),
335 ),
336 )));
337 }
338 }
339 }
340 Ok(timestamps)
341 }
342 None => {
343 let now = self.current_timestamp();
344 Ok(vec![now; row_count])
345 }
346 }
347 }
348
    /// Builds the state key of one window: the marker `win:` followed by the
    /// group hash and the window id.
    pub fn create_window_key(&self, group_hash: Hash128, window_id: u64) -> EncodedKey {
        let mut serializer = KeySerializer::with_capacity(32);
        serializer.extend_bytes(b"win:");
        serializer.extend_u128(group_hash);
        serializer.extend_u64(window_id);
        EncodedKey::new(serializer.finish())
    }
357
    /// Builds the state key of a row's reverse index entry: the marker `idx:`
    /// followed by the group hash and the row number.
    fn create_row_index_key(&self, group_hash: Hash128, row_number: RowNumber) -> EncodedKey {
        let mut serializer = KeySerializer::with_capacity(32);
        serializer.extend_bytes(b"idx:");
        serializer.extend_u128(group_hash);
        serializer.extend_u64(row_number.0);
        EncodedKey::new(serializer.finish())
    }
366
367 pub fn store_row_index(
370 &self,
371 txn: &mut FlowTransaction,
372 group_hash: Hash128,
373 row_number: RowNumber,
374 window_id: u64,
375 ) -> Result<()> {
376 let index_key = self.create_row_index_key(group_hash, row_number);
377 let mut window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
378 if !window_ids.contains(&window_id) {
379 window_ids.push(window_id);
380 }
381 let serialized = to_stdvec(&window_ids)
382 .map_err(|e| Error(Box::new(internal!("Failed to serialize row index: {}", e))))?;
383 let mut state_row = self.layout.allocate();
384 let blob = Blob::from(serialized);
385 self.layout.set_blob(&mut state_row, 0, &blob);
386 self.save_state(txn, &index_key, state_row)
387 }
388
389 fn lookup_row_index(
391 &self,
392 txn: &mut FlowTransaction,
393 group_hash: Hash128,
394 row_number: RowNumber,
395 ) -> Result<Vec<u64>> {
396 let index_key = self.create_row_index_key(group_hash, row_number);
397 let state_row = self.load_state(txn, &index_key)?;
398 if state_row.is_empty() || !state_row.is_defined(0) {
399 return Ok(Vec::new());
400 }
401 let blob = self.layout.get_blob(&state_row, 0);
402 if blob.is_empty() {
403 return Ok(Vec::new());
404 }
405 let window_ids: Vec<u64> = from_bytes(blob.as_ref())
406 .map_err(|e| Error(Box::new(internal!("Failed to deserialize row index: {}", e))))?;
407 Ok(window_ids)
408 }
409
410 fn replace_event_in_windows(
413 &self,
414 txn: &mut FlowTransaction,
415 group_hash: Hash128,
416 row_number: RowNumber,
417 post_row: &Row,
418 post_timestamp: u64,
419 ) -> Result<Vec<Diff>> {
420 let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
421 if window_ids.is_empty() {
422 return Ok(Vec::new());
423 }
424
425 let mut result = Vec::new();
426
427 for window_id in &window_ids {
428 let window_key = self.create_window_key(group_hash, *window_id);
429 let mut window_state = self.load_window_state(txn, &window_key)?;
430
431 let event_idx = window_state.events.iter().position(|e| e.row_number == row_number);
432 if let Some(idx) = event_idx {
433 let layout = match &window_state.window_layout {
434 Some(l) => l.clone(),
435 None => continue,
436 };
437
438 let changed_at = DateTime::from_nanos(post_timestamp);
439 let pre_aggregation = self.apply_aggregations(
440 txn,
441 &window_key,
442 &layout,
443 &window_state.events,
444 changed_at,
445 )?;
446
447 window_state.events[idx] = WindowEvent::from_row(post_row, post_timestamp);
448
449 let post_aggregation = self.apply_aggregations(
450 txn,
451 &window_key,
452 &layout,
453 &window_state.events,
454 changed_at,
455 )?;
456
457 self.save_window_state(txn, &window_key, &window_state)?;
458
459 if let (Some((pre_row, _)), Some((post_row, _))) = (pre_aggregation, post_aggregation) {
460 result.push(Diff::update(
461 Columns::from_row(&pre_row),
462 Columns::from_row(&post_row),
463 ));
464 }
465 }
466 }
467
468 Ok(result)
469 }
470
471 fn process_event_updates(&self, txn: &mut FlowTransaction, pre: &Columns, post: &Columns) -> Result<Vec<Diff>> {
473 let row_count = pre.row_count();
474 if row_count == 0 {
475 return Ok(Vec::new());
476 }
477
478 let group_hashes = self.compute_group_keys(pre)?;
479 let post_timestamps = self.resolve_event_timestamps(post, row_count)?;
480 let mut result = Vec::new();
481
482 for row_idx in 0..row_count {
483 let row_number = pre.row_numbers[row_idx];
484 let group_hash = group_hashes[row_idx];
485 let post_timestamp = post_timestamps[row_idx];
486
487 let single_row = post.extract_row(row_idx);
488 let projected = self.project_columns(&single_row);
489 let post_row = projected.to_single_row();
490
491 let diffs =
492 self.replace_event_in_windows(txn, group_hash, row_number, &post_row, post_timestamp)?;
493 result.extend(diffs);
494 }
495
496 Ok(result)
497 }
498
499 fn remove_event_from_windows(
500 &self,
501 txn: &mut FlowTransaction,
502 group_hash: Hash128,
503 row_number: RowNumber,
504 ) -> Result<Vec<Diff>> {
505 let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
506 if window_ids.is_empty() {
507 return Ok(Vec::new());
508 }
509
510 let mut result = Vec::new();
511 for window_id in &window_ids {
512 let window_key = self.create_window_key(group_hash, *window_id);
513 if let Some(diff) = self.remove_event_from_one_window(txn, &window_key, row_number)? {
514 result.push(diff);
515 }
516 }
517
518 let index_key = self.create_row_index_key(group_hash, row_number);
519 let empty = self.layout.allocate();
520 self.save_state(txn, &index_key, empty)?;
521
522 Ok(result)
523 }
524
    /// Removes the event `row_number` from the window stored under
    /// `window_key` and returns the diff describing the change to the
    /// window's aggregate.
    ///
    /// * `Ok(None)` when the window does not contain the event, has no
    ///   layout, or an aggregate could not be produced for the update case.
    /// * A remove diff when the window becomes empty.
    /// * An update diff (old aggregate to new aggregate) otherwise.
    ///
    /// # Errors
    ///
    /// Propagates state load/save and aggregation errors.
    #[inline]
    fn remove_event_from_one_window(
        &self,
        txn: &mut FlowTransaction,
        window_key: &EncodedKey,
        row_number: RowNumber,
    ) -> Result<Option<Diff>> {
        let mut window_state = self.load_window_state(txn, window_key)?;
        let Some(idx) = window_state.events.iter().position(|e| e.row_number == row_number) else {
            return Ok(None);
        };
        let Some(layout) = window_state.window_layout.clone() else {
            return Ok(None);
        };

        let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
        // Aggregate over the events including the one about to be removed.
        let pre_aggregation =
            self.apply_aggregations(txn, window_key, &layout, &window_state.events, changed_at)?;

        window_state.events.remove(idx);
        window_state.event_count = window_state.event_count.saturating_sub(1);

        // Last event gone: persist the empty window and retract its aggregate.
        if window_state.events.is_empty() {
            self.save_window_state(txn, window_key, &window_state)?;
            return Ok(pre_aggregation.map(|(pre_row, _)| Diff::remove(Columns::from_row(&pre_row))));
        }

        let post_aggregation =
            self.apply_aggregations(txn, window_key, &layout, &window_state.events, changed_at)?;
        self.save_window_state(txn, window_key, &window_state)?;

        Ok(match (pre_aggregation, post_aggregation) {
            (Some((pre_row, _)), Some((post_row, _))) => {
                Some(Diff::update(Columns::from_row(&pre_row), Columns::from_row(&post_row)))
            }
            _ => None,
        })
    }
563
564 fn process_event_removals(&self, txn: &mut FlowTransaction, pre: &Columns) -> Result<Vec<Diff>> {
566 let row_count = pre.row_count();
567 if row_count == 0 {
568 return Ok(Vec::new());
569 }
570
571 let group_hashes = self.compute_group_keys(pre)?;
572 let mut result = Vec::new();
573
574 for (&row_number, &group_hash) in pre.row_numbers.iter().zip(group_hashes.iter()) {
575 let diffs = self.remove_event_from_windows(txn, group_hash, row_number)?;
576 result.extend(diffs);
577 }
578
579 Ok(result)
580 }
581
582 pub fn extract_group_values(
585 &self,
586 window_layout: &WindowLayout,
587 events: &[WindowEvent],
588 ) -> Result<(Vec<Value>, Vec<String>)> {
589 if events.is_empty() || self.group_by.is_empty() {
590 return Ok((Vec::new(), Vec::new()));
591 }
592
593 let columns = self.events_to_columns(window_layout, events)?;
594 let row_count = columns.row_count();
595 if row_count == 0 {
596 return Ok((Vec::new(), Vec::new()));
597 }
598
599 let session = self.eval_session(false);
600 let exec_ctx = session.with_eval(columns, row_count);
601
602 let mut values = Vec::new();
603 let mut names = Vec::new();
604 for (i, compiled_expr) in self.compiled_group_by.iter().enumerate() {
605 let col = compiled_expr.execute(&exec_ctx)?;
606 values.push(col.data().get_value(0).clone());
607 names.push(display_label(&self.group_by[i]).text().to_string());
608 }
609
610 Ok((values, names))
611 }
612
613 pub fn events_to_columns(&self, window_layout: &WindowLayout, events: &[WindowEvent]) -> Result<Columns> {
615 if events.is_empty() {
616 return Ok(Columns::new(Vec::new()));
617 }
618
619 let mut builders: Vec<ColumnBuffer> = window_layout
620 .types
621 .iter()
622 .map(|ty| ColumnBuffer::with_capacity(ty.clone(), events.len()))
623 .collect();
624
625 for event in events.iter() {
626 let row = event.to_row(window_layout);
627 for (idx, builder) in builders.iter_mut().enumerate() {
628 let value = row.shape.get_value(&row.encoded, idx);
629 builder.push_value(value);
630 }
631 }
632
633 let columns = window_layout
634 .names
635 .iter()
636 .zip(builders)
637 .map(|(name, data)| ColumnWithName {
638 name: Fragment::internal(name.clone()),
639 data,
640 })
641 .collect();
642
643 Ok(Columns::new(columns))
644 }
645
646 pub fn apply_aggregations(
647 &self,
648 txn: &mut FlowTransaction,
649 window_key: &EncodedKey,
650 window_layout: &WindowLayout,
651 events: &[WindowEvent],
652 changed_at: DateTime,
653 ) -> Result<Option<(Row, bool)>> {
654 if events.is_empty() {
655 return Ok(None);
656 }
657
658 if self.aggregations.is_empty() {
659 let (result_row_number, is_new) =
660 self.row_number_provider.get_or_create_row_number(txn, window_key)?;
661 let mut result_row = events[0].to_row(window_layout);
662 result_row.number = result_row_number;
663 return Ok(Some((result_row, is_new)));
664 }
665
666 let (result_values, result_names, result_types) =
667 self.compute_aggregation_outputs(window_layout, events)?;
668
669 let layout = build_aggregation_shape(&result_names, &result_types);
670 let mut encoded = layout.allocate();
671 layout.set_values(&mut encoded, &result_values);
672
673 let ts_nanos = changed_at.to_nanos();
674 encoded.set_timestamps(ts_nanos, ts_nanos);
675
676 let (result_row_number, is_new) = self.row_number_provider.get_or_create_row_number(txn, window_key)?;
677 Ok(Some((
678 Row {
679 number: result_row_number,
680 encoded,
681 shape: layout,
682 },
683 is_new,
684 )))
685 }
686
687 #[inline]
688 fn compute_aggregation_outputs(
689 &self,
690 window_layout: &WindowLayout,
691 events: &[WindowEvent],
692 ) -> Result<(Vec<Value>, Vec<String>, Vec<Type>)> {
693 let columns = self.events_to_columns(window_layout, events)?;
694 let agg_session = self.eval_session(true);
695 let exec_ctx = agg_session.with_eval(columns, events.len());
696 let (group_values, group_names) = self.extract_group_values(window_layout, events)?;
697
698 let mut values = Vec::new();
699 let mut names = Vec::new();
700 let mut types = Vec::new();
701
702 for (value, name) in group_values.into_iter().zip(group_names.into_iter()) {
703 types.push(value.get_type());
704 values.push(value);
705 names.push(name);
706 }
707
708 for (i, compiled_aggregation) in self.compiled_aggregations.iter().enumerate() {
709 let agg_column = compiled_aggregation.execute(&exec_ctx)?;
710 let value = agg_column.data().get_value(0);
711 types.push(value.get_type());
712 values.push(value);
713 names.push(display_label(&self.aggregations[i]).text().to_string());
714 }
715
716 Ok((values, names, types))
717 }
718
719 pub fn process_expired_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
722 let mut result = Vec::new();
723
724 if let Some(duration) = self.size_duration() {
725 let window_size_ms = duration.as_millis() as u64;
726 if window_size_ms > 0 {
727 let expire_before = current_timestamp.saturating_sub(window_size_ms * 2);
728 let cutoff_id = expire_before / window_size_ms;
729 if cutoff_id == 0 {
730 return Ok(result);
731 }
732
733 let groups = self.load_group_registry(txn)?;
734 for group_hash in &groups {
735 let low_key = self.create_window_key(*group_hash, cutoff_id);
737 let high_key = self.create_window_key(*group_hash, 0);
738 let range = EncodedKeyRange::new(
739 ops::Bound::Excluded(low_key),
740 ops::Bound::Included(high_key),
741 );
742
743 let expired_keys = self.scan_keys_in_range(txn, &range)?;
744 let changed_at = DateTime::from_nanos(current_timestamp);
745 for key in &expired_keys {
746 let window_state = self.load_window_state(txn, key)?;
747 if !window_state.events.is_empty()
748 && let Some(layout) = &window_state.window_layout && let Some((
749 row,
750 _,
751 )) = self
752 .apply_aggregations(
753 txn,
754 key,
755 layout,
756 &window_state.events,
757 changed_at,
758 )? {
759 result.push(Diff::remove(Columns::from_row(&row)));
760 }
761 }
762
763 if !expired_keys.is_empty() {
764 let low_key = self.create_window_key(*group_hash, cutoff_id);
765 let high_key = self.create_window_key(*group_hash, 0);
766 let range = EncodedKeyRange::new(
767 ops::Bound::Excluded(low_key),
768 ops::Bound::Included(high_key),
769 );
770 let _ = self.expire_range(txn, range)?;
771 }
772 }
773 }
774 }
775
776 Ok(result)
777 }
778
779 pub fn load_window_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<WindowState> {
781 let state_row = self.load_state(txn, window_key)?;
782
783 if state_row.is_empty() || !state_row.is_defined(0) {
784 return Ok(WindowState::default());
785 }
786
787 let blob = self.layout.get_blob(&state_row, 0);
788 if blob.is_empty() {
789 return Ok(WindowState::default());
790 }
791
792 from_bytes(blob.as_ref())
793 .map_err(|e| Error(Box::new(internal!("Failed to deserialize WindowState: {}", e))))
794 }
795
796 pub fn save_window_state(
798 &self,
799 txn: &mut FlowTransaction,
800 window_key: &EncodedKey,
801 state: &WindowState,
802 ) -> Result<()> {
803 let serialized = to_stdvec(state)
804 .map_err(|e| Error(Box::new(internal!("Failed to serialize WindowState: {}", e))))?;
805
806 let mut state_row = self.layout.allocate();
807 let blob = Blob::from(serialized);
808 self.layout.set_blob(&mut state_row, 0, &blob);
809
810 self.save_state(txn, window_key, state_row)
811 }
812
813 pub fn get_and_increment_global_count(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<u64> {
815 let count_key = self.create_count_key(group_hash);
816 let count_row = self.load_state(txn, &count_key)?;
817
818 let current_count = if count_row.is_empty() || !count_row.is_defined(0) {
819 0
820 } else {
821 let blob = self.layout.get_blob(&count_row, 0);
822 if blob.is_empty() {
823 0
824 } else {
825 from_bytes(blob.as_ref()).unwrap_or(0)
826 }
827 };
828
829 let new_count = current_count + 1;
830
831 let serialized = to_stdvec(&new_count)
832 .map_err(|e| Error(Box::new(internal!("Failed to serialize count: {}", e))))?;
833
834 let mut count_state_row = self.layout.allocate();
835 let blob = Blob::from(serialized);
836 self.layout.set_blob(&mut count_state_row, 0, &blob);
837
838 self.save_state(txn, &count_key, count_state_row)?;
839
840 Ok(current_count)
841 }
842
    /// Builds the state key of a group's event counter: the marker `cnt:`
    /// followed by the group hash.
    pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey {
        let mut serializer = KeySerializer::with_capacity(32);
        serializer.extend_bytes(b"cnt:");
        serializer.extend_u128(group_hash);
        EncodedKey::new(serializer.finish())
    }
850
    /// Returns the fixed state key (`grp:`) under which the list of known
    /// group hashes is stored.
    fn create_group_registry_key(&self) -> EncodedKey {
        EncodedKey::new(b"grp:")
    }
855
856 pub fn load_group_registry(&self, txn: &mut FlowTransaction) -> Result<Vec<Hash128>> {
858 let key = self.create_group_registry_key();
859 let state_row = self.load_state(txn, &key)?;
860 if state_row.is_empty() || !state_row.is_defined(0) {
861 return Ok(Vec::new());
862 }
863 let blob = self.layout.get_blob(&state_row, 0);
864 if blob.is_empty() {
865 return Ok(Vec::new());
866 }
867 let groups: Vec<u128> = from_bytes(blob.as_ref()).unwrap_or_default();
868 Ok(groups.into_iter().map(Hash128::from).collect())
869 }
870
871 fn save_group_registry(&self, txn: &mut FlowTransaction, groups: &[Hash128]) -> Result<()> {
873 let key = self.create_group_registry_key();
874 let raw: Vec<u128> = groups.iter().map(|h| (*h).into()).collect();
875 let serialized = to_stdvec(&raw)
876 .map_err(|e| Error(Box::new(internal!("Failed to serialize group registry: {}", e))))?;
877 let mut state_row = self.layout.allocate();
878 let blob = Blob::from(serialized);
879 self.layout.set_blob(&mut state_row, 0, &blob);
880 self.save_state(txn, &key, state_row)
881 }
882
883 pub fn register_group(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<()> {
885 let mut groups = self.load_group_registry(txn)?;
886 if !groups.contains(&group_hash) {
887 groups.push(group_hash);
888 self.save_group_registry(txn, &groups)?;
889 }
890 Ok(())
891 }
892
893 pub fn tick_expire_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
894 let mut result = Vec::new();
895 let window_size_ms = match self.size_duration() {
896 Some(d) => d.as_millis() as u64,
897 None => return Ok(result),
898 };
899 if window_size_ms == 0 {
900 return Ok(result);
901 }
902
903 let mut keys_to_remove = Vec::new();
904 for window_key in self.scan_window_keys(txn)? {
905 if let Some(diff) =
906 self.expire_window_if_due(txn, &window_key, current_timestamp, window_size_ms)?
907 {
908 result.push(diff);
909 keys_to_remove.push(window_key);
910 }
911 }
912
913 for key in &keys_to_remove {
914 let empty = self.create_state();
915 self.save_state(txn, key, empty)?;
916 }
917
918 Ok(result)
919 }
920
921 #[inline]
922 fn scan_window_keys(&self, txn: &mut FlowTransaction) -> Result<Vec<EncodedKey>> {
923 let all_state = txn.state_scan(self.node)?;
924 let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
925 let win_marker = b"win:";
926
927 let mut keys = Vec::new();
928 for item in &all_state.items {
929 let full_key = &item.key;
930 if full_key.len() <= prefix.len() {
931 continue;
932 }
933 let inner = &full_key[prefix.len()..];
934 if !inner.starts_with(win_marker) {
935 continue;
936 }
937 keys.push(EncodedKey::new(inner));
938 }
939 Ok(keys)
940 }
941
942 #[inline]
943 fn expire_window_if_due(
944 &self,
945 txn: &mut FlowTransaction,
946 window_key: &EncodedKey,
947 current_timestamp: u64,
948 window_size_ms: u64,
949 ) -> Result<Option<Diff>> {
950 let window_state = self.load_window_state(txn, window_key)?;
951 if window_state.events.is_empty() {
952 return Ok(None);
953 }
954
955 let newest_event_time = window_state.events.iter().map(|e| e.timestamp).max().unwrap_or(0);
956 if current_timestamp.saturating_sub(newest_event_time) <= window_size_ms {
957 return Ok(None);
958 }
959
960 let changed_at = DateTime::from_nanos(current_timestamp);
961 if let Some(layout) = &window_state.window_layout
962 && let Some((row, _)) =
963 self.apply_aggregations(txn, window_key, layout, &window_state.events, changed_at)?
964 {
965 return Ok(Some(Diff::remove(Columns::from_row(&row))));
966 }
967 Ok(None)
968 }
969
970 pub fn process_insert(
972 &self,
973 txn: &mut FlowTransaction,
974 columns: &Columns,
975 changed_at: DateTime,
976 group_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns, Hash128, DateTime) -> Result<Vec<Diff>>,
977 ) -> Result<Vec<Diff>> {
978 let row_count = columns.row_count();
979 if row_count == 0 {
980 return Ok(Vec::new());
981 }
982 let group_hashes = self.compute_group_keys(columns)?;
983 let groups = columns.partition_by_keys(&group_hashes);
984 let mut result = Vec::new();
985 for (group_hash, group_columns) in groups {
986 self.register_group(txn, group_hash)?;
987 let group_result = group_fn(self, txn, &group_columns, group_hash, changed_at)?;
988 result.extend(group_result);
989 }
990 Ok(result)
991 }
992
993 pub fn apply_window_change(
996 &self,
997 txn: &mut FlowTransaction,
998 change: &Change,
999 expire: bool,
1000 process_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>,
1001 ) -> Result<Vec<Diff>> {
1002 let mut result = Vec::new();
1003 if expire {
1004 let current_timestamp = self.current_timestamp();
1005 let expired_diffs = self.process_expired_windows(txn, current_timestamp)?;
1006 result.extend(expired_diffs);
1007 }
1008 for diff in change.diffs.iter() {
1009 match diff {
1010 Diff::Insert {
1011 post,
1012 } => result.extend(process_fn(self, txn, post)?),
1013 Diff::Update {
1014 pre,
1015 post,
1016 } => result.extend(self.apply_window_update_diff(txn, pre, post, &process_fn)?),
1017 Diff::Remove {
1018 pre,
1019 } => result.extend(self.process_event_removals(txn, pre)?),
1020 }
1021 }
1022 Ok(result)
1023 }
1024
1025 #[inline]
1026 fn apply_window_update_diff(
1027 &self,
1028 txn: &mut FlowTransaction,
1029 pre: &Columns,
1030 post: &Columns,
1031 process_fn: &impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>,
1032 ) -> Result<Vec<Diff>> {
1033 let group_hashes = self.compute_group_keys(pre)?;
1034 let mut update_indices: Vec<usize> = Vec::new();
1035 let mut insert_indices: Vec<usize> = Vec::new();
1036 for (row_idx, &group_hash) in group_hashes.iter().enumerate() {
1037 let row_number = pre.row_numbers[row_idx];
1038 if self.lookup_row_index(txn, group_hash, row_number)?.is_empty() {
1039 insert_indices.push(row_idx);
1040 } else {
1041 update_indices.push(row_idx);
1042 }
1043 }
1044
1045 let mut result = Vec::new();
1046 if !update_indices.is_empty() {
1047 let pre_subset = pre.extract_by_indices(&update_indices);
1048 let post_subset = post.extract_by_indices(&update_indices);
1049 result.extend(self.process_event_updates(txn, &pre_subset, &post_subset)?);
1050 }
1051 if !insert_indices.is_empty() {
1052 let post_subset = post.extract_by_indices(&insert_indices);
1053 result.extend(process_fn(self, txn, &post_subset)?);
1054 }
1055 Ok(result)
1056 }
1057
1058 pub fn emit_aggregation_diff(
1061 aggregated_row: &Row,
1062 is_new: bool,
1063 previous_aggregation: Option<(Row, bool)>,
1064 ) -> Diff {
1065 if is_new {
1066 Diff::insert(Columns::from_row(aggregated_row))
1067 } else if let Some((previous_row, _)) = previous_aggregation {
1068 Diff::update(Columns::from_row(&previous_row), Columns::from_row(aggregated_row))
1069 } else {
1070 Diff::insert(Columns::from_row(aggregated_row))
1071 }
1072 }
1073}
1074
/// Uses the trait's provided methods unchanged.
impl RawStatefulOperator for WindowOperator {}
1076
impl WindowStateful for WindowOperator {
    /// Returns a clone of the operator's single-blob state row shape.
    fn layout(&self) -> RowShape {
        self.layout.clone()
    }
}
1082
impl Operator for WindowOperator {
    /// Returns the flow node id of this operator.
    fn id(&self) -> FlowNodeId {
        self.node
    }

    /// Applies `change` by dispatching to the implementation that matches the
    /// configured window kind.
    fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
        match &self.kind {
            WindowKind::Tumbling {
                ..
            } => apply_tumbling_window(self, txn, change),
            WindowKind::Sliding {
                ..
            } => apply_sliding_window(self, txn, change),
            WindowKind::Rolling {
                ..
            } => apply_rolling_window(self, txn, change),
            WindowKind::Session {
                ..
            } => apply_session_window(self, txn, change),
        }
    }

    /// Time-driven maintenance. Converts `timestamp` to milliseconds and runs
    /// the expiry/eviction routine of the window kind; kinds not listed
    /// explicitly (e.g. rolling windows without a duration size) produce no
    /// diffs.
    ///
    /// Returns `Ok(None)` when no diffs were produced, otherwise a change
    /// stamped with the current clock time.
    fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
        // Nanoseconds -> milliseconds, the unit used by event timestamps and window sizes.
        let current_timestamp = timestamp.to_nanos() / 1_000_000;
        let diffs = match &self.kind {
            WindowKind::Tumbling {
                ..
            }
            | WindowKind::Sliding {
                ..
            } => self.tick_expire_windows(txn, current_timestamp)?,
            WindowKind::Rolling {
                size: WindowSize::Duration(_),
            } => self.tick_rolling_eviction(txn, current_timestamp)?,
            WindowKind::Session {
                ..
            } => self.tick_session_expiration(txn, current_timestamp)?,
            _ => vec![],
        };

        if diffs.is_empty() {
            Ok(None)
        } else {
            Ok(Some(Change::from_flow(
                self.node,
                CommitVersion(0),
                diffs,
                DateTime::from_nanos(self.runtime_context.clock.now_nanos()),
            )))
        }
    }

    /// Delegates row pulls to the parent operator.
    fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
        self.parent.pull(txn, rows)
    }
}