reifydb_sub_flow/operator/window/
sliding.rs1use reifydb_core::{
5 common::{WindowKind, WindowSize},
6 interface::change::{Change, Diff},
7 value::column::columns::Columns,
8};
9use reifydb_runtime::hash::Hash128;
10use reifydb_type::{Result, value::datetime::DateTime};
11
12use super::{WindowEvent, WindowLayout, WindowOperator};
13use crate::transaction::FlowTransaction;
14
15impl WindowOperator {
16 pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64> {
18 match &self.kind {
19 WindowKind::Sliding {
20 size: WindowSize::Duration(duration),
21 slide: WindowSize::Duration(slide_duration),
22 } => {
23 let window_size_ms = duration.as_millis() as u64;
24 let slide_ms = slide_duration.as_millis() as u64;
25 let timestamp = timestamp_or_row_index;
26
27 if slide_ms >= window_size_ms {
28 vec![timestamp / slide_ms]
29 } else {
30 let min_window_id = if timestamp >= window_size_ms {
31 (timestamp - window_size_ms + 1) / slide_ms
32 } else {
33 0
34 };
35 let max_window_id = timestamp / slide_ms;
36 (min_window_id..=max_window_id)
37 .filter(|&wid| {
38 let start = wid * slide_ms;
39 timestamp >= start && timestamp < start + window_size_ms
40 })
41 .collect()
42 }
43 }
44 WindowKind::Sliding {
45 size: WindowSize::Count(count),
46 slide: WindowSize::Count(slide_count),
47 } => {
48 let row_number = timestamp_or_row_index + 1; let min_window = if row_number > *count {
50 (row_number - *count) / *slide_count
51 } else {
52 0
53 };
54 let max_window = (row_number - 1) / *slide_count;
55 (min_window..=max_window)
56 .filter(|&wid| {
57 let start_row = wid * *slide_count + 1;
58 let end_row = start_row + *count - 1;
59 row_number >= start_row && row_number <= end_row
60 })
61 .collect()
62 }
63 _ => vec![0],
64 }
65 }
66
67 pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64 {
69 match &self.kind {
70 WindowKind::Sliding {
71 slide: WindowSize::Duration(slide_duration),
72 ..
73 } => {
74 let slide_ms = slide_duration.as_millis() as u64;
75 window_id * slide_ms
76 }
77 _ => timestamp,
78 }
79 }
80}
81
82fn process_sliding_group_insert(
84 operator: &WindowOperator,
85 txn: &mut FlowTransaction,
86 columns: &Columns,
87 group_hash: Hash128,
88 changed_at: DateTime,
89) -> Result<Vec<Diff>> {
90 let mut result = Vec::new();
91 let row_count = columns.row_count();
92 if row_count == 0 {
93 return Ok(result);
94 }
95
96 let timestamps = operator.resolve_event_timestamps(columns, row_count)?;
97
98 for (row_idx, ×tamp) in timestamps.iter().enumerate() {
99 let (event_timestamp, window_ids) = if operator.is_count_based() {
100 let event_timestamp = operator.current_timestamp();
101 let group_count = operator.get_and_increment_global_count(txn, group_hash)?;
102 (event_timestamp, operator.get_sliding_window_ids(group_count))
103 } else {
104 (timestamp, operator.get_sliding_window_ids(timestamp))
105 };
106
107 let single_row_columns = columns.extract_row(row_idx);
108 let projected = operator.project_columns(&single_row_columns);
109 let row = projected.to_single_row();
110 let row_layout = WindowLayout::from_row(&row);
111
112 for window_id in window_ids {
113 let window_key = operator.create_window_key(group_hash, window_id);
114 let mut window_state = operator.load_window_state(txn, &window_key)?;
115
116 if window_state.window_layout.is_none() {
117 window_state.window_layout = Some(row_layout.clone());
118 }
119 let layout = window_state.layout().clone();
120
121 let previous_aggregation = if !window_state.events.is_empty() {
122 operator.apply_aggregations(
123 txn,
124 &window_key,
125 &layout,
126 &window_state.events,
127 changed_at,
128 )?
129 } else {
130 None
131 };
132
133 let event = WindowEvent::from_row(&row, event_timestamp);
134 let event_row_number = event.row_number;
135 window_state.events.push(event);
136 window_state.event_count += 1;
137 window_state.last_event_time = event_timestamp;
138
139 if window_state.window_start == 0 {
140 window_state.window_start =
141 operator.set_sliding_window_start(event_timestamp, window_id);
142 }
143
144 if let Some((aggregated_row, is_new)) = operator.apply_aggregations(
145 txn,
146 &window_key,
147 &layout,
148 &window_state.events,
149 changed_at,
150 )? {
151 result.push(WindowOperator::emit_aggregation_diff(
152 &aggregated_row,
153 is_new,
154 previous_aggregation,
155 ));
156 }
157
158 operator.save_window_state(txn, &window_key, &window_state)?;
159 operator.store_row_index(txn, group_hash, event_row_number, window_id)?;
160 }
161 }
162
163 Ok(result)
164}
165
166pub fn apply_sliding_window(operator: &WindowOperator, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
168 let changed_at = change.changed_at;
169 let diffs = operator.apply_window_change(txn, &change, true, |op, txn, columns| {
170 op.process_insert(txn, columns, changed_at, process_sliding_group_insert)
171 })?;
172 Ok(Change::from_flow(operator.node, change.version, diffs, change.changed_at))
173}