1use std::time::Duration;
14use serde::{Serialize, Deserialize};
15use std::fmt::{Display, Formatter};
16
17#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
19pub enum TimeWindow {
20 None,
22 Fixed(Duration),
24 Sliding {
26 window: Duration,
27 slide: Duration,
28 },
29}
30
31#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
33pub enum AggregateFunction {
34 Count,
36 Sum,
38 Avg,
40 Min,
42 Max,
44}
45
46impl Display for AggregateFunction {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 match self {
49 AggregateFunction::Count => write!(f, "COUNT"),
50 AggregateFunction::Sum => write!(f, "SUM"),
51 AggregateFunction::Avg => write!(f, "AVG"),
52 AggregateFunction::Min => write!(f, "MIN"),
53 AggregateFunction::Max => write!(f, "MAX"),
54 }
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct GroupBy {
61 pub columns: Vec<String>,
62 pub time_column: Option<String>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct AggregateResult {
68 pub value: f64,
69 pub timestamp: i64,
70}
71
72impl TimeWindow {
73 pub fn window_bounds(&self, timestamp: i64) -> (i64, i64) {
75 match *self {
76 TimeWindow::None => (i64::MIN, i64::MAX),
77 TimeWindow::Fixed(duration) => {
78 let window_size = duration.as_secs() as i64;
79 let window_start = (timestamp / window_size) * window_size;
80 (window_start, window_start + window_size)
81 },
82 TimeWindow::Sliding { window, slide } => {
83 let window_size = window.as_secs() as i64;
84 let slide_size = slide.as_secs() as i64;
85 let current_slide = (timestamp / slide_size) * slide_size;
86 (current_slide, current_slide + window_size)
87 }
88 }
89 }
90
91 pub fn to_sql(&self) -> Option<String> {
93 match *self {
94 TimeWindow::None => None,
95 TimeWindow::Fixed(duration) => {
96 let window_size = duration.as_secs();
97 Some(format!(
98 "(timestamp / {}) * {} as window_start,
99 ((timestamp / {}) + 1) * {} as window_end",
100 window_size, window_size, window_size, window_size
101 ))
102 },
103 TimeWindow::Sliding { window, slide } => {
104 let window_size = window.as_secs();
105 let slide_size = slide.as_secs();
106 Some(format!(
107 "(timestamp / {}) * {} as window_start,
108 ((timestamp / {}) * {} + {}) as window_end",
109 slide_size, slide_size, slide_size, slide_size, window_size
110 ))
111 }
112 }
113 }
114}
115
116impl AggregateFunction {
117 pub fn to_sql(&self, column: &str) -> String {
119 match self {
120 AggregateFunction::Sum => format!("SUM({})", column),
121 AggregateFunction::Avg => format!("AVG({})", column),
122 AggregateFunction::Min => format!("MIN({})", column),
123 AggregateFunction::Max => format!("MAX({})", column),
124 AggregateFunction::Count => format!("COUNT({})", column),
125 }
126 }
127}
128
129pub fn build_aggregate_query(
148 table_name: &str,
149 function: AggregateFunction,
150 group_by: &GroupBy,
151 _columns: &[&str],
152 from_timestamp: Option<i64>,
153 to_timestamp: Option<i64>,
154) -> String {
155 let mut query = String::new();
156
157 query.push_str("SELECT ");
159
160 if !group_by.columns.is_empty() {
162 let cols: Vec<&str> = group_by.columns.iter().map(|s| s.as_str()).collect();
163 query.push_str(&cols.join(", "));
164 query.push_str(", ");
165 }
166
167 if let Some(time_col) = &group_by.time_column {
169 query.push_str(&format!("{}, ", time_col));
170 }
171
172 match function {
174 AggregateFunction::Sum => query.push_str("SUM(value)"),
175 AggregateFunction::Avg => query.push_str("AVG(value)"),
176 AggregateFunction::Count => query.push_str("COUNT(*)"),
177 AggregateFunction::Min => query.push_str("MIN(value)"),
178 AggregateFunction::Max => query.push_str("MAX(value)"),
179 }
180
181 query.push_str(&format!(" FROM {}", table_name));
183
184 if let Some(from_ts) = from_timestamp {
186 query.push_str(&format!(" WHERE timestamp >= {}", from_ts));
187 if let Some(to_ts) = to_timestamp {
188 query.push_str(&format!(" AND timestamp <= {}", to_ts));
189 }
190 }
191
192 if !group_by.columns.is_empty() || group_by.time_column.is_some() {
194 query.push_str(" GROUP BY ");
195 let mut group_cols = Vec::new();
196
197 if !group_by.columns.is_empty() {
198 let cols: Vec<&str> = group_by.columns.iter().map(|s| s.as_str()).collect();
199 group_cols.extend(cols);
200 }
201
202 if let Some(time_col) = &group_by.time_column {
203 group_cols.push(time_col.as_str());
204 }
205
206 query.push_str(&group_cols.join(", "));
207 }
208
209 query
210}