hyprstream_core/
aggregation.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
//! Core aggregation framework for time-series data.
//!
//! This module provides the foundational types and functionality for aggregating
//! time-series data. It defines:
//! - Generic aggregation functions (Sum, Avg, Min, Max, Count)
//! - Time window specifications (None, Fixed, Sliding)
//! - Grouping operations
//! - SQL query generation
//!
//! This framework is used by more specific aggregation implementations, such as
//! the metric-specific aggregation in `crate::metrics::aggregation`.

use std::time::Duration;
use serde::{Serialize, Deserialize};
use std::fmt::{Display, Formatter};

/// Time window for aggregation
///
/// Describes how timestamps are bucketed before aggregating. The methods on
/// this type read the contained durations with `Duration::as_secs`, so any
/// sub-second part of a duration is ignored by them.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum TimeWindow {
    /// No time window, aggregate all data
    None,
    /// Fixed time window (e.g., 5 minutes, 1 hour)
    Fixed(Duration),
    /// Sliding time window with window size and slide interval
    Sliding {
        /// Length of each window.
        window: Duration,
        /// Step to which window starts are aligned (a window starts at each
        /// multiple of this interval).
        slide: Duration,
    },
}

/// Generic aggregation functions that can be applied to time-series data
///
/// Each variant corresponds to the SQL aggregate of the same (upper-cased)
/// name, which is what the `Display` implementation writes.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum AggregateFunction {
    /// Count the number of values
    Count,
    /// Sum all values
    Sum,
    /// Calculate the average
    Avg,
    /// Find the minimum value
    Min,
    /// Find the maximum value
    Max,
}

impl Display for AggregateFunction {
    /// Writes the upper-case SQL keyword for this function (`COUNT`, `SUM`,
    /// `AVG`, `MIN` or `MAX`).
    ///
    /// The keyword is written as-is; width, fill and alignment flags on the
    /// formatter are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Pick the keyword first so there is a single write site.
        let keyword = match *self {
            AggregateFunction::Max => "MAX",
            AggregateFunction::Min => "MIN",
            AggregateFunction::Avg => "AVG",
            AggregateFunction::Sum => "SUM",
            AggregateFunction::Count => "COUNT",
        };
        f.write_str(keyword)
    }
}

/// Grouping specification for aggregation
///
/// `build_aggregate_query` emits these names both in the `SELECT` list and in
/// the `GROUP BY` clause, with `columns` first and `time_column` last.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupBy {
    /// Names of the columns to group by, in output order. They are inserted
    /// into the generated SQL verbatim.
    pub columns: Vec<String>,
    /// Optional additional grouping column, placed after `columns`.
    /// NOTE(review): presumably a timestamp or window-start column — confirm
    /// against callers.
    pub time_column: Option<String>,
}

/// Result of an aggregation operation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregateResult {
    /// The aggregated value.
    pub value: f64,
    /// Timestamp associated with the value.
    /// NOTE(review): unit and whether this is a window start or end are not
    /// established in this module — confirm against producers.
    pub timestamp: i64,
}

impl TimeWindow {
    /// Calculates the window boundaries for a given timestamp
    pub fn window_bounds(&self, timestamp: i64) -> (i64, i64) {
        match *self {
            TimeWindow::None => (i64::MIN, i64::MAX),
            TimeWindow::Fixed(duration) => {
                let window_size = duration.as_secs() as i64;
                let window_start = (timestamp / window_size) * window_size;
                (window_start, window_start + window_size)
            },
            TimeWindow::Sliding { window, slide } => {
                let window_size = window.as_secs() as i64;
                let slide_size = slide.as_secs() as i64;
                let current_slide = (timestamp / slide_size) * slide_size;
                (current_slide, current_slide + window_size)
            }
        }
    }

    /// Generates SQL expressions for window boundaries
    pub fn to_sql(&self) -> Option<String> {
        match *self {
            TimeWindow::None => None,
            TimeWindow::Fixed(duration) => {
                let window_size = duration.as_secs();
                Some(format!(
                    "(timestamp / {}) * {} as window_start, 
                    ((timestamp / {}) + 1) * {} as window_end",
                    window_size, window_size, window_size, window_size
                ))
            },
            TimeWindow::Sliding { window, slide } => {
                let window_size = window.as_secs();
                let slide_size = slide.as_secs();
                Some(format!(
                    "(timestamp / {}) * {} as window_start,
                    ((timestamp / {}) * {} + {}) as window_end",
                    slide_size, slide_size, slide_size, slide_size, window_size
                ))
            }
        }
    }
}

