1use chrono::{DateTime, Duration, Utc};
13use shape_value::KindedSlot;
14use std::collections::HashMap;
15
16use shape_ast::error::Result;
17#[derive(Debug, Clone)]
19pub enum WindowType {
20 Tumbling { size: Duration },
22 Sliding { size: Duration, slide: Duration },
24 Session { gap: Duration },
26 Count { size: usize },
28 Cumulative,
30}
31
32impl WindowType {
33 pub fn tumbling(size: Duration) -> Self {
35 WindowType::Tumbling { size }
36 }
37
38 pub fn sliding(size: Duration, slide: Duration) -> Self {
40 WindowType::Sliding { size, slide }
41 }
42
43 pub fn session(gap: Duration) -> Self {
45 WindowType::Session { gap }
46 }
47
48 pub fn count(size: usize) -> Self {
50 WindowType::Count { size }
51 }
52
53 pub fn cumulative() -> Self {
55 WindowType::Cumulative
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct WindowDataPoint {
62 pub timestamp: DateTime<Utc>,
63 pub fields: HashMap<String, KindedSlot>,
68}
69
70#[derive(Debug, Clone)]
72pub struct WindowResult {
73 pub start: DateTime<Utc>,
75 pub end: DateTime<Utc>,
77 pub count: usize,
79 pub aggregates: HashMap<String, f64>,
81}
82
/// Reduction applied to one field's values within a window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregateFunction {
    /// Sum of the values.
    Sum,
    /// Arithmetic mean of the values.
    Avg,
    /// Smallest value.
    Min,
    /// Largest value.
    Max,
    /// Number of points in the window that carry the field.
    Count,
    /// Value from the earliest-added point that carries the field.
    First,
    /// Value from the most recently added point that carries the field.
    Last,
    /// Population standard deviation (divides by n, not n - 1).
    StdDev,
    /// Population variance (divides by n, not n - 1).
    Variance,
}
96
97#[derive(Debug, Clone)]
99pub struct AggregateSpec {
100 pub field: String,
101 pub function: AggregateFunction,
102 pub output_name: String,
103}
104
105#[derive(Debug)]
107struct WindowState {
108 start: DateTime<Utc>,
109 data: Vec<WindowDataPoint>,
110 last_timestamp: Option<DateTime<Utc>>,
111}
112
113pub struct WindowManager {
115 window_type: WindowType,
117 aggregates: Vec<AggregateSpec>,
119 active_windows: Vec<WindowState>,
121 current_window: Option<WindowState>,
123 current_count: usize,
125 cumulative_data: Vec<WindowDataPoint>,
127 completed_windows: Vec<WindowResult>,
129}
130
131impl WindowManager {
132 pub fn new(window_type: WindowType) -> Self {
134 Self {
135 window_type,
136 aggregates: Vec::new(),
137 active_windows: Vec::new(),
138 current_window: None,
139 current_count: 0,
140 cumulative_data: Vec::new(),
141 completed_windows: Vec::new(),
142 }
143 }
144
145 pub fn aggregate(
147 &mut self,
148 field: &str,
149 function: AggregateFunction,
150 output_name: &str,
151 ) -> &mut Self {
152 self.aggregates.push(AggregateSpec {
153 field: field.to_string(),
154 function,
155 output_name: output_name.to_string(),
156 });
157 self
158 }
159
160 pub fn process(
162 &mut self,
163 timestamp: DateTime<Utc>,
164 fields: HashMap<String, KindedSlot>,
165 ) -> Result<()> {
166 let data_point = WindowDataPoint { timestamp, fields };
167
168 match &self.window_type {
169 WindowType::Tumbling { size } => {
170 self.process_tumbling(&data_point, *size)?;
171 }
172 WindowType::Sliding { size, slide } => {
173 self.process_sliding(&data_point, *size, *slide)?;
174 }
175 WindowType::Session { gap } => {
176 self.process_session(&data_point, *gap)?;
177 }
178 WindowType::Count { size } => {
179 self.process_count(&data_point, *size)?;
180 }
181 WindowType::Cumulative => {
182 self.process_cumulative(&data_point)?;
183 }
184 }
185
186 Ok(())
187 }
188
189 fn process_tumbling(&mut self, data_point: &WindowDataPoint, size: Duration) -> Result<()> {
191 let window_start = self.align_to_window(data_point.timestamp, size);
192
193 let should_close = self
195 .current_window
196 .as_ref()
197 .map(|w| data_point.timestamp >= w.start + size)
198 .unwrap_or(false);
199
200 if should_close {
201 if let Some(window) = self.current_window.take() {
203 let result = self.compute_window_result(&window)?;
204 self.completed_windows.push(result);
205 }
206 }
207
208 match &mut self.current_window {
210 Some(window) => {
211 window.data.push(data_point.clone());
212 window.last_timestamp = Some(data_point.timestamp);
213 }
214 None => {
215 self.current_window = Some(WindowState {
216 start: window_start,
217 data: vec![data_point.clone()],
218 last_timestamp: Some(data_point.timestamp),
219 });
220 }
221 }
222
223 Ok(())
224 }
225
226 fn process_sliding(
228 &mut self,
229 data_point: &WindowDataPoint,
230 size: Duration,
231 slide: Duration,
232 ) -> Result<()> {
233 let ts = data_point.timestamp;
235
236 let window_start = self.align_to_window(ts, slide);
238
239 let needs_new_window = self.active_windows.is_empty()
241 || self
242 .active_windows
243 .last()
244 .map(|w| ts >= w.start + slide)
245 .unwrap_or(true);
246
247 if needs_new_window {
248 self.active_windows.push(WindowState {
249 start: window_start,
250 data: Vec::new(),
251 last_timestamp: None,
252 });
253 }
254
255 for window in &mut self.active_windows {
257 if ts >= window.start && ts < window.start + size {
258 window.data.push(data_point.clone());
259 window.last_timestamp = Some(ts);
260 }
261 }
262
263 let mut closed_indices = Vec::new();
265 for (i, window) in self.active_windows.iter().enumerate() {
266 if ts >= window.start + size {
267 let result = self.compute_window_result(window)?;
268 self.completed_windows.push(result);
269 closed_indices.push(i);
270 }
271 }
272
273 for i in closed_indices.into_iter().rev() {
275 self.active_windows.remove(i);
276 }
277
278 Ok(())
279 }
280
281 fn process_session(&mut self, data_point: &WindowDataPoint, gap: Duration) -> Result<()> {
283 let should_close = self
285 .current_window
286 .as_ref()
287 .and_then(|w| w.last_timestamp)
288 .map(|last_ts| data_point.timestamp - last_ts > gap)
289 .unwrap_or(false);
290
291 if should_close {
292 if let Some(window) = self.current_window.take() {
293 let result = self.compute_window_result(&window)?;
294 self.completed_windows.push(result);
295 }
296 }
297
298 match &mut self.current_window {
300 Some(window) => {
301 window.data.push(data_point.clone());
302 window.last_timestamp = Some(data_point.timestamp);
303 }
304 None => {
305 self.current_window = Some(WindowState {
306 start: data_point.timestamp,
307 data: vec![data_point.clone()],
308 last_timestamp: Some(data_point.timestamp),
309 });
310 }
311 }
312
313 Ok(())
314 }
315
316 fn process_count(&mut self, data_point: &WindowDataPoint, size: usize) -> Result<()> {
318 if self.current_window.is_none() {
319 self.current_window = Some(WindowState {
320 start: data_point.timestamp,
321 data: Vec::new(),
322 last_timestamp: None,
323 });
324 }
325
326 if let Some(window) = &mut self.current_window {
328 window.data.push(data_point.clone());
329 window.last_timestamp = Some(data_point.timestamp);
330 }
331 self.current_count += 1;
332
333 if self.current_count >= size {
335 if let Some(window) = self.current_window.take() {
336 let result = self.compute_window_result(&window)?;
337 self.completed_windows.push(result);
338 }
339 self.current_count = 0;
340 }
341
342 Ok(())
343 }
344
345 fn process_cumulative(&mut self, data_point: &WindowDataPoint) -> Result<()> {
347 self.cumulative_data.push(data_point.clone());
348
349 let start = self
351 .cumulative_data
352 .first()
353 .map(|d| d.timestamp)
354 .unwrap_or(data_point.timestamp);
355 let end = data_point.timestamp;
356
357 let window = WindowState {
358 start,
359 data: self.cumulative_data.clone(),
360 last_timestamp: Some(end),
361 };
362
363 let result = self.compute_window_result(&window)?;
364 self.completed_windows.push(result);
365
366 Ok(())
367 }
368
369 fn align_to_window(&self, ts: DateTime<Utc>, size: Duration) -> DateTime<Utc> {
371 let epoch = DateTime::UNIX_EPOCH;
372 let since_epoch = ts - epoch;
373 let size_millis = size.num_milliseconds();
374
375 if size_millis == 0 {
376 return ts;
377 }
378
379 let aligned_millis = (since_epoch.num_milliseconds() / size_millis) * size_millis;
380 epoch + Duration::milliseconds(aligned_millis)
381 }
382
383 fn compute_window_result(&self, window: &WindowState) -> Result<WindowResult> {
385 let mut aggregates = HashMap::new();
386
387 for spec in &self.aggregates {
388 let values: Vec<f64> = window
389 .data
390 .iter()
391 .filter_map(|d| d.fields.get(&spec.field).map(|v| v.slot().as_f64()))
392 .collect();
393
394 let result = self.compute_aggregate(&values, spec.function)?;
395 aggregates.insert(spec.output_name.clone(), result);
396 }
397
398 let end = window.last_timestamp.unwrap_or(window.start);
399
400 Ok(WindowResult {
401 start: window.start,
402 end,
403 count: window.data.len(),
404 aggregates,
405 })
406 }
407
408 fn compute_aggregate(&self, values: &[f64], function: AggregateFunction) -> Result<f64> {
410 if values.is_empty() {
411 return Ok(f64::NAN);
412 }
413
414 Ok(match function {
415 AggregateFunction::Sum => values.iter().sum(),
416 AggregateFunction::Avg => values.iter().sum::<f64>() / values.len() as f64,
417 AggregateFunction::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
418 AggregateFunction::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
419 AggregateFunction::Count => values.len() as f64,
420 AggregateFunction::First => values.first().copied().unwrap_or(f64::NAN),
421 AggregateFunction::Last => values.last().copied().unwrap_or(f64::NAN),
422 AggregateFunction::StdDev => {
423 let mean = values.iter().sum::<f64>() / values.len() as f64;
424 let variance =
425 values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
426 variance.sqrt()
427 }
428 AggregateFunction::Variance => {
429 let mean = values.iter().sum::<f64>() / values.len() as f64;
430 values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64
431 }
432 })
433 }
434
435 pub fn take_completed(&mut self) -> Vec<WindowResult> {
437 std::mem::take(&mut self.completed_windows)
438 }
439
440 pub fn flush(&mut self) -> Result<Vec<WindowResult>> {
442 if let Some(ref window) = self.current_window {
444 let result = self.compute_window_result(window)?;
445 self.completed_windows.push(result);
446 }
447
448 for window in &self.active_windows {
449 let result = self.compute_window_result(window)?;
450 self.completed_windows.push(result);
451 }
452
453 self.current_window = None;
454 self.active_windows.clear();
455
456 Ok(self.take_completed())
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp every test stream starts from.
    fn origin() -> DateTime<Utc> {
        DateTime::from_timestamp(1000000000, 0).unwrap()
    }

    /// Sends one point with a single `"value"` field to `manager`, panicking on error.
    fn feed(manager: &mut WindowManager, timestamp: DateTime<Utc>, value: f64) {
        let fields = HashMap::from([("value".to_string(), KindedSlot::from_number(value))]);
        manager.process(timestamp, fields).unwrap();
    }

    #[test]
    fn test_tumbling_window() {
        let mut manager = WindowManager::new(WindowType::tumbling(Duration::seconds(10)));
        manager
            .aggregate("value", AggregateFunction::Sum, "sum")
            .aggregate("value", AggregateFunction::Avg, "avg");

        let t0 = origin();
        (0..5).for_each(|secs| feed(&mut manager, t0 + Duration::seconds(secs), 10.0));
        assert!(
            manager.take_completed().is_empty(),
            "Expected no completed windows within first window"
        );

        // A point past the end of the first 10 s window closes it.
        feed(&mut manager, t0 + Duration::seconds(15), 20.0);

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1, "Expected exactly 1 completed window");
        let first = &completed[0];
        assert_eq!(first.count, 5, "Expected 5 data points in window");
        assert_eq!(first.aggregates.get("sum"), Some(&50.0));
        assert_eq!(first.aggregates.get("avg"), Some(&10.0));
    }

    #[test]
    fn test_count_window() {
        let mut manager = WindowManager::new(WindowType::count(3));
        manager.aggregate("value", AggregateFunction::Sum, "sum");

        let t0 = origin();
        for (secs, value) in (0_i64..).zip([1.0, 2.0, 3.0]) {
            feed(&mut manager, t0 + Duration::seconds(secs), value);
        }

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].count, 3);
        assert_eq!(completed[0].aggregates.get("sum"), Some(&6.0));
    }

    #[test]
    fn test_session_window() {
        let mut manager = WindowManager::new(WindowType::session(Duration::seconds(5)));
        manager.aggregate("value", AggregateFunction::Count, "count");

        let t0 = origin();
        for secs in 0..3 {
            feed(&mut manager, t0 + Duration::seconds(secs), 1.0);
        }

        // A 8 s silence exceeds the 5 s gap and closes the first session.
        feed(&mut manager, t0 + Duration::seconds(10), 1.0);

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].count, 3);
    }

    #[test]
    fn test_aggregate_functions() {
        let mut manager = WindowManager::new(WindowType::count(5));
        manager
            .aggregate("value", AggregateFunction::Min, "min")
            .aggregate("value", AggregateFunction::Max, "max")
            .aggregate("value", AggregateFunction::StdDev, "std");

        let t0 = origin();
        for (secs, value) in (0_i64..).zip([1.0, 2.0, 3.0, 4.0, 5.0]) {
            feed(&mut manager, t0 + Duration::seconds(secs), value);
        }

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1);
        let window = &completed[0];
        assert_eq!(window.aggregates.get("min"), Some(&1.0));
        assert_eq!(window.aggregates.get("max"), Some(&5.0));
        // Population standard deviation of 1..=5 is sqrt(2) ~= 1.414.
        let std = window.aggregates["std"];
        assert!((std - 1.414).abs() < 0.01);
    }

    #[test]
    fn test_flush() {
        let mut manager = WindowManager::new(WindowType::tumbling(Duration::seconds(10)));
        manager.aggregate("value", AggregateFunction::Sum, "sum");

        feed(&mut manager, origin(), 42.0);

        let results = manager.flush().unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].aggregates.get("sum"), Some(&42.0));
    }
}