1use std::sync::Arc;
5
6use reifydb_core::{
7 common::{CommitVersion, WindowKind, WindowSize},
8 error::diagnostic::flow::{flow_window_timestamp_column_not_found, flow_window_timestamp_column_type_mismatch},
9 interface::catalog::flow::FlowNodeId,
10 internal,
11 key::{EncodableKey, flow_node_state::FlowNodeStateKey},
12};
13use serde::{Deserialize, Serialize};
14
15use crate::{
16 operator::{Operator, Operators},
17 transaction::FlowTransaction,
18};
19
20pub mod rolling;
21pub mod session;
22pub mod sliding;
23pub mod tumbling;
24
25use rolling::apply_rolling_window;
26use session::apply_session_window;
27use sliding::apply_sliding_window;
28use tumbling::apply_tumbling_window;
29
/// Empty parameter set shared by every evaluation session built in `WindowOperator::eval_session`;
/// window expressions are evaluated without bound query parameters.
static EMPTY_PARAMS: Params = Params::None;
31
32use std::{ops, sync::LazyLock, time::Duration};
33
34use postcard::{from_bytes, to_stdvec};
35use reifydb_core::{
36 encoded::{
37 key::{EncodedKey, EncodedKeyRange},
38 row::EncodedRow,
39 schema::{RowSchema, RowSchemaField},
40 },
41 interface::change::{Change, Diff},
42 row::Row,
43 util::encoding::keycode::serializer::KeySerializer,
44 value::column::{Column, columns::Columns, data::ColumnData},
45};
46use reifydb_engine::{
47 expression::{
48 compile::{CompiledExpr, compile_expression},
49 context::{CompileContext, EvalSession},
50 },
51 vm::stack::SymbolTable,
52};
53use reifydb_routine::function::registry::Functions;
54use reifydb_rql::expression::{
55 Expression,
56 name::{collect_all_column_names, column_name_from_expression},
57};
58use reifydb_runtime::{
59 context::RuntimeContext,
60 hash::{Hash128, xxh3_128},
61};
62use reifydb_type::{
63 Result,
64 error::Error,
65 fragment::Fragment,
66 params::Params,
67 util::cowvec::CowVec,
68 value::{Value, blob::Blob, datetime::DateTime, identity::IdentityId, row_number::RowNumber, r#type::Type},
69};
70
71use crate::operator::stateful::{raw::RawStatefulOperator, row::RowNumberProvider, window::WindowStateful};
72
73static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
74
/// Column names and types of the rows stored in a window, kept alongside the raw event bytes
/// so the events can be decoded again later.
///
/// Persisted as part of `WindowState` with postcard. Postcard encodes struct fields
/// positionally, so the field order below is part of the stored format and must not change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowLayout {
	/// Column names, in schema order.
	pub names: Vec<String>,
	/// Column types; `types[i]` belongs to `names[i]`.
	pub types: Vec<Type>,
}
81
82impl WindowLayout {
83 pub fn from_row(row: &Row) -> Self {
84 Self {
85 names: row.schema.field_names().map(|s| s.to_string()).collect(),
86 types: row.schema.fields().iter().map(|f| f.constraint.get_type()).collect(),
87 }
88 }
89
90 pub fn to_schema(&self) -> RowSchema {
91 let fields: Vec<RowSchemaField> = self
92 .names
93 .iter()
94 .zip(self.types.iter())
95 .map(|(name, ty)| RowSchemaField::unconstrained(name.clone(), ty.clone()))
96 .collect();
97 RowSchema::new(fields)
98 }
99}
100
/// One input row buffered inside a window: its row number, its event timestamp, and a copy of
/// its encoded bytes (decoded later against the window's `WindowLayout`).
///
/// Persisted as part of `WindowState` with postcard, which encodes fields positionally; the
/// field order below is part of the stored format and must not change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowEvent {
	/// Row number of the source row.
	pub row_number: RowNumber,
	/// Event time in milliseconds (see `WindowOperator::resolve_event_timestamps`).
	pub timestamp: u64,
	/// Raw bytes of the source row's `EncodedRow`.
	#[serde(with = "serde_bytes")]
	pub encoded_bytes: Vec<u8>,
}
109
110impl WindowEvent {
111 pub fn from_row(row: &Row, timestamp: u64) -> Self {
112 Self {
113 row_number: row.number,
114 timestamp,
115 encoded_bytes: row.encoded.as_slice().to_vec(),
116 }
117 }
118
119 pub fn to_row(&self, layout: &WindowLayout) -> Row {
120 let schema = layout.to_schema();
121 let encoded = EncodedRow(CowVec::new(self.encoded_bytes.clone()));
122 Row {
123 number: self.row_number,
124 encoded,
125 schema,
126 }
127 }
128}
129
/// Persisted contents of a single window.
///
/// Serialized with postcard (`WindowOperator::save_window_state`). Postcard encodes fields
/// positionally, so the field order below is part of the stored format and must not change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowState {
	/// Events currently buffered in the window.
	pub events: Vec<WindowEvent>,
	/// Layout used to decode `events`; `None` until a window kind records one.
	pub window_layout: Option<WindowLayout>,
	/// NOTE(review): presumably the window's start time in ms; not read or written in this file,
	/// so confirm in the per-kind modules.
	pub window_start: u64,
	/// Event counter; decremented (saturating) when an event is removed from the window.
	pub event_count: u64,
	/// NOTE(review): presumably the timestamp of the latest event; not maintained in this file.
	pub last_event_time: u64,
}
144
145impl WindowState {
146 pub fn layout(&self) -> &WindowLayout {
148 self.window_layout.as_ref().expect("WindowState layout must be set before accessing")
149 }
150}
151
152impl Default for WindowState {
153 fn default() -> Self {
154 Self {
155 events: Vec::new(),
156 window_layout: None,
157 window_start: 0,
158 event_count: 0,
159 last_event_time: 0,
160 }
161 }
162}
163
/// Flow operator that buffers rows into windows (tumbling, sliding, rolling or session),
/// grouped by `group_by`, and emits aggregated rows per window.
pub struct WindowOperator {
	/// Upstream operator; `pull` requests are delegated to it.
	pub parent: Arc<Operators>,
	/// Flow node id of this operator; also scopes its state and row-number allocation.
	pub node: FlowNodeId,
	/// Window kind and size.
	pub kind: WindowKind,
	/// Grouping expressions (source form; used to name the group columns in output rows).
	pub group_by: Vec<Expression>,
	/// Aggregation expressions (source form; used to name the aggregate columns in output rows).
	pub aggregations: Vec<Expression>,
	/// Name of the `DateTime` column providing event time; `None` uses the runtime clock.
	pub ts: Option<String>,
	/// Compiled form of `group_by`, in the same order.
	pub compiled_group_by: Vec<CompiledExpr>,
	/// Compiled form of `aggregations`, in the same order.
	pub compiled_aggregations: Vec<CompiledExpr>,
	/// Schema of every persisted state row: a single blob column.
	pub layout: RowSchema,
	/// Function registry used to compile and evaluate expressions.
	pub functions: Functions,
	/// Allocates stable output row numbers per window key.
	pub row_number_provider: RowNumberProvider,
	/// Runtime services (clock, etc.) made available to evaluation.
	pub runtime_context: RuntimeContext,
	/// Sorted names of the columns referenced by `group_by` and `aggregations`.
	pub projected_columns: Vec<String>,
}
182
183impl WindowOperator {
184 pub fn new(
185 parent: Arc<Operators>,
186 node: FlowNodeId,
187 kind: WindowKind,
188 group_by: Vec<Expression>,
189 aggregations: Vec<Expression>,
190 ts: Option<String>,
191 runtime_context: RuntimeContext,
192 functions: Functions,
193 ) -> Self {
194 let symbols = SymbolTable::new();
195 let compile_ctx = CompileContext {
196 functions: &functions,
197 symbols: &symbols,
198 };
199
200 let compiled_group_by: Vec<CompiledExpr> = group_by
202 .iter()
203 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile group_by expression"))
204 .collect();
205
206 let compiled_aggregations: Vec<CompiledExpr> = aggregations
208 .iter()
209 .map(|e| compile_expression(&compile_ctx, e).expect("Failed to compile aggregation expression"))
210 .collect();
211
212 let mut needed = collect_all_column_names(&group_by);
213 needed.extend(collect_all_column_names(&aggregations));
214 let mut projected_columns: Vec<String> = needed.into_iter().collect();
215 projected_columns.sort();
216
217 Self {
218 parent,
219 node,
220 kind,
221 group_by,
222 aggregations,
223 ts,
224 compiled_group_by,
225 compiled_aggregations,
226 layout: RowSchema::testing(&[Type::Blob]),
227 functions,
228 row_number_provider: RowNumberProvider::new(node),
229 runtime_context,
230 projected_columns,
231 }
232 }
233
234 pub fn current_timestamp(&self) -> u64 {
236 self.runtime_context.clock.now_millis()
237 }
238
239 pub fn project_columns(&self, columns: &Columns) -> Columns {
241 if self.projected_columns.is_empty() {
242 return columns.clone();
243 }
244 columns.project_by_names(&self.projected_columns)
245 }
246
247 pub fn is_count_based(&self) -> bool {
249 self.kind.size().map_or(false, |m| m.is_count())
250 }
251
252 pub fn size_duration(&self) -> Option<Duration> {
254 self.kind.size().and_then(|m| m.as_duration())
255 }
256
257 pub fn size_count(&self) -> Option<u64> {
259 self.kind.size().and_then(|m| m.as_count())
260 }
261
262 fn eval_session(&self, is_aggregate: bool) -> EvalSession<'_> {
263 EvalSession {
264 params: &EMPTY_PARAMS,
265 symbols: &EMPTY_SYMBOL_TABLE,
266 functions: &self.functions,
267 runtime_context: &self.runtime_context,
268 arena: None,
269 identity: IdentityId::root(),
270 is_aggregate_context: is_aggregate,
271 }
272 }
273
274 pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>> {
276 let row_count = columns.row_count();
277 if row_count == 0 {
278 return Ok(Vec::new());
279 }
280
281 if self.compiled_group_by.is_empty() {
282 return Ok(vec![Hash128::from(0u128); row_count]);
283 }
284
285 let session = self.eval_session(false);
286 let exec_ctx = session.eval(columns.clone(), row_count);
287
288 let mut group_columns: Vec<Column> = Vec::new();
289 for compiled_expr in &self.compiled_group_by {
290 let col = compiled_expr.execute(&exec_ctx)?;
291 group_columns.push(col);
292 }
293
294 let mut hashes = Vec::with_capacity(row_count);
295 for row_idx in 0..row_count {
296 let mut data = Vec::new();
297 for col in &group_columns {
298 let value = col.data().get_value(row_idx);
299 let value_str = value.to_string();
300 data.extend_from_slice(value_str.as_bytes());
301 }
302 hashes.push(xxh3_128(&data));
303 }
304
305 Ok(hashes)
306 }
307
308 pub fn resolve_event_timestamps(&self, columns: &Columns, row_count: usize) -> Result<Vec<u64>> {
312 if row_count == 0 {
313 return Ok(Vec::new());
314 }
315 match &self.ts {
316 Some(ts_col) => {
317 let col = columns
318 .column(ts_col)
319 .ok_or_else(|| Error(flow_window_timestamp_column_not_found(ts_col)))?;
320 let mut timestamps = Vec::with_capacity(row_count);
321 for i in 0..row_count {
322 match col.data().get_value(i) {
323 Value::DateTime(dt) => timestamps.push(dt.timestamp_millis() as u64),
324 other => {
325 return Err(Error(flow_window_timestamp_column_type_mismatch(
326 ts_col,
327 other.get_type(),
328 )));
329 }
330 }
331 }
332 Ok(timestamps)
333 }
334 None => {
335 let now = self.current_timestamp();
336 Ok(vec![now; row_count])
337 }
338 }
339 }
340
341 pub fn create_window_key(&self, group_hash: Hash128, window_id: u64) -> EncodedKey {
343 let mut serializer = KeySerializer::with_capacity(32);
344 serializer.extend_bytes(b"win:");
345 serializer.extend_u128(group_hash);
346 serializer.extend_u64(window_id);
347 EncodedKey::new(serializer.finish())
348 }
349
350 fn create_row_index_key(&self, group_hash: Hash128, row_number: RowNumber) -> EncodedKey {
352 let mut serializer = KeySerializer::with_capacity(32);
353 serializer.extend_bytes(b"idx:");
354 serializer.extend_u128(group_hash);
355 serializer.extend_u64(row_number.0);
356 EncodedKey::new(serializer.finish())
357 }
358
359 pub fn store_row_index(
362 &self,
363 txn: &mut FlowTransaction,
364 group_hash: Hash128,
365 row_number: RowNumber,
366 window_id: u64,
367 ) -> Result<()> {
368 let index_key = self.create_row_index_key(group_hash, row_number);
369 let mut window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
370 if !window_ids.contains(&window_id) {
371 window_ids.push(window_id);
372 }
373 let serialized =
374 to_stdvec(&window_ids).map_err(|e| Error(internal!("Failed to serialize row index: {}", e)))?;
375 let mut state_row = self.layout.allocate();
376 let blob = Blob::from(serialized);
377 self.layout.set_blob(&mut state_row, 0, &blob);
378 self.save_state(txn, &index_key, state_row)
379 }
380
381 fn lookup_row_index(
383 &self,
384 txn: &mut FlowTransaction,
385 group_hash: Hash128,
386 row_number: RowNumber,
387 ) -> Result<Vec<u64>> {
388 let index_key = self.create_row_index_key(group_hash, row_number);
389 let state_row = self.load_state(txn, &index_key)?;
390 if state_row.is_empty() || !state_row.is_defined(0) {
391 return Ok(Vec::new());
392 }
393 let blob = self.layout.get_blob(&state_row, 0);
394 if blob.is_empty() {
395 return Ok(Vec::new());
396 }
397 let window_ids: Vec<u64> = from_bytes(blob.as_ref())
398 .map_err(|e| Error(internal!("Failed to deserialize row index: {}", e)))?;
399 Ok(window_ids)
400 }
401
402 fn replace_event_in_windows(
405 &self,
406 txn: &mut FlowTransaction,
407 group_hash: Hash128,
408 row_number: RowNumber,
409 post_row: &Row,
410 post_timestamp: u64,
411 ) -> Result<Vec<Diff>> {
412 let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
413 if window_ids.is_empty() {
414 return Ok(Vec::new());
415 }
416
417 let mut result = Vec::new();
418
419 for window_id in &window_ids {
420 let window_key = self.create_window_key(group_hash, *window_id);
421 let mut window_state = self.load_window_state(txn, &window_key)?;
422
423 let event_idx = window_state.events.iter().position(|e| e.row_number == row_number);
424 if let Some(idx) = event_idx {
425 let layout = match &window_state.window_layout {
426 Some(l) => l.clone(),
427 None => continue,
428 };
429
430 let pre_aggregation =
431 self.apply_aggregations(txn, &window_key, &layout, &window_state.events)?;
432
433 window_state.events[idx] = WindowEvent::from_row(post_row, post_timestamp);
434
435 let post_aggregation =
436 self.apply_aggregations(txn, &window_key, &layout, &window_state.events)?;
437
438 self.save_window_state(txn, &window_key, &window_state)?;
439
440 if let (Some((pre_row, _)), Some((post_row, _))) = (pre_aggregation, post_aggregation) {
441 result.push(Diff::Update {
442 pre: Columns::from_row(&pre_row),
443 post: Columns::from_row(&post_row),
444 });
445 }
446 }
447 }
448
449 Ok(result)
450 }
451
452 fn process_event_updates(&self, txn: &mut FlowTransaction, pre: &Columns, post: &Columns) -> Result<Vec<Diff>> {
454 let row_count = pre.row_count();
455 if row_count == 0 {
456 return Ok(Vec::new());
457 }
458
459 let group_hashes = self.compute_group_keys(pre)?;
460 let post_timestamps = self.resolve_event_timestamps(post, row_count)?;
461 let mut result = Vec::new();
462
463 for row_idx in 0..row_count {
464 let row_number = pre.row_numbers[row_idx];
465 let group_hash = group_hashes[row_idx];
466 let post_timestamp = post_timestamps[row_idx];
467
468 let single_row = post.extract_row(row_idx);
469 let projected = self.project_columns(&single_row);
470 let post_row = projected.to_single_row();
471
472 let diffs =
473 self.replace_event_in_windows(txn, group_hash, row_number, &post_row, post_timestamp)?;
474 result.extend(diffs);
475 }
476
477 Ok(result)
478 }
479
480 fn remove_event_from_windows(
482 &self,
483 txn: &mut FlowTransaction,
484 group_hash: Hash128,
485 row_number: RowNumber,
486 ) -> Result<Vec<Diff>> {
487 let window_ids = self.lookup_row_index(txn, group_hash, row_number)?;
488 if window_ids.is_empty() {
489 return Ok(Vec::new());
490 }
491
492 let mut result = Vec::new();
493
494 for window_id in &window_ids {
495 let window_key = self.create_window_key(group_hash, *window_id);
496 let mut window_state = self.load_window_state(txn, &window_key)?;
497
498 let event_idx = window_state.events.iter().position(|e| e.row_number == row_number);
499 if let Some(idx) = event_idx {
500 let layout = match &window_state.window_layout {
501 Some(l) => l.clone(),
502 None => continue,
503 };
504
505 let pre_aggregation =
506 self.apply_aggregations(txn, &window_key, &layout, &window_state.events)?;
507
508 window_state.events.remove(idx);
509 window_state.event_count = window_state.event_count.saturating_sub(1);
510
511 if window_state.events.is_empty() {
512 self.save_window_state(txn, &window_key, &window_state)?;
513 if let Some((pre_row, _)) = pre_aggregation {
514 result.push(Diff::Remove {
515 pre: Columns::from_row(&pre_row),
516 });
517 }
518 } else {
519 let post_aggregation = self.apply_aggregations(
520 txn,
521 &window_key,
522 &layout,
523 &window_state.events,
524 )?;
525 self.save_window_state(txn, &window_key, &window_state)?;
526
527 if let (Some((pre_row, _)), Some((post_row, _))) =
528 (pre_aggregation, post_aggregation)
529 {
530 result.push(Diff::Update {
531 pre: Columns::from_row(&pre_row),
532 post: Columns::from_row(&post_row),
533 });
534 }
535 }
536 }
537 }
538
539 let index_key = self.create_row_index_key(group_hash, row_number);
541 let empty = self.layout.allocate();
542 self.save_state(txn, &index_key, empty)?;
543
544 Ok(result)
545 }
546
547 fn process_event_removals(&self, txn: &mut FlowTransaction, pre: &Columns) -> Result<Vec<Diff>> {
549 let row_count = pre.row_count();
550 if row_count == 0 {
551 return Ok(Vec::new());
552 }
553
554 let group_hashes = self.compute_group_keys(pre)?;
555 let mut result = Vec::new();
556
557 for row_idx in 0..row_count {
558 let row_number = pre.row_numbers[row_idx];
559 let group_hash = group_hashes[row_idx];
560
561 let diffs = self.remove_event_from_windows(txn, group_hash, row_number)?;
562 result.extend(diffs);
563 }
564
565 Ok(result)
566 }
567
568 pub fn extract_group_values(
571 &self,
572 window_layout: &WindowLayout,
573 events: &[WindowEvent],
574 ) -> Result<(Vec<Value>, Vec<String>)> {
575 if events.is_empty() || self.group_by.is_empty() {
576 return Ok((Vec::new(), Vec::new()));
577 }
578
579 let columns = self.events_to_columns(window_layout, events)?;
580 let row_count = columns.row_count();
581 if row_count == 0 {
582 return Ok((Vec::new(), Vec::new()));
583 }
584
585 let session = self.eval_session(false);
586 let exec_ctx = session.eval(columns, row_count);
587
588 let mut values = Vec::new();
589 let mut names = Vec::new();
590 for (i, compiled_expr) in self.compiled_group_by.iter().enumerate() {
591 let col = compiled_expr.execute(&exec_ctx)?;
592 values.push(col.data().get_value(0).clone());
593 names.push(column_name_from_expression(&self.group_by[i]).text().to_string());
594 }
595
596 Ok((values, names))
597 }
598
599 pub fn events_to_columns(&self, window_layout: &WindowLayout, events: &[WindowEvent]) -> Result<Columns> {
601 if events.is_empty() {
602 return Ok(Columns::new(Vec::new()));
603 }
604
605 let mut builders: Vec<ColumnData> = window_layout
606 .types
607 .iter()
608 .map(|ty| ColumnData::with_capacity(ty.clone(), events.len()))
609 .collect();
610
611 for event in events.iter() {
612 let row = event.to_row(window_layout);
613 for (idx, builder) in builders.iter_mut().enumerate() {
614 let value = row.schema.get_value(&row.encoded, idx);
615 builder.push_value(value);
616 }
617 }
618
619 let columns = window_layout
620 .names
621 .iter()
622 .zip(builders.into_iter())
623 .map(|(name, data)| Column {
624 name: Fragment::internal(name.clone()),
625 data,
626 })
627 .collect();
628
629 Ok(Columns::new(columns))
630 }
631
632 pub fn apply_aggregations(
634 &self,
635 txn: &mut FlowTransaction,
636 window_key: &EncodedKey,
637 window_layout: &WindowLayout,
638 events: &[WindowEvent],
639 ) -> Result<Option<(Row, bool)>> {
640 if events.is_empty() {
641 return Ok(None);
642 }
643
644 if self.aggregations.is_empty() {
645 let (result_row_number, is_new) =
647 self.row_number_provider.get_or_create_row_number(txn, window_key)?;
648 let mut result_row = events[0].to_row(window_layout);
649 result_row.number = result_row_number;
650 return Ok(Some((result_row, is_new)));
651 }
652
653 let columns = self.events_to_columns(window_layout, events)?;
654
655 let agg_session = self.eval_session(true);
656 let exec_ctx = agg_session.eval(columns, events.len());
657
658 let (group_values, group_names) = self.extract_group_values(window_layout, events)?;
659
660 let mut result_values = Vec::new();
661 let mut result_names = Vec::new();
662 let mut result_types = Vec::new();
663
664 for (value, name) in group_values.into_iter().zip(group_names.into_iter()) {
665 result_values.push(value.clone());
666 result_names.push(name);
667 result_types.push(value.get_type());
668 }
669
670 for (i, compiled_aggregation) in self.compiled_aggregations.iter().enumerate() {
671 let agg_column = compiled_aggregation.execute(&exec_ctx)?;
672
673 let value = agg_column.data().get_value(0);
674 result_values.push(value.clone());
675 result_names.push(column_name_from_expression(&self.aggregations[i]).text().to_string());
676 result_types.push(value.get_type());
677 }
678
679 let fields: Vec<RowSchemaField> = result_names
680 .iter()
681 .zip(result_types.iter())
682 .map(|(name, ty)| RowSchemaField::unconstrained(name.clone(), ty.clone()))
683 .collect();
684 let layout = RowSchema::new(fields);
685 let mut encoded = layout.allocate();
686 layout.set_values(&mut encoded, &result_values);
687
688 let (result_row_number, is_new) = self.row_number_provider.get_or_create_row_number(txn, window_key)?;
689
690 let result_row = Row {
691 number: result_row_number,
692 encoded,
693 schema: layout,
694 };
695
696 Ok(Some((result_row, is_new)))
697 }
698
699 pub fn process_expired_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
702 let mut result = Vec::new();
703
704 if let Some(duration) = self.size_duration() {
705 let window_size_ms = duration.as_millis() as u64;
706 if window_size_ms > 0 {
707 let expire_before = current_timestamp.saturating_sub(window_size_ms * 2);
708 let cutoff_id = expire_before / window_size_ms;
709 if cutoff_id == 0 {
710 return Ok(result);
711 }
712
713 let groups = self.load_group_registry(txn)?;
714 for group_hash in &groups {
715 let low_key = self.create_window_key(*group_hash, cutoff_id);
717 let high_key = self.create_window_key(*group_hash, 0);
718 let range = EncodedKeyRange::new(
719 ops::Bound::Excluded(low_key),
720 ops::Bound::Included(high_key),
721 );
722
723 let expired_keys = self.scan_keys_in_range(txn, &range)?;
724 for key in &expired_keys {
725 let window_state = self.load_window_state(txn, key)?;
726 if !window_state.events.is_empty() {
727 if let Some(layout) = &window_state.window_layout {
728 if let Some((row, _)) = self.apply_aggregations(
729 txn,
730 key,
731 layout,
732 &window_state.events,
733 )? {
734 result.push(Diff::Remove {
735 pre: Columns::from_row(&row),
736 });
737 }
738 }
739 }
740 }
741
742 if !expired_keys.is_empty() {
743 let low_key = self.create_window_key(*group_hash, cutoff_id);
744 let high_key = self.create_window_key(*group_hash, 0);
745 let range = EncodedKeyRange::new(
746 ops::Bound::Excluded(low_key),
747 ops::Bound::Included(high_key),
748 );
749 let _ = self.expire_range(txn, range)?;
750 }
751 }
752 }
753 }
754
755 Ok(result)
756 }
757
758 pub fn load_window_state(&self, txn: &mut FlowTransaction, window_key: &EncodedKey) -> Result<WindowState> {
760 let state_row = self.load_state(txn, window_key)?;
761
762 if state_row.is_empty() || !state_row.is_defined(0) {
763 return Ok(WindowState::default());
764 }
765
766 let blob = self.layout.get_blob(&state_row, 0);
767 if blob.is_empty() {
768 return Ok(WindowState::default());
769 }
770
771 from_bytes(blob.as_ref()).map_err(|e| Error(internal!("Failed to deserialize WindowState: {}", e)))
772 }
773
774 pub fn save_window_state(
776 &self,
777 txn: &mut FlowTransaction,
778 window_key: &EncodedKey,
779 state: &WindowState,
780 ) -> Result<()> {
781 let serialized =
782 to_stdvec(state).map_err(|e| Error(internal!("Failed to serialize WindowState: {}", e)))?;
783
784 let mut state_row = self.layout.allocate();
785 let blob = Blob::from(serialized);
786 self.layout.set_blob(&mut state_row, 0, &blob);
787
788 self.save_state(txn, window_key, state_row)
789 }
790
791 pub fn get_and_increment_global_count(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<u64> {
793 let count_key = self.create_count_key(group_hash);
794 let count_row = self.load_state(txn, &count_key)?;
795
796 let current_count = if count_row.is_empty() || !count_row.is_defined(0) {
797 0
798 } else {
799 let blob = self.layout.get_blob(&count_row, 0);
800 if blob.is_empty() {
801 0
802 } else {
803 from_bytes(blob.as_ref()).unwrap_or(0)
804 }
805 };
806
807 let new_count = current_count + 1;
808
809 let serialized =
810 to_stdvec(&new_count).map_err(|e| Error(internal!("Failed to serialize count: {}", e)))?;
811
812 let mut count_state_row = self.layout.allocate();
813 let blob = Blob::from(serialized);
814 self.layout.set_blob(&mut count_state_row, 0, &blob);
815
816 self.save_state(txn, &count_key, count_state_row)?;
817
818 Ok(current_count)
819 }
820
821 pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey {
823 let mut serializer = KeySerializer::with_capacity(32);
824 serializer.extend_bytes(b"cnt:");
825 serializer.extend_u128(group_hash);
826 EncodedKey::new(serializer.finish())
827 }
828
829 fn create_group_registry_key(&self) -> EncodedKey {
831 EncodedKey::new(b"grp:")
832 }
833
834 pub fn load_group_registry(&self, txn: &mut FlowTransaction) -> Result<Vec<Hash128>> {
836 let key = self.create_group_registry_key();
837 let state_row = self.load_state(txn, &key)?;
838 if state_row.is_empty() || !state_row.is_defined(0) {
839 return Ok(Vec::new());
840 }
841 let blob = self.layout.get_blob(&state_row, 0);
842 if blob.is_empty() {
843 return Ok(Vec::new());
844 }
845 let groups: Vec<u128> = from_bytes(blob.as_ref()).unwrap_or_default();
846 Ok(groups.into_iter().map(Hash128::from).collect())
847 }
848
849 fn save_group_registry(&self, txn: &mut FlowTransaction, groups: &[Hash128]) -> Result<()> {
851 let key = self.create_group_registry_key();
852 let raw: Vec<u128> = groups.iter().map(|h| (*h).into()).collect();
853 let serialized =
854 to_stdvec(&raw).map_err(|e| Error(internal!("Failed to serialize group registry: {}", e)))?;
855 let mut state_row = self.layout.allocate();
856 let blob = Blob::from(serialized);
857 self.layout.set_blob(&mut state_row, 0, &blob);
858 self.save_state(txn, &key, state_row)
859 }
860
861 pub fn register_group(&self, txn: &mut FlowTransaction, group_hash: Hash128) -> Result<()> {
863 let mut groups = self.load_group_registry(txn)?;
864 if !groups.contains(&group_hash) {
865 groups.push(group_hash);
866 self.save_group_registry(txn, &groups)?;
867 }
868 Ok(())
869 }
870
871 pub fn tick_expire_windows(&self, txn: &mut FlowTransaction, current_timestamp: u64) -> Result<Vec<Diff>> {
874 let mut result = Vec::new();
875 let window_size_ms = match self.size_duration() {
876 Some(d) => d.as_millis() as u64,
877 None => return Ok(result),
878 };
879 if window_size_ms == 0 {
880 return Ok(result);
881 }
882
883 let all_state = txn.state_scan(self.node)?;
885 let prefix = FlowNodeStateKey::new(self.node, vec![]).encode();
886 let win_marker = b"win:";
887
888 let mut keys_to_remove = Vec::new();
889
890 for item in &all_state.items {
891 let full_key = &item.key;
893 if full_key.len() <= prefix.len() {
894 continue;
895 }
896 let inner = &full_key[prefix.len()..];
897
898 if !inner.starts_with(win_marker) {
900 continue;
901 }
902
903 let window_key = EncodedKey::new(inner);
904 let window_state = self.load_window_state(txn, &window_key)?;
905 if window_state.events.is_empty() {
906 continue;
907 }
908
909 let newest_event_time = window_state.events.iter().map(|e| e.timestamp).max().unwrap_or(0);
911 if current_timestamp.saturating_sub(newest_event_time) > window_size_ms {
912 if let Some(layout) = &window_state.window_layout {
913 if let Some((row, _)) =
914 self.apply_aggregations(txn, &window_key, layout, &window_state.events)?
915 {
916 result.push(Diff::Remove {
917 pre: Columns::from_row(&row),
918 });
919 }
920 }
921 keys_to_remove.push(window_key);
922 }
923 }
924
925 for key in &keys_to_remove {
927 let empty = self.create_state();
928 self.save_state(txn, key, empty)?;
929 }
930
931 Ok(result)
932 }
933
934 pub fn process_insert(
936 &self,
937 txn: &mut FlowTransaction,
938 columns: &Columns,
939 group_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns, Hash128) -> Result<Vec<Diff>>,
940 ) -> Result<Vec<Diff>> {
941 let row_count = columns.row_count();
942 if row_count == 0 {
943 return Ok(Vec::new());
944 }
945 let group_hashes = self.compute_group_keys(columns)?;
946 let groups = columns.partition_by_keys(&group_hashes);
947 let mut result = Vec::new();
948 for (group_hash, group_columns) in groups {
949 self.register_group(txn, group_hash)?;
950 let group_result = group_fn(self, txn, &group_columns, group_hash)?;
951 result.extend(group_result);
952 }
953 Ok(result)
954 }
955
956 pub fn apply_window_change(
959 &self,
960 txn: &mut FlowTransaction,
961 change: &Change,
962 expire: bool,
963 process_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>,
964 ) -> Result<Vec<Diff>> {
965 let mut result = Vec::new();
966 if expire {
967 let current_timestamp = self.current_timestamp();
968 let expired_diffs = self.process_expired_windows(txn, current_timestamp)?;
969 result.extend(expired_diffs);
970 }
971 for diff in change.diffs.iter() {
972 match diff {
973 Diff::Insert {
974 post,
975 } => {
976 result.extend(process_fn(self, txn, post)?);
977 }
978 Diff::Update {
979 pre,
980 post,
981 } => {
982 result.extend(self.process_event_updates(txn, pre, post)?);
983 }
984 Diff::Remove {
985 pre,
986 } => {
987 result.extend(self.process_event_removals(txn, pre)?);
988 }
989 }
990 }
991 Ok(result)
992 }
993
994 pub fn emit_aggregation_diff(
997 aggregated_row: &Row,
998 is_new: bool,
999 previous_aggregation: Option<(Row, bool)>,
1000 ) -> Diff {
1001 if is_new {
1002 Diff::Insert {
1003 post: Columns::from_row(aggregated_row),
1004 }
1005 } else if let Some((previous_row, _)) = previous_aggregation {
1006 Diff::Update {
1007 pre: Columns::from_row(&previous_row),
1008 post: Columns::from_row(aggregated_row),
1009 }
1010 } else {
1011 Diff::Insert {
1012 post: Columns::from_row(aggregated_row),
1013 }
1014 }
1015 }
1016}
1017
/// Opts the window operator into the raw state accessors (`load_state`, `save_state`,
/// `create_state`, ...) it uses; all provided methods are taken as-is.
impl RawStatefulOperator for WindowOperator {}
1019
1020impl WindowStateful for WindowOperator {
1021 fn layout(&self) -> RowSchema {
1022 self.layout.clone()
1023 }
1024}
1025
1026impl Operator for WindowOperator {
1027 fn id(&self) -> FlowNodeId {
1028 self.node
1029 }
1030
1031 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
1032 match &self.kind {
1033 WindowKind::Tumbling {
1034 ..
1035 } => apply_tumbling_window(self, txn, change),
1036 WindowKind::Sliding {
1037 ..
1038 } => apply_sliding_window(self, txn, change),
1039 WindowKind::Rolling {
1040 ..
1041 } => apply_rolling_window(self, txn, change),
1042 WindowKind::Session {
1043 ..
1044 } => apply_session_window(self, txn, change),
1045 }
1046 }
1047
1048 fn tick(&self, txn: &mut FlowTransaction, timestamp: DateTime) -> Result<Option<Change>> {
1049 let current_timestamp = (timestamp.to_nanos() / 1_000_000) as u64;
1050 let diffs = match &self.kind {
1051 WindowKind::Tumbling {
1052 ..
1053 }
1054 | WindowKind::Sliding {
1055 ..
1056 } => self.tick_expire_windows(txn, current_timestamp)?,
1057 WindowKind::Rolling {
1058 size: WindowSize::Duration(_),
1059 } => self.tick_rolling_eviction(txn, current_timestamp)?,
1060 WindowKind::Session {
1061 ..
1062 } => self.tick_session_expiration(txn, current_timestamp)?,
1063 _ => vec![],
1064 };
1065
1066 if diffs.is_empty() {
1067 Ok(None)
1068 } else {
1069 Ok(Some(Change::from_flow(self.node, CommitVersion(0), diffs)))
1070 }
1071 }
1072
1073 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
1074 self.parent.pull(txn, rows)
1075 }
1076}