/// Strategy used to cut an input stream into windows.
///
/// The `*_ms` fields are compared against the `u64` timestamps of the events;
/// the `Count` variant is measured in number of elements instead.
///
/// Every field is an integer, so the type is `Eq` as well as `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowType {
    /// Back-to-back, non-overlapping windows that are `size_ms` long.
    Tumbling { size_ms: u64 },
    /// Windows that are `size_ms` long and start every `step_ms`.
    Sliding { size_ms: u64, step_ms: u64 },
    /// Windows that are closed once two consecutive events are more than
    /// `gap_ms` apart.
    Session { gap_ms: u64 },
    /// Windows holding `size` elements, advancing by `step` elements.
    Count { size: usize, step: usize },
}
18
/// Reduction applied to the values collected in one window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowAggregation {
    /// Total of all values in the window.
    Sum,
    /// Arithmetic mean of the values in the window.
    Mean,
    /// Largest value in the window.
    Max,
    /// Smallest value in the window.
    Min,
    /// Number of values in the window.
    Count,
    /// Value of the last element in the window.
    LastValue,
    /// Value of the first element in the window.
    FirstValue,
}
37
38#[derive(Debug, Clone)]
40pub struct WindowConfig {
41 pub window_type: WindowType,
43 pub aggregation: WindowAggregation,
45 pub emit_partial: bool,
47 pub min_elements: usize,
49}
50
51impl WindowConfig {
52 pub fn tumbling(size_ms: u64, aggregation: WindowAggregation) -> Self {
54 WindowConfig {
55 window_type: WindowType::Tumbling { size_ms },
56 aggregation,
57 emit_partial: false,
58 min_elements: 1,
59 }
60 }
61
62 pub fn sliding(size_ms: u64, step_ms: u64, aggregation: WindowAggregation) -> Self {
64 WindowConfig {
65 window_type: WindowType::Sliding { size_ms, step_ms },
66 aggregation,
67 emit_partial: false,
68 min_elements: 1,
69 }
70 }
71
72 pub fn count(size: usize, step: usize, aggregation: WindowAggregation) -> Self {
74 WindowConfig {
75 window_type: WindowType::Count { size, step },
76 aggregation,
77 emit_partial: false,
78 min_elements: 1,
79 }
80 }
81
82 pub fn with_emit_partial(mut self, emit: bool) -> Self {
84 self.emit_partial = emit;
85 self
86 }
87
88 pub fn with_min_elements(mut self, min: usize) -> Self {
90 self.min_elements = min;
91 self
92 }
93}
94
/// Aggregated outcome of a single window.
///
/// Implements `PartialEq` so results can be compared directly, e.g. with
/// `assert_eq!`. `value` is an `f64`, so a result holding `NaN` never compares
/// equal to anything, including itself.
#[derive(Debug, Clone, PartialEq)]
pub struct WindowResult {
    /// Start of the window. For time-based windows this is a timestamp; for
    /// count windows it is the offset of the first element.
    pub start_ms: u64,
    /// End of the window. Exclusive for tumbling and sliding windows; for
    /// session windows it is the timestamp of the last event in the session;
    /// for count windows it is `offset + size`, even when the window is cut
    /// short by the end of the input.
    pub end_ms: u64,
    /// Number of values that were aggregated.
    pub element_count: usize,
    /// Result of applying the configured aggregation to the window's values.
    pub value: f64,
    /// `false` only for windows that were emitted as partial.
    pub is_complete: bool,
}
109
110pub struct WindowedAggregation {
112 config: WindowConfig,
113}
114
115impl WindowedAggregation {
116 pub fn new(config: WindowConfig) -> Self {
118 WindowedAggregation { config }
119 }
120
121 pub fn process_tumbling(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
125 if events.is_empty() {
126 return Vec::new();
127 }
128
129 let size_ms = match self.config.window_type {
130 WindowType::Tumbling { size_ms } => size_ms,
131 _ => return Vec::new(),
132 };
133
134 let first_ts = events[0].0;
135 let window_start_base = (first_ts / size_ms) * size_ms;
137
138 let last_ts = events.iter().map(|(t, _)| *t).max().unwrap_or(first_ts);
139 let num_windows = ((last_ts.saturating_sub(window_start_base)) / size_ms + 1) as usize;
141
142 let mut results = Vec::new();
143
144 for i in 0..num_windows {
145 let start = window_start_base + i as u64 * size_ms;
146 let end = start + size_ms;
147
148 let window_values: Vec<f64> = events
149 .iter()
150 .filter(|(t, _)| *t >= start && *t < end)
151 .map(|(_, v)| *v)
152 .collect();
153
154 if window_values.len() < self.config.min_elements {
155 continue;
156 }
157
158 let is_complete = if self.config.emit_partial {
165 last_ts >= end
168 } else {
169 true
172 };
173
174 if !is_complete && !self.config.emit_partial {
175 continue;
178 }
179
180 let value = WindowedAggregation::aggregate(&window_values, self.config.aggregation);
181 results.push(WindowResult {
182 start_ms: start,
183 end_ms: end,
184 element_count: window_values.len(),
185 value,
186 is_complete,
187 });
188 }
189
190 results
191 }
192
193 pub fn process_sliding(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
197 if events.is_empty() {
198 return Vec::new();
199 }
200
201 let (size_ms, step_ms) = match self.config.window_type {
202 WindowType::Sliding { size_ms, step_ms } => (size_ms, step_ms),
203 _ => return Vec::new(),
204 };
205
206 let first_ts = events[0].0;
207 let last_ts = events.iter().map(|(t, _)| *t).max().unwrap_or(first_ts);
208
209 let start_base = (first_ts / step_ms) * step_ms;
211
212 let mut results = Vec::new();
213 let mut window_start = start_base;
214
215 loop {
216 if window_start > last_ts {
217 break;
218 }
219 let window_end = window_start + size_ms;
220
221 let window_values: Vec<f64> = events
222 .iter()
223 .filter(|(t, _)| *t >= window_start && *t < window_end)
224 .map(|(_, v)| *v)
225 .collect();
226
227 if window_values.len() >= self.config.min_elements {
228 let is_complete = if self.config.emit_partial {
232 last_ts >= window_end
233 } else {
234 true
235 };
236 if is_complete || self.config.emit_partial {
237 let value =
238 WindowedAggregation::aggregate(&window_values, self.config.aggregation);
239 results.push(WindowResult {
240 start_ms: window_start,
241 end_ms: window_end,
242 element_count: window_values.len(),
243 value,
244 is_complete,
245 });
246 }
247 }
248
249 window_start += step_ms;
250 }
251
252 results
253 }
254
255 pub fn process_count(&self, values: &[f64]) -> Vec<WindowResult> {
259 if values.is_empty() {
260 return Vec::new();
261 }
262
263 let (size, step) = match self.config.window_type {
264 WindowType::Count { size, step } => (size, step),
265 _ => return Vec::new(),
266 };
267
268 if step == 0 || size == 0 {
269 return Vec::new();
270 }
271
272 let mut results = Vec::new();
273 let mut offset = 0usize;
274
275 loop {
276 if offset >= values.len() {
277 break;
278 }
279 let end = (offset + size).min(values.len());
280 let window_values = &values[offset..end];
281
282 let is_complete = offset + size <= values.len();
283
284 if window_values.len() < self.config.min_elements {
285 break;
286 }
287
288 if !is_complete && !self.config.emit_partial {
289 break;
290 }
291
292 let value = WindowedAggregation::aggregate(window_values, self.config.aggregation);
293 results.push(WindowResult {
294 start_ms: offset as u64,
295 end_ms: (offset + size) as u64,
296 element_count: window_values.len(),
297 value,
298 is_complete,
299 });
300
301 offset += step;
302 }
303
304 results
305 }
306
307 pub fn process_session(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
310 if events.is_empty() {
311 return Vec::new();
312 }
313
314 let gap_ms = match self.config.window_type {
315 WindowType::Session { gap_ms } => gap_ms,
316 _ => return Vec::new(),
317 };
318
319 let mut results = Vec::new();
320 let mut session_start = events[0].0;
321 let mut session_values: Vec<f64> = Vec::new();
322
323 for (idx, (ts, val)) in events.iter().enumerate() {
324 if idx > 0 {
325 let prev_ts = events[idx - 1].0;
326 let gap = ts.saturating_sub(prev_ts);
327 if gap > gap_ms {
328 if session_values.len() >= self.config.min_elements {
330 let value = WindowedAggregation::aggregate(
331 &session_values,
332 self.config.aggregation,
333 );
334 results.push(WindowResult {
335 start_ms: session_start,
336 end_ms: events[idx - 1].0,
337 element_count: session_values.len(),
338 value,
339 is_complete: true,
340 });
341 }
342 session_start = *ts;
344 session_values.clear();
345 }
346 }
347 session_values.push(*val);
348 }
349
350 if session_values.len() >= self.config.min_elements {
352 let value = WindowedAggregation::aggregate(&session_values, self.config.aggregation);
353 results.push(WindowResult {
354 start_ms: session_start,
355 end_ms: events.last().map(|(t, _)| *t).unwrap_or(session_start),
356 element_count: session_values.len(),
357 value,
358 is_complete: true,
359 });
360 }
361
362 results
363 }
364
365 pub fn aggregate(values: &[f64], agg: WindowAggregation) -> f64 {
367 match agg {
368 WindowAggregation::Sum => values.iter().copied().fold(0.0_f64, |acc, v| acc + v),
369 WindowAggregation::Mean => {
370 if values.is_empty() {
371 0.0
372 } else {
373 let sum: f64 = values.iter().copied().fold(0.0_f64, |acc, v| acc + v);
374 sum / values.len() as f64
375 }
376 }
377 WindowAggregation::Max => values.iter().copied().fold(f64::NEG_INFINITY, f64::max),
378 WindowAggregation::Min => values.iter().copied().fold(f64::INFINITY, f64::min),
379 WindowAggregation::Count => values.len() as f64,
380 WindowAggregation::LastValue => values.last().copied().unwrap_or(0.0),
381 WindowAggregation::FirstValue => values.first().copied().unwrap_or(0.0),
382 }
383 }
384
385 pub fn process(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
387 match self.config.window_type {
388 WindowType::Tumbling { .. } => self.process_tumbling(events),
389 WindowType::Sliding { .. } => self.process_sliding(events),
390 WindowType::Session { .. } => self.process_session(events),
391 WindowType::Count { .. } => {
392 let values: Vec<f64> = events.iter().map(|(_, v)| *v).collect();
394 self.process_count(&values)
395 }
396 }
397 }
398}
399
400#[derive(Debug, thiserror::Error)]
402pub enum WindowError {
403 #[error("Step size {step} must be <= window size {size}")]
405 StepExceedsSize { step: u64, size: u64 },
406 #[error("Empty event stream")]
408 EmptyStream,
409 #[error("Invalid window configuration: {0}")]
411 InvalidConfig(String),
412}
413
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing aggregated floats.
    const TOLERANCE: f64 = 1e-9;

    /// True when `actual` is within [`TOLERANCE`] of `expected`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < TOLERANCE
    }

    /// `count` events spaced 100 ms apart starting at 0, each with value 1.0.
    fn unit_events(count: u64) -> Vec<(u64, f64)> {
        (0..count).map(|i| (i * 100, 1.0)).collect()
    }

    // ---- tumbling windows ----

    #[test]
    fn test_tumbling_sum_basic() {
        let aggregator =
            WindowedAggregation::new(WindowConfig::tumbling(500, WindowAggregation::Sum));
        let windows = aggregator.process_tumbling(&unit_events(10));

        assert_eq!(windows.len(), 2, "Expected 2 tumbling windows");
        for window in &windows {
            assert_eq!(window.element_count, 5);
            assert!(approx_eq(window.value, 5.0));
            assert!(window.is_complete);
        }
    }

    #[test]
    fn test_tumbling_non_overlapping() {
        let aggregator =
            WindowedAggregation::new(WindowConfig::tumbling(500, WindowAggregation::Count));
        let windows = aggregator.process_tumbling(&unit_events(10));

        let mut total_counted = 0usize;
        for window in &windows {
            total_counted += window.element_count;
        }
        assert_eq!(
            total_counted, 10,
            "Each event must appear in exactly one window"
        );
    }

    #[test]
    fn test_tumbling_empty_returns_empty() {
        let aggregator =
            WindowedAggregation::new(WindowConfig::tumbling(500, WindowAggregation::Sum));
        assert!(aggregator.process_tumbling(&[]).is_empty());
    }

    #[test]
    fn test_tumbling_with_partial_window() {
        let config = WindowConfig::tumbling(500, WindowAggregation::Sum).with_emit_partial(true);
        let windows = WindowedAggregation::new(config).process_tumbling(&unit_events(7));

        assert_eq!(windows.len(), 2, "Expected complete + partial window");

        let (full, partial) = (&windows[0], &windows[1]);
        assert_eq!(full.element_count, 5);
        assert!(approx_eq(full.value, 5.0));
        assert!(full.is_complete);
        assert_eq!(partial.element_count, 2);
        assert!(approx_eq(partial.value, 2.0));
        assert!(!partial.is_complete);
    }

    // ---- sliding windows ----

    #[test]
    fn test_sliding_overlapping_windows() {
        let aggregator =
            WindowedAggregation::new(WindowConfig::sliding(300, 100, WindowAggregation::Sum));
        let windows = aggregator.process_sliding(&unit_events(5));

        assert!(!windows.is_empty());
        assert!(
            approx_eq(windows[0].value, 3.0),
            "First window sum should be 3.0, got {}",
            windows[0].value
        );
    }

    #[test]
    fn test_sliding_step_equals_size_is_tumbling() {
        let events = unit_events(10);
        let sliding =
            WindowedAggregation::new(WindowConfig::sliding(500, 500, WindowAggregation::Sum))
                .process_sliding(&events);
        let tumbling =
            WindowedAggregation::new(WindowConfig::tumbling(500, WindowAggregation::Sum))
                .process_tumbling(&events);

        assert_eq!(
            sliding.len(),
            tumbling.len(),
            "Sliding with step==size should match tumbling"
        );
        for (from_sliding, from_tumbling) in sliding.iter().zip(&tumbling) {
            assert!(approx_eq(from_sliding.value, from_tumbling.value));
        }
    }

    #[test]
    fn test_sliding_mean() {
        let events = vec![(0u64, 1.0_f64), (1, 2.0), (2, 3.0)];
        let config = WindowConfig::sliding(3, 1, WindowAggregation::Mean).with_emit_partial(true);
        let windows = WindowedAggregation::new(config).process_sliding(&events);

        assert!(!windows.is_empty());
        assert!(
            approx_eq(windows[0].value, 2.0),
            "Mean should be 2.0, got {}",
            windows[0].value
        );
    }

    // ---- count windows ----

    #[test]
    fn test_count_window_basic() {
        let values: Vec<f64> = (1..=9u32).map(f64::from).collect();
        let aggregator =
            WindowedAggregation::new(WindowConfig::count(3, 3, WindowAggregation::Sum));
        let windows = aggregator.process_count(&values);

        assert_eq!(windows.len(), 3);
        assert!(approx_eq(windows[0].value, 6.0));
        assert!(approx_eq(windows[1].value, 15.0));
        assert!(approx_eq(windows[2].value, 24.0));
    }

    #[test]
    fn test_count_window_sliding() {
        let values: Vec<f64> = (1..=8u32).map(f64::from).collect();
        let aggregator =
            WindowedAggregation::new(WindowConfig::count(4, 2, WindowAggregation::Sum));
        let windows = aggregator.process_count(&values);

        assert_eq!(windows.len(), 3);
        assert!(approx_eq(windows[0].value, 10.0));
        assert!(approx_eq(windows[1].value, 18.0));
        assert!(approx_eq(windows[2].value, 26.0));
    }

    #[test]
    fn test_count_window_min() {
        let values = vec![3.0_f64, 1.0, 4.0, 1.0, 5.0, 9.0];
        let aggregator =
            WindowedAggregation::new(WindowConfig::count(3, 3, WindowAggregation::Min));
        let windows = aggregator.process_count(&values);

        assert_eq!(windows.len(), 2);
        assert!(approx_eq(windows[0].value, 1.0));
        assert!(approx_eq(windows[1].value, 1.0));
    }

    // ---- aggregation and sessions ----

    #[test]
    fn test_aggregate_all_strategies() {
        let values = vec![1.0_f64, 2.0, 3.0, 4.0, 5.0];
        let expectations = [
            (WindowAggregation::Sum, 15.0),
            (WindowAggregation::Mean, 3.0),
            (WindowAggregation::Max, 5.0),
            (WindowAggregation::Min, 1.0),
            (WindowAggregation::Count, 5.0),
            (WindowAggregation::FirstValue, 1.0),
            (WindowAggregation::LastValue, 5.0),
        ];

        for (strategy, expected) in expectations {
            assert!(approx_eq(
                WindowedAggregation::aggregate(&values, strategy),
                expected
            ));
        }
    }

    #[test]
    fn test_session_window_gap_detection() {
        let events = vec![
            (0u64, 1.0_f64),
            (100, 2.0),
            (200, 3.0),
            (1500, 4.0),
            (1600, 5.0),
        ];
        let aggregator = WindowedAggregation::new(WindowConfig {
            window_type: WindowType::Session { gap_ms: 500 },
            aggregation: WindowAggregation::Sum,
            emit_partial: false,
            min_elements: 1,
        });
        let sessions = aggregator.process_session(&events);

        assert_eq!(sessions.len(), 2, "Expected 2 sessions");
        assert_eq!(
            sessions[0].element_count, 3,
            "First session should have 3 elements"
        );
        assert_eq!(
            sessions[1].element_count, 2,
            "Second session should have 2 elements"
        );
        assert!(sessions[0].is_complete);
        assert!(sessions[1].is_complete);
    }
}