use crate::StreamEvent;
use anyhow::Result;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use uuid::Uuid;
/// Kind of window an [`EventWindow`] represents.
///
/// In the code visible in this file only `Tumbling`, `Sliding` and
/// `CountBased` influence triggering; the remaining variants are carried as
/// configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WindowType {
/// With the `OnTime` trigger, fires once `duration` has elapsed since the
/// window's start time.
Tumbling { duration: ChronoDuration },
/// With the `OnTime` trigger, fires once `duration` has elapsed since the
/// window's start time.
/// NOTE(review): `slide` is not read by the code in this file; presumably it
/// is the advance interval between consecutive windows — confirm.
Sliding {
duration: ChronoDuration,
slide: ChronoDuration,
},
/// `size` is the event count at which the `"window_full"` condition becomes true.
CountBased { size: usize },
/// NOTE(review): `timeout` is not read by the code in this file; presumably
/// the inactivity gap that closes a session — confirm.
Session { timeout: ChronoDuration },
/// NOTE(review): `name` is not interpreted by the code in this file.
Custom { name: String },
}
/// Rule that decides when a window fires; evaluated by
/// `EventWindow::should_trigger`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WindowTrigger {
/// Fires for `Tumbling` and `Sliding` windows once their `duration` has
/// elapsed since the window started; never fires for other window types.
OnTime,
/// Fires once the window holds at least this many events.
OnCount(usize),
/// Fires when the condition string evaluates to true. Recognised forms:
/// `"window_full"`, `"always"`, `"never"`, `"time_elapsed:<seconds>"`,
/// `"count_gte:<n>"`, `"count_eq:<n>"`; any other string is parsed as a
/// `bool` and counts as false when that parse fails.
OnCondition(String),
/// Fires when either `time` has elapsed since the window started or the
/// window holds at least `count` events.
Hybrid { time: ChronoDuration, count: usize },
}
/// Configuration handed to `EventWindow::new`.
///
/// NOTE(review): only `window_type` and `trigger` are read by the code in this
/// file; the descriptions of the other fields are inferred from their names
/// and types — confirm against the code that consumes them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
/// Window kind; consulted by the `OnTime` trigger and the `"window_full"` condition.
pub window_type: WindowType,
/// Aggregate functions associated with the window (presumably computed over its events).
pub aggregates: Vec<super::aggregation::AggregateFunction>,
/// Presumably the field names used to group events — TODO confirm.
pub group_by: Vec<String>,
/// Presumably an optional filter expression applied to events — TODO confirm.
pub filter: Option<String>,
/// Presumably how late an event may arrive and still be accepted — TODO confirm.
pub allow_lateness: Option<ChronoDuration>,
/// Rule that decides when the window fires.
pub trigger: WindowTrigger,
}
/// A single window instance: its configuration, the events collected so far
/// and the bookkeeping used to decide when it fires.
#[derive(Debug)]
pub struct EventWindow {
// Random UUID v4 string generated when the window is created.
id: String,
// Configuration supplied at construction.
config: WindowConfig,
// Events in insertion order (new events are pushed at the back).
events: VecDeque<StreamEvent>,
// Wall-clock time at which the window was created.
start_time: DateTime<Utc>,
// Initialised to `None`; not modified by the code visible in this file.
end_time: Option<DateTime<Utc>>,
// Initialised to `None`; not modified by the code visible in this file.
last_trigger: Option<DateTime<Utc>>,
// Number of events added; incremented once per `add_event` call.
event_count: usize,
// Starts empty; exposed read-only through `aggregation_state()`.
aggregation_state: HashMap<String, super::aggregation::AggregationState>,
}
/// Serializable record describing the output of a window.
///
/// NOTE(review): nothing in this file constructs a `WindowResult`; the field
/// descriptions below are inferred from names and types — confirm against the
/// producing code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowResult {
/// Presumably the id of the window that produced this result.
pub window_id: String,
/// Presumably the start of the interval covered by the window.
pub window_start: DateTime<Utc>,
/// Presumably the end of the interval covered by the window.
pub window_end: DateTime<Utc>,
/// Presumably the number of events the window contained.
pub event_count: usize,
/// Aggregation outputs as JSON values, keyed by string.
pub aggregations: HashMap<String, serde_json::Value>,
/// Presumably a human-readable description of why the window fired.
pub trigger_reason: String,
/// Presumably the time at which the result was produced.
pub processing_time: DateTime<Utc>,
}
impl EventWindow {
pub fn new(config: WindowConfig) -> Self {
let id = Uuid::new_v4().to_string();
let start_time = Utc::now();
Self {
id,
config,
events: VecDeque::new(),
start_time,
end_time: None,
last_trigger: None,
event_count: 0,
aggregation_state: HashMap::new(),
}
}
pub fn add_event(&mut self, event: StreamEvent) -> Result<()> {
self.events.push_back(event);
self.event_count += 1;
self.update_aggregations()?;
Ok(())
}
pub fn should_trigger(&self, current_time: DateTime<Utc>) -> bool {
match &self.config.trigger {
WindowTrigger::OnTime => match &self.config.window_type {
WindowType::Tumbling { duration } => current_time >= self.start_time + *duration,
WindowType::Sliding { duration, .. } => current_time >= self.start_time + *duration,
_ => false,
},
WindowTrigger::OnCount(count) => self.event_count >= *count,
WindowTrigger::OnCondition(condition) => self.evaluate_condition(condition),
WindowTrigger::Hybrid { time, count } => {
let time_condition = current_time >= self.start_time + *time;
let count_condition = self.event_count >= *count;
time_condition || count_condition
}
}
}
fn evaluate_condition(&self, condition: &str) -> bool {
match condition {
"window_full" => match &self.config.window_type {
WindowType::CountBased { size } => self.event_count >= *size,
_ => false,
},
"always" => true,
"never" => false,
condition if condition.starts_with("time_elapsed:") => {
if let Ok(seconds) = condition
.strip_prefix("time_elapsed:")
.expect("strip_prefix should succeed after starts_with check")
.parse::<i64>()
{
let duration = ChronoDuration::seconds(seconds);
Utc::now() >= self.start_time + duration
} else {
false
}
}
condition if condition.starts_with("count_gte:") => {
if let Ok(count) = condition
.strip_prefix("count_gte:")
.expect("strip_prefix should succeed after starts_with check")
.parse::<usize>()
{
self.event_count >= count
} else {
false
}
}
condition if condition.starts_with("count_eq:") => {
if let Ok(count) = condition
.strip_prefix("count_eq:")
.expect("strip_prefix should succeed after starts_with check")
.parse::<usize>()
{
self.event_count == count
} else {
false
}
}
_ => condition.parse::<bool>().unwrap_or_default(),
}
}
fn update_aggregations(&mut self) -> Result<()> {
Ok(())
}
pub fn id(&self) -> &str {
&self.id
}
pub fn config(&self) -> &WindowConfig {
&self.config
}
pub fn events(&self) -> &VecDeque<StreamEvent> {
&self.events
}
pub fn event_count(&self) -> usize {
self.event_count
}
pub fn aggregation_state(&self) -> &HashMap<String, super::aggregation::AggregationState> {
&self.aggregation_state
}
}
/// A timestamp marker that only moves forward (see `Watermark::update`),
/// paired with an allowed-lateness duration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Watermark {
/// Latest timestamp accepted by `update`; initialised to the creation time.
pub timestamp: DateTime<Utc>,
/// Defaults to 60 seconds.
/// NOTE(review): not read by the code in this file; presumably the tolerance
/// for events arriving behind the watermark — confirm.
pub allowed_lateness: ChronoDuration,
}
impl Watermark {
    /// Builds a watermark positioned at the current wall-clock time with an
    /// allowed lateness of 60 seconds.
    pub fn new() -> Self {
        let timestamp = Utc::now();
        let allowed_lateness = ChronoDuration::seconds(60);
        Self {
            timestamp,
            allowed_lateness,
        }
    }

    /// Advances the watermark to `timestamp` when it is strictly later than
    /// the current position; earlier or equal timestamps are ignored.
    pub fn update(&mut self, timestamp: DateTime<Utc>) {
        if timestamp <= self.timestamp {
            // The watermark never moves backwards.
            return;
        }
        self.timestamp = timestamp;
    }

    /// Returns the watermark's current position.
    pub fn current(&self) -> DateTime<Utc> {
        self.timestamp
    }
}
impl Default for Watermark {
fn default() -> Self {
Self::new()
}
}