oxirs_stream/processing/
window.rs1use crate::StreamEvent;
10use anyhow::Result;
11use chrono::{DateTime, Duration as ChronoDuration, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use uuid::Uuid;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub enum WindowType {
19 Tumbling { duration: ChronoDuration },
21 Sliding {
23 duration: ChronoDuration,
24 slide: ChronoDuration,
25 },
26 CountBased { size: usize },
28 Session { timeout: ChronoDuration },
30 Custom { name: String },
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub enum WindowTrigger {
37 OnTime,
39 OnCount(usize),
41 OnCondition(String),
43 Hybrid { time: ChronoDuration, count: usize },
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct WindowConfig {
50 pub window_type: WindowType,
51 pub aggregates: Vec<super::aggregation::AggregateFunction>,
52 pub group_by: Vec<String>,
53 pub filter: Option<String>,
54 pub allow_lateness: Option<ChronoDuration>,
55 pub trigger: WindowTrigger,
56}
57
58#[derive(Debug)]
60pub struct EventWindow {
61 id: String,
62 config: WindowConfig,
63 events: VecDeque<StreamEvent>,
64 start_time: DateTime<Utc>,
65 end_time: Option<DateTime<Utc>>,
66 last_trigger: Option<DateTime<Utc>>,
67 event_count: usize,
68 aggregation_state: HashMap<String, super::aggregation::AggregationState>,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct WindowResult {
74 pub window_id: String,
75 pub window_start: DateTime<Utc>,
76 pub window_end: DateTime<Utc>,
77 pub event_count: usize,
78 pub aggregations: HashMap<String, serde_json::Value>,
79 pub trigger_reason: String,
80 pub processing_time: DateTime<Utc>,
81}
82
83impl EventWindow {
84 pub fn new(config: WindowConfig) -> Self {
86 let id = Uuid::new_v4().to_string();
87 let start_time = Utc::now();
88
89 Self {
90 id,
91 config,
92 events: VecDeque::new(),
93 start_time,
94 end_time: None,
95 last_trigger: None,
96 event_count: 0,
97 aggregation_state: HashMap::new(),
98 }
99 }
100
101 pub fn add_event(&mut self, event: StreamEvent) -> Result<()> {
103 self.events.push_back(event);
104 self.event_count += 1;
105
106 self.update_aggregations()?;
108
109 Ok(())
110 }
111
112 pub fn should_trigger(&self, current_time: DateTime<Utc>) -> bool {
114 match &self.config.trigger {
115 WindowTrigger::OnTime => match &self.config.window_type {
116 WindowType::Tumbling { duration } => current_time >= self.start_time + *duration,
117 WindowType::Sliding { duration, .. } => current_time >= self.start_time + *duration,
118 _ => false,
119 },
120 WindowTrigger::OnCount(count) => self.event_count >= *count,
121 WindowTrigger::OnCondition(condition) => self.evaluate_condition(condition),
122 WindowTrigger::Hybrid { time, count } => {
123 let time_condition = current_time >= self.start_time + *time;
124 let count_condition = self.event_count >= *count;
125 time_condition || count_condition
126 }
127 }
128 }
129
130 fn evaluate_condition(&self, condition: &str) -> bool {
132 match condition {
133 "window_full" => match &self.config.window_type {
134 WindowType::CountBased { size } => self.event_count >= *size,
135 _ => false,
136 },
137 "always" => true,
138 "never" => false,
139 condition if condition.starts_with("time_elapsed:") => {
140 if let Ok(seconds) = condition
141 .strip_prefix("time_elapsed:")
142 .expect("strip_prefix should succeed after starts_with check")
143 .parse::<i64>()
144 {
145 let duration = ChronoDuration::seconds(seconds);
146 Utc::now() >= self.start_time + duration
147 } else {
148 false
149 }
150 }
151 condition if condition.starts_with("count_gte:") => {
152 if let Ok(count) = condition
153 .strip_prefix("count_gte:")
154 .expect("strip_prefix should succeed after starts_with check")
155 .parse::<usize>()
156 {
157 self.event_count >= count
158 } else {
159 false
160 }
161 }
162 condition if condition.starts_with("count_eq:") => {
163 if let Ok(count) = condition
164 .strip_prefix("count_eq:")
165 .expect("strip_prefix should succeed after starts_with check")
166 .parse::<usize>()
167 {
168 self.event_count == count
169 } else {
170 false
171 }
172 }
173 _ => condition.parse::<bool>().unwrap_or_default(),
174 }
175 }
176
177 fn update_aggregations(&mut self) -> Result<()> {
179 Ok(())
182 }
183
184 pub fn id(&self) -> &str {
186 &self.id
187 }
188
189 pub fn config(&self) -> &WindowConfig {
191 &self.config
192 }
193
194 pub fn events(&self) -> &VecDeque<StreamEvent> {
196 &self.events
197 }
198
199 pub fn event_count(&self) -> usize {
201 self.event_count
202 }
203
204 pub fn aggregation_state(&self) -> &HashMap<String, super::aggregation::AggregationState> {
206 &self.aggregation_state
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct Watermark {
213 pub timestamp: DateTime<Utc>,
215 pub allowed_lateness: ChronoDuration,
217}
218
219impl Watermark {
220 pub fn new() -> Self {
222 Self {
223 timestamp: Utc::now(),
224 allowed_lateness: ChronoDuration::seconds(60),
225 }
226 }
227
228 pub fn update(&mut self, timestamp: DateTime<Utc>) {
230 if timestamp > self.timestamp {
231 self.timestamp = timestamp;
232 }
233 }
234
235 pub fn current(&self) -> DateTime<Utc> {
237 self.timestamp
238 }
239}
240
241impl Default for Watermark {
242 fn default() -> Self {
243 Self::new()
244 }
245}