hyprstream_core/
aggregation.rs

1//! Core aggregation framework for time-series data.
2//!
3//! This module provides the foundational types and functionality for aggregating
4//! time-series data. It defines:
5//! - Generic aggregation functions (Sum, Avg, Min, Max, Count)
6//! - Time window specifications (None, Fixed, Sliding)
7//! - Grouping operations
8//! - SQL query generation
9//!
10//! This framework is used by more specific aggregation implementations, such as
11//! the metric-specific aggregation in `crate::metrics::aggregation`.
12
13use std::time::Duration;
14use serde::{Serialize, Deserialize};
15use std::fmt::{Display, Formatter};
16
/// Time window for aggregation.
///
/// Selects how timestamps are bucketed before an aggregate is computed.
/// `window_bounds` and `to_sql` read every duration through
/// `Duration::as_secs`, so any sub-second part of a duration is discarded.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum TimeWindow {
    /// No time window, aggregate all data
    None,
    /// Fixed time window (e.g., 5 minutes, 1 hour)
    Fixed(Duration),
    /// Sliding time window with window size and slide interval
    Sliding {
        /// Length of each window.
        window: Duration,
        /// Step between window starts; window starts are multiples of this.
        slide: Duration,
    },
}
30
/// Generic aggregation functions that can be applied to time-series data.
///
/// Each variant corresponds to the SQL aggregate of the same name: the
/// `Display` impl prints the upper-case keyword, and `to_sql` wraps a column
/// expression in it.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum AggregateFunction {
    /// Count the number of values
    Count,
    /// Sum all values
    Sum,
    /// Calculate the average
    Avg,
    /// Find the minimum value
    Min,
    /// Find the maximum value
    Max,
}
45
46impl Display for AggregateFunction {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        match self {
49            AggregateFunction::Count => write!(f, "COUNT"),
50            AggregateFunction::Sum => write!(f, "SUM"),
51            AggregateFunction::Avg => write!(f, "AVG"),
52            AggregateFunction::Min => write!(f, "MIN"),
53            AggregateFunction::Max => write!(f, "MAX"),
54        }
55    }
56}
57
/// Grouping specification for aggregation.
///
/// `build_aggregate_query` emits `columns` followed by `time_column` (when
/// set) in both the SELECT list and the GROUP BY clause.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupBy {
    /// Column names to group by, in the order they are emitted.
    pub columns: Vec<String>,
    /// Optional time column, appended after `columns` as an extra grouping key.
    pub time_column: Option<String>,
}
64
/// Result of an aggregation operation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregateResult {
    /// The aggregated value.
    pub value: f64,
    /// Timestamp associated with the value.
    // NOTE(review): the unit is not established in this file; presumably the same
    // integer timestamps `TimeWindow::window_bounds` operates on. Confirm.
    pub timestamp: i64,
}
71
72impl TimeWindow {
73    /// Calculates the window boundaries for a given timestamp
74    pub fn window_bounds(&self, timestamp: i64) -> (i64, i64) {
75        match *self {
76            TimeWindow::None => (i64::MIN, i64::MAX),
77            TimeWindow::Fixed(duration) => {
78                let window_size = duration.as_secs() as i64;
79                let window_start = (timestamp / window_size) * window_size;
80                (window_start, window_start + window_size)
81            },
82            TimeWindow::Sliding { window, slide } => {
83                let window_size = window.as_secs() as i64;
84                let slide_size = slide.as_secs() as i64;
85                let current_slide = (timestamp / slide_size) * slide_size;
86                (current_slide, current_slide + window_size)
87            }
88        }
89    }
90
91    /// Generates SQL expressions for window boundaries
92    pub fn to_sql(&self) -> Option<String> {
93        match *self {
94            TimeWindow::None => None,
95            TimeWindow::Fixed(duration) => {
96                let window_size = duration.as_secs();
97                Some(format!(
98                    "(timestamp / {}) * {} as window_start, 
99                    ((timestamp / {}) + 1) * {} as window_end",
100                    window_size, window_size, window_size, window_size
101                ))
102            },
103            TimeWindow::Sliding { window, slide } => {
104                let window_size = window.as_secs();
105                let slide_size = slide.as_secs();
106                Some(format!(
107                    "(timestamp / {}) * {} as window_start,
108                    ((timestamp / {}) * {} + {}) as window_end",
109                    slide_size, slide_size, slide_size, slide_size, window_size
110                ))
111            }
112        }
113    }
114}
115
116impl AggregateFunction {
117    /// Generates SQL for the aggregation function
118    pub fn to_sql(&self, column: &str) -> String {
119        match self {
120            AggregateFunction::Sum => format!("SUM({})", column),
121            AggregateFunction::Avg => format!("AVG({})", column),
122            AggregateFunction::Min => format!("MIN({})", column),
123            AggregateFunction::Max => format!("MAX({})", column),
124            AggregateFunction::Count => format!("COUNT({})", column),
125        }
126    }
127}
128
129/// Builds a SQL query for aggregation.
130///
131/// This is the core query builder used by specific aggregation implementations.
132/// It provides a flexible way to build SQL queries for different types of
133/// time-series data aggregation.
134///
135/// # Arguments
136///
137/// * `table_name` - The source table name
138/// * `function` - The aggregation function to apply
139/// * `group_by` - The grouping specification
140/// * `columns` - The columns to aggregate
141/// * `from_timestamp` - Optional start of the time range
142/// * `to_timestamp` - Optional end of the time range
143///
144/// # Returns
145///
146/// A SQL query string for the specified aggregation
147pub fn build_aggregate_query(
148    table_name: &str,
149    function: AggregateFunction,
150    group_by: &GroupBy,
151    _columns: &[&str],
152    from_timestamp: Option<i64>,
153    to_timestamp: Option<i64>,
154) -> String {
155    let mut query = String::new();
156    
157    // Build SELECT clause
158    query.push_str("SELECT ");
159    
160    // Add group by columns
161    if !group_by.columns.is_empty() {
162        let cols: Vec<&str> = group_by.columns.iter().map(|s| s.as_str()).collect();
163        query.push_str(&cols.join(", "));
164        query.push_str(", ");
165    }
166    
167    // Add time column if present
168    if let Some(time_col) = &group_by.time_column {
169        query.push_str(&format!("{}, ", time_col));
170    }
171    
172    // Add aggregation function
173    match function {
174        AggregateFunction::Sum => query.push_str("SUM(value)"),
175        AggregateFunction::Avg => query.push_str("AVG(value)"),
176        AggregateFunction::Count => query.push_str("COUNT(*)"),
177        AggregateFunction::Min => query.push_str("MIN(value)"),
178        AggregateFunction::Max => query.push_str("MAX(value)"),
179    }
180    
181    // Add FROM clause
182    query.push_str(&format!(" FROM {}", table_name));
183    
184    // Add WHERE clause for timestamp range
185    if let Some(from_ts) = from_timestamp {
186        query.push_str(&format!(" WHERE timestamp >= {}", from_ts));
187        if let Some(to_ts) = to_timestamp {
188            query.push_str(&format!(" AND timestamp <= {}", to_ts));
189        }
190    }
191    
192    // Add GROUP BY clause
193    if !group_by.columns.is_empty() || group_by.time_column.is_some() {
194        query.push_str(" GROUP BY ");
195        let mut group_cols = Vec::new();
196        
197        if !group_by.columns.is_empty() {
198            let cols: Vec<&str> = group_by.columns.iter().map(|s| s.as_str()).collect();
199            group_cols.extend(cols);
200        }
201        
202        if let Some(time_col) = &group_by.time_column {
203            group_cols.push(time_col.as_str());
204        }
205        
206        query.push_str(&group_cols.join(", "));
207    }
208    
209    query
210}