impl AggregateFunction {
    /// Renders this function applied to `column` as a SQL call expression,
    /// for example `SUM(latency)`.
    ///
    /// `column` is placed between the parentheses verbatim; no quoting or
    /// escaping is performed.
    pub fn to_sql(&self, column: &str) -> String {
        let keyword = match *self {
            AggregateFunction::Count => "COUNT",
            AggregateFunction::Max => "MAX",
            AggregateFunction::Min => "MIN",
            AggregateFunction::Avg => "AVG",
            AggregateFunction::Sum => "SUM",
        };

        // keyword + "(" + column + ")"
        let mut call = String::with_capacity(keyword.len() + column.len() + 2);
        call.push_str(keyword);
        call.push('(');
        call.push_str(column);
        call.push(')');
        call
    }
}

/// Builds a SQL query for aggregation.
///
/// This is the core query builder used by specific aggregation implementations.
/// The generated statement has the shape
/// `SELECT <group keys>, <aggregates> FROM <table> [WHERE ...] [GROUP BY <group keys>]`.
///
/// # Arguments
///
/// * `table_name` - The source table name
/// * `function` - The aggregation function to apply
/// * `group_by` - The grouping specification; its columns, followed by its
///   time column if set, are emitted in both `SELECT` and `GROUP BY`
/// * `columns` - The columns to aggregate; one `<FUNCTION>(<column>)` item is
///   emitted per entry. When empty, the query falls back to aggregating the
///   `value` column (`COUNT(*)` for [`AggregateFunction::Count`])
/// * `from_timestamp` - Optional inclusive start of the time range
/// * `to_timestamp` - Optional inclusive end of the time range; applied
///   whether or not `from_timestamp` is given
///
/// # Returns
///
/// A SQL query string for the specified aggregation
///
/// # Security
///
/// Table and column names are interpolated into the SQL text verbatim, without
/// quoting or escaping. Callers must only pass trusted identifiers.
pub fn build_aggregate_query(
    table_name: &str,
    function: AggregateFunction,
    group_by: &GroupBy,
    columns: &[&str],
    from_timestamp: Option<i64>,
    to_timestamp: Option<i64>,
) -> String {
    // Grouping keys are needed twice (SELECT and GROUP BY); build them once so
    // both clauses are guaranteed to agree.
    let group_keys: Vec<&str> = group_by
        .columns
        .iter()
        .map(String::as_str)
        .chain(group_by.time_column.as_deref())
        .collect();

    let sql_function = match function {
        AggregateFunction::Count => "COUNT",
        AggregateFunction::Sum => "SUM",
        AggregateFunction::Avg => "AVG",
        AggregateFunction::Min => "MIN",
        AggregateFunction::Max => "MAX",
    };

    // SELECT list: grouping keys first, then the aggregate expressions.
    let mut select_items: Vec<String> = group_keys.iter().map(|key| key.to_string()).collect();
    if columns.is_empty() {
        // No explicit columns: keep the historical default target.
        let default_target = if matches!(function, AggregateFunction::Count) {
            "*"
        } else {
            "value"
        };
        select_items.push(format!("{}({})", sql_function, default_target));
    } else {
        select_items.extend(
            columns
                .iter()
                .map(|column| format!("{}({})", sql_function, column)),
        );
    }

    let mut query = format!("SELECT {} FROM {}", select_items.join(", "), table_name);

    // Each bound is independent: an upper bound must not be dropped just
    // because no lower bound was supplied.
    let mut predicates = Vec::with_capacity(2);
    if let Some(from_ts) = from_timestamp {
        predicates.push(format!("timestamp >= {}", from_ts));
    }
    if let Some(to_ts) = to_timestamp {
        predicates.push(format!("timestamp <= {}", to_ts));
    }
    if !predicates.is_empty() {
        query.push_str(" WHERE ");
        query.push_str(&predicates.join(" AND "));
    }

    if !group_keys.is_empty() {
        query.push_str(" GROUP BY ");
        query.push_str(&group_keys.join(", "));
    }

    query
}