// oxirs_stream/processing/window.rs

//! Window management for event processing.
//!
//! This module provides windowing capabilities for stream processing, including:
//! - Time-based windowing (tumbling, sliding)
//! - Count-based windowing
//! - Session-based windowing
//! - Custom window types

9use crate::StreamEvent;
10use anyhow::Result;
11use chrono::{DateTime, Duration as ChronoDuration, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use uuid::Uuid;
15
16/// Window types for event processing
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub enum WindowType {
19    /// Fixed time-based window
20    Tumbling { duration: ChronoDuration },
21    /// Overlapping time-based window
22    Sliding {
23        duration: ChronoDuration,
24        slide: ChronoDuration,
25    },
26    /// Count-based window
27    CountBased { size: usize },
28    /// Session-based window (events grouped by activity)
29    Session { timeout: ChronoDuration },
30    /// Custom window with user-defined logic
31    Custom { name: String },
32}
33
34/// Window trigger conditions
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub enum WindowTrigger {
37    /// Trigger when window ends
38    OnTime,
39    /// Trigger every N events
40    OnCount(usize),
41    /// Trigger on specific conditions
42    OnCondition(String),
43    /// Trigger both on time and count
44    Hybrid { time: ChronoDuration, count: usize },
45}
46
47/// Window configuration
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct WindowConfig {
50    pub window_type: WindowType,
51    pub aggregates: Vec<super::aggregation::AggregateFunction>,
52    pub group_by: Vec<String>,
53    pub filter: Option<String>,
54    pub allow_lateness: Option<ChronoDuration>,
55    pub trigger: WindowTrigger,
56}
57
/// Event processing window.
///
/// Created with [`EventWindow::new`]. Events are appended with
/// [`EventWindow::add_event`], and [`EventWindow::should_trigger`] decides
/// when the window fires.
#[derive(Debug)]
pub struct EventWindow {
    // Random v4 UUID string assigned in `new`.
    id: String,
    // Window shape, trigger policy and aggregation settings.
    config: WindowConfig,
    // Events added so far, in insertion order (appended with `push_back`).
    events: VecDeque<StreamEvent>,
    // Wall-clock time at construction; time-based triggers are measured from here.
    start_time: DateTime<Utc>,
    // Initialised to `None` in `new`; not assigned anywhere else in this module.
    end_time: Option<DateTime<Utc>>,
    // Initialised to `None` in `new`; not assigned anywhere else in this module.
    last_trigger: Option<DateTime<Utc>>,
    // Number of events added through `add_event`.
    event_count: usize,
    // Aggregation state; presumably keyed by aggregate name — TODO confirm.
    aggregation_state: HashMap<String, super::aggregation::AggregationState>,
}
70
71/// Result of window aggregation
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct WindowResult {
74    pub window_id: String,
75    pub window_start: DateTime<Utc>,
76    pub window_end: DateTime<Utc>,
77    pub event_count: usize,
78    pub aggregations: HashMap<String, serde_json::Value>,
79    pub trigger_reason: String,
80    pub processing_time: DateTime<Utc>,
81}
82
83impl EventWindow {
84    /// Create a new event window
85    pub fn new(config: WindowConfig) -> Self {
86        let id = Uuid::new_v4().to_string();
87        let start_time = Utc::now();
88
89        Self {
90            id,
91            config,
92            events: VecDeque::new(),
93            start_time,
94            end_time: None,
95            last_trigger: None,
96            event_count: 0,
97            aggregation_state: HashMap::new(),
98        }
99    }
100
101    /// Add an event to the window
102    pub fn add_event(&mut self, event: StreamEvent) -> Result<()> {
103        self.events.push_back(event);
104        self.event_count += 1;
105
106        // Update aggregation state
107        self.update_aggregations()?;
108
109        Ok(())
110    }
111
112    /// Check if window should trigger
113    pub fn should_trigger(&self, current_time: DateTime<Utc>) -> bool {
114        match &self.config.trigger {
115            WindowTrigger::OnTime => match &self.config.window_type {
116                WindowType::Tumbling { duration } => current_time >= self.start_time + *duration,
117                WindowType::Sliding { duration, .. } => current_time >= self.start_time + *duration,
118                _ => false,
119            },
120            WindowTrigger::OnCount(count) => self.event_count >= *count,
121            WindowTrigger::OnCondition(condition) => self.evaluate_condition(condition),
122            WindowTrigger::Hybrid { time, count } => {
123                let time_condition = current_time >= self.start_time + *time;
124                let count_condition = self.event_count >= *count;
125                time_condition || count_condition
126            }
127        }
128    }
129
130    /// Evaluate trigger condition
131    fn evaluate_condition(&self, condition: &str) -> bool {
132        match condition {
133            "window_full" => match &self.config.window_type {
134                WindowType::CountBased { size } => self.event_count >= *size,
135                _ => false,
136            },
137            "always" => true,
138            "never" => false,
139            condition if condition.starts_with("time_elapsed:") => {
140                if let Ok(seconds) = condition
141                    .strip_prefix("time_elapsed:")
142                    .unwrap()
143                    .parse::<i64>()
144                {
145                    let duration = ChronoDuration::seconds(seconds);
146                    Utc::now() >= self.start_time + duration
147                } else {
148                    false
149                }
150            }
151            condition if condition.starts_with("count_gte:") => {
152                if let Ok(count) = condition
153                    .strip_prefix("count_gte:")
154                    .unwrap()
155                    .parse::<usize>()
156                {
157                    self.event_count >= count
158                } else {
159                    false
160                }
161            }
162            condition if condition.starts_with("count_eq:") => {
163                if let Ok(count) = condition
164                    .strip_prefix("count_eq:")
165                    .unwrap()
166                    .parse::<usize>()
167                {
168                    self.event_count == count
169                } else {
170                    false
171                }
172            }
173            _ => condition.parse::<bool>().unwrap_or_default(),
174        }
175    }
176
177    /// Update aggregation state
178    fn update_aggregations(&mut self) -> Result<()> {
179        // Implementation details for aggregation updates
180        // This would be moved from the original processing.rs
181        Ok(())
182    }
183
184    /// Get window ID
185    pub fn id(&self) -> &str {
186        &self.id
187    }
188
189    /// Get window configuration
190    pub fn config(&self) -> &WindowConfig {
191        &self.config
192    }
193
194    /// Get events in window
195    pub fn events(&self) -> &VecDeque<StreamEvent> {
196        &self.events
197    }
198
199    /// Get event count
200    pub fn event_count(&self) -> usize {
201        self.event_count
202    }
203
204    /// Get aggregation state
205    pub fn aggregation_state(&self) -> &HashMap<String, super::aggregation::AggregationState> {
206        &self.aggregation_state
207    }
208}
209
210/// Watermark for tracking event time progress
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct Watermark {
213    /// Current watermark timestamp
214    pub timestamp: DateTime<Utc>,
215    /// Allowed lateness after watermark
216    pub allowed_lateness: ChronoDuration,
217}
218
219impl Watermark {
220    /// Create a new watermark with default values
221    pub fn new() -> Self {
222        Self {
223            timestamp: Utc::now(),
224            allowed_lateness: ChronoDuration::seconds(60),
225        }
226    }
227
228    /// Update watermark with new timestamp
229    pub fn update(&mut self, timestamp: DateTime<Utc>) {
230        if timestamp > self.timestamp {
231            self.timestamp = timestamp;
232        }
233    }
234
235    /// Get the current watermark timestamp
236    pub fn current(&self) -> DateTime<Utc> {
237        self.timestamp
238    }
239}
240
241impl Default for Watermark {
242    fn default() -> Self {
243        Self::new()
244    }
245}