reifydb_sub_flow/operator/window/
sliding.rs1use reifydb_core::{
5 common::{WindowKind, WindowSize},
6 interface::change::{Change, Diff},
7 value::column::columns::Columns,
8};
9use reifydb_runtime::hash::Hash128;
10use reifydb_type::Result;
11
12use super::{WindowEvent, WindowLayout, WindowOperator};
13use crate::transaction::FlowTransaction;
14
15impl WindowOperator {
16 pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64> {
18 match &self.kind {
19 WindowKind::Sliding {
20 size: WindowSize::Duration(duration),
21 slide: WindowSize::Duration(slide_duration),
22 } => {
23 let window_size_ms = duration.as_millis() as u64;
24 let slide_ms = slide_duration.as_millis() as u64;
25 let timestamp = timestamp_or_row_index;
26
27 if slide_ms >= window_size_ms {
28 vec![timestamp / slide_ms]
29 } else {
30 let min_window_id = if timestamp >= window_size_ms {
31 (timestamp - window_size_ms + 1) / slide_ms
32 } else {
33 0
34 };
35 let max_window_id = timestamp / slide_ms;
36 (min_window_id..=max_window_id)
37 .filter(|&wid| {
38 let start = wid * slide_ms;
39 timestamp >= start && timestamp < start + window_size_ms
40 })
41 .collect()
42 }
43 }
44 WindowKind::Sliding {
45 size: WindowSize::Count(count),
46 slide: WindowSize::Count(slide_count),
47 } => {
48 let row_number = timestamp_or_row_index + 1; let min_window = if row_number > *count {
50 (row_number - *count) / *slide_count
51 } else {
52 0
53 };
54 let max_window = (row_number - 1) / *slide_count;
55 (min_window..=max_window)
56 .filter(|&wid| {
57 let start_row = wid * *slide_count + 1;
58 let end_row = start_row + *count - 1;
59 row_number >= start_row && row_number <= end_row
60 })
61 .collect()
62 }
63 _ => vec![0],
64 }
65 }
66
67 pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64 {
69 match &self.kind {
70 WindowKind::Sliding {
71 slide: WindowSize::Duration(slide_duration),
72 ..
73 } => {
74 let slide_ms = slide_duration.as_millis() as u64;
75 window_id * slide_ms
76 }
77 _ => timestamp,
78 }
79 }
80}
81
82fn process_sliding_group_insert(
84 operator: &WindowOperator,
85 txn: &mut FlowTransaction,
86 columns: &Columns,
87 group_hash: Hash128,
88) -> Result<Vec<Diff>> {
89 let mut result = Vec::new();
90 let row_count = columns.row_count();
91 if row_count == 0 {
92 return Ok(result);
93 }
94
95 let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
96
97 for row_idx in 0..row_count {
98 let timestamp = timestamps[row_idx];
99 let (event_timestamp, window_ids) = if operator.is_count_based() {
100 let event_timestamp = operator.current_timestamp();
101 let group_count = operator.get_and_increment_global_count(txn, group_hash)?;
102 (event_timestamp, operator.get_sliding_window_ids(group_count))
103 } else {
104 (timestamp, operator.get_sliding_window_ids(timestamp))
105 };
106
107 let single_row_columns = columns.extract_row(row_idx);
108 let projected = operator.project_columns(&single_row_columns);
109 let row = projected.to_single_row();
110 let row_layout = WindowLayout::from_row(&row);
111
112 for window_id in window_ids {
113 let window_key = operator.create_window_key(group_hash, window_id);
114 let mut window_state = operator.load_window_state(txn, &window_key)?;
115
116 if window_state.window_layout.is_none() {
117 window_state.window_layout = Some(row_layout.clone());
118 }
119 let layout = window_state.layout().clone();
120
121 let previous_aggregation = if !window_state.events.is_empty() {
122 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
123 } else {
124 None
125 };
126
127 let event = WindowEvent::from_row(&row, event_timestamp);
128 let event_row_number = event.row_number;
129 window_state.events.push(event);
130 window_state.event_count += 1;
131 window_state.last_event_time = event_timestamp;
132
133 if window_state.window_start == 0 {
134 window_state.window_start =
135 operator.set_sliding_window_start(event_timestamp, window_id);
136 }
137
138 if let Some((aggregated_row, is_new)) =
139 operator.apply_aggregations(txn, &window_key, &layout, &window_state.events)?
140 {
141 result.push(WindowOperator::emit_aggregation_diff(
142 &aggregated_row,
143 is_new,
144 previous_aggregation,
145 ));
146 }
147
148 operator.save_window_state(txn, &window_key, &window_state)?;
149 operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
150 }
151 }
152
153 Ok(result)
154}
155
156pub fn apply_sliding_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
158 let diffs = operator.apply_window_change(txn, &change, true, |op, txn, columns| {
159 op.process_insert(txn, columns, process_sliding_group_insert)
160 })?;
161 Ok(Change::from_flow(operator.node, change.version, diffs))
162}