oxirs-stream 0.2.2

Real-time streaming support with Kafka/NATS/MQTT/OPC-UA I/O, RDF Patch, and SPARQL Update delta
Documentation
//! Window management for event processing
//!
//! This module provides windowing capabilities for stream processing including:
//! - Time-based windowing (tumbling, sliding)
//! - Count-based windowing
//! - Session-based windowing
//! - Custom window types

use crate::StreamEvent;
use anyhow::Result;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use uuid::Uuid;

/// Window types for event processing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WindowType {
    /// Fixed time-based window
    Tumbling { duration: ChronoDuration },
    /// Overlapping time-based window
    Sliding {
        duration: ChronoDuration,
        slide: ChronoDuration,
    },
    /// Count-based window
    CountBased { size: usize },
    /// Session-based window (events grouped by activity)
    Session { timeout: ChronoDuration },
    /// Custom window with user-defined logic
    Custom { name: String },
}

/// Window trigger conditions
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WindowTrigger {
    /// Trigger when window ends
    OnTime,
    /// Trigger every N events
    OnCount(usize),
    /// Trigger on specific conditions
    OnCondition(String),
    /// Trigger both on time and count
    Hybrid { time: ChronoDuration, count: usize },
}

/// Window configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    pub window_type: WindowType,
    pub aggregates: Vec<super::aggregation::AggregateFunction>,
    pub group_by: Vec<String>,
    pub filter: Option<String>,
    pub allow_lateness: Option<ChronoDuration>,
    pub trigger: WindowTrigger,
}

/// Event processing window
#[derive(Debug)]
pub struct EventWindow {
    id: String,
    config: WindowConfig,
    events: VecDeque<StreamEvent>,
    start_time: DateTime<Utc>,
    end_time: Option<DateTime<Utc>>,
    last_trigger: Option<DateTime<Utc>>,
    event_count: usize,
    aggregation_state: HashMap<String, super::aggregation::AggregationState>,
}

/// Result of window aggregation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowResult {
    pub window_id: String,
    pub window_start: DateTime<Utc>,
    pub window_end: DateTime<Utc>,
    pub event_count: usize,
    pub aggregations: HashMap<String, serde_json::Value>,
    pub trigger_reason: String,
    pub processing_time: DateTime<Utc>,
}

impl EventWindow {
    /// Create a new event window
    pub fn new(config: WindowConfig) -> Self {
        let id = Uuid::new_v4().to_string();
        let start_time = Utc::now();

        Self {
            id,
            config,
            events: VecDeque::new(),
            start_time,
            end_time: None,
            last_trigger: None,
            event_count: 0,
            aggregation_state: HashMap::new(),
        }
    }

    /// Add an event to the window
    pub fn add_event(&mut self, event: StreamEvent) -> Result<()> {
        self.events.push_back(event);
        self.event_count += 1;

        // Update aggregation state
        self.update_aggregations()?;

        Ok(())
    }

    /// Check if window should trigger
    pub fn should_trigger(&self, current_time: DateTime<Utc>) -> bool {
        match &self.config.trigger {
            WindowTrigger::OnTime => match &self.config.window_type {
                WindowType::Tumbling { duration } => current_time >= self.start_time + *duration,
                WindowType::Sliding { duration, .. } => current_time >= self.start_time + *duration,
                _ => false,
            },
            WindowTrigger::OnCount(count) => self.event_count >= *count,
            WindowTrigger::OnCondition(condition) => self.evaluate_condition(condition),
            WindowTrigger::Hybrid { time, count } => {
                let time_condition = current_time >= self.start_time + *time;
                let count_condition = self.event_count >= *count;
                time_condition || count_condition
            }
        }
    }

    /// Evaluate trigger condition
    fn evaluate_condition(&self, condition: &str) -> bool {
        match condition {
            "window_full" => match &self.config.window_type {
                WindowType::CountBased { size } => self.event_count >= *size,
                _ => false,
            },
            "always" => true,
            "never" => false,
            condition if condition.starts_with("time_elapsed:") => {
                if let Ok(seconds) = condition
                    .strip_prefix("time_elapsed:")
                    .expect("strip_prefix should succeed after starts_with check")
                    .parse::<i64>()
                {
                    let duration = ChronoDuration::seconds(seconds);
                    Utc::now() >= self.start_time + duration
                } else {
                    false
                }
            }
            condition if condition.starts_with("count_gte:") => {
                if let Ok(count) = condition
                    .strip_prefix("count_gte:")
                    .expect("strip_prefix should succeed after starts_with check")
                    .parse::<usize>()
                {
                    self.event_count >= count
                } else {
                    false
                }
            }
            condition if condition.starts_with("count_eq:") => {
                if let Ok(count) = condition
                    .strip_prefix("count_eq:")
                    .expect("strip_prefix should succeed after starts_with check")
                    .parse::<usize>()
                {
                    self.event_count == count
                } else {
                    false
                }
            }
            _ => condition.parse::<bool>().unwrap_or_default(),
        }
    }

    /// Update aggregation state
    fn update_aggregations(&mut self) -> Result<()> {
        // Implementation details for aggregation updates
        // This would be moved from the original processing.rs
        Ok(())
    }

    /// Get window ID
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Get window configuration
    pub fn config(&self) -> &WindowConfig {
        &self.config
    }

    /// Get events in window
    pub fn events(&self) -> &VecDeque<StreamEvent> {
        &self.events
    }

    /// Get event count
    pub fn event_count(&self) -> usize {
        self.event_count
    }

    /// Get aggregation state
    pub fn aggregation_state(&self) -> &HashMap<String, super::aggregation::AggregationState> {
        &self.aggregation_state
    }
}

/// Watermark for tracking event time progress
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Watermark {
    /// Current watermark timestamp
    pub timestamp: DateTime<Utc>,
    /// Allowed lateness after watermark
    pub allowed_lateness: ChronoDuration,
}

impl Watermark {
    /// Create a new watermark with default values
    pub fn new() -> Self {
        Self {
            timestamp: Utc::now(),
            allowed_lateness: ChronoDuration::seconds(60),
        }
    }

    /// Update watermark with new timestamp
    pub fn update(&mut self, timestamp: DateTime<Utc>) {
        if timestamp > self.timestamp {
            self.timestamp = timestamp;
        }
    }

    /// Get the current watermark timestamp
    pub fn current(&self) -> DateTime<Utc> {
        self.timestamp
    }
}

impl Default for Watermark {
    fn default() -> Self {
        Self::new()
    }
}