use std::time::Duration;
use serde::{Serialize, Deserialize};
use std::fmt::{Display, Formatter};
/// Strategy for bucketing timestamps into time windows.
///
/// The methods visible in this file read durations with `as_secs`, so only
/// the whole-second part of each `Duration` takes part in the arithmetic.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum TimeWindow {
/// No windowing: `window_bounds` reports `(i64::MIN, i64::MAX)` and
/// `to_sql` yields `None`.
None,
/// Windows whose length equals the given duration, with each start
/// aligned to a multiple of that length.
Fixed(Duration),
/// Windows whose start is aligned to a multiple of `slide` and whose end
/// lies `window` after the start.
Sliding {
/// Distance from a window's start to its end.
window: Duration,
/// Step to which window starts are aligned.
slide: Duration,
},
}
/// SQL aggregate functions supported by the query helpers in this file.
///
/// The `Display` impl renders each variant as its upper-case SQL name.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum AggregateFunction {
/// Rendered as `COUNT`.
Count,
/// Rendered as `SUM`.
Sum,
/// Rendered as `AVG`.
Avg,
/// Rendered as `MIN`.
Min,
/// Rendered as `MAX`.
Max,
}
impl Display for AggregateFunction {
    /// Writes the upper-case SQL keyword for this aggregate (`COUNT`, `SUM`,
    /// `AVG`, `MIN` or `MAX`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Resolve the keyword first so there is a single write call.
        let keyword = match self {
            AggregateFunction::Count => "COUNT",
            AggregateFunction::Sum => "SUM",
            AggregateFunction::Avg => "AVG",
            AggregateFunction::Min => "MIN",
            AggregateFunction::Max => "MAX",
        };
        f.write_str(keyword)
    }
}
/// Grouping specification consumed by `build_aggregate_query`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupBy {
/// Column names emitted, in order, in both the SELECT list and the
/// GROUP BY clause.
pub columns: Vec<String>,
/// Optional extra column emitted after `columns` in both the SELECT list
/// and the GROUP BY clause.
pub time_column: Option<String>,
}
/// A single aggregate output row.
///
/// NOTE(review): nothing in the visible code constructs or reads this type;
/// the field descriptions below are inferred from the names — confirm against
/// the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregateResult {
/// Presumably the computed aggregate value.
pub value: f64,
/// Presumably the timestamp (or window start) the value belongs to; the
/// unit is not established here.
pub timestamp: i64,
}
impl TimeWindow {
pub fn window_bounds(&self, timestamp: i64) -> (i64, i64) {
match *self {
TimeWindow::None => (i64::MIN, i64::MAX),
TimeWindow::Fixed(duration) => {
let window_size = duration.as_secs() as i64;
let window_start = (timestamp / window_size) * window_size;
(window_start, window_start + window_size)
},
TimeWindow::Sliding { window, slide } => {
let window_size = window.as_secs() as i64;
let slide_size = slide.as_secs() as i64;
let current_slide = (timestamp / slide_size) * slide_size;
(current_slide, current_slide + window_size)
}
}
}
pub fn to_sql(&self) -> Option<String> {
match *self {
TimeWindow::None => None,
TimeWindow::Fixed(duration) => {
let window_size = duration.as_secs();
Some(format!(
"(timestamp / {}) * {} as window_start,
((timestamp / {}) + 1) * {} as window_end",
window_size, window_size, window_size, window_size
))
},
TimeWindow::Sliding { window, slide } => {
let window_size = window.as_secs();
let slide_size = slide.as_secs();
Some(format!(
"(timestamp / {}) * {} as window_start,
((timestamp / {}) * {} + {}) as window_end",
slide_size, slide_size, slide_size, slide_size, window_size
))
}
}
}
}
impl AggregateFunction {
    /// Builds the SQL call expression `NAME(column)` for this aggregate.
    ///
    /// `column` is inserted verbatim between the parentheses; no quoting or
    /// escaping is applied.
    pub fn to_sql(&self, column: &str) -> String {
        // Select the function name first, then format exactly once.
        let name = match self {
            AggregateFunction::Count => "COUNT",
            AggregateFunction::Sum => "SUM",
            AggregateFunction::Avg => "AVG",
            AggregateFunction::Min => "MIN",
            AggregateFunction::Max => "MAX",
        };
        format!("{}({})", name, column)
    }
}
/// Builds a `SELECT` statement that aggregates the `value` column of
/// `table_name`.
///
/// # Parameters
/// * `table_name` – inserted verbatim after `FROM`.
/// * `function` – aggregate to apply; `Count` renders as `COUNT(*)`, every
///   other variant as `NAME(value)`.
/// * `group_by` – its `columns`, followed by its `time_column` when present,
///   are emitted in both the select list and the `GROUP BY` clause.
/// * `_columns` – currently unused.
/// * `from_timestamp` / `to_timestamp` – optional inclusive bounds on the
///   `timestamp` column. Each bound is applied independently, so an upper
///   bound is honored even when no lower bound is given.
///
/// # Security
/// Identifiers (`table_name` and the group-by columns) are interpolated
/// without quoting or escaping, so they must come from trusted input.
pub fn build_aggregate_query(
    table_name: &str,
    function: AggregateFunction,
    group_by: &GroupBy,
    _columns: &[&str],
    from_timestamp: Option<i64>,
    to_timestamp: Option<i64>,
) -> String {
    // One ordered list of grouping columns, shared by SELECT and GROUP BY.
    let group_cols: Vec<&str> = group_by
        .columns
        .iter()
        .map(String::as_str)
        .chain(group_by.time_column.as_deref())
        .collect();

    let aggregate = match function {
        AggregateFunction::Sum => "SUM(value)",
        AggregateFunction::Avg => "AVG(value)",
        AggregateFunction::Count => "COUNT(*)",
        AggregateFunction::Min => "MIN(value)",
        AggregateFunction::Max => "MAX(value)",
    };

    let mut query = String::from("SELECT ");
    for col in &group_cols {
        query.push_str(col);
        query.push_str(", ");
    }
    query.push_str(aggregate);
    query.push_str(" FROM ");
    query.push_str(table_name);

    // Collect each bound on its own so that neither depends on the other.
    let conditions: Vec<String> = from_timestamp
        .map(|ts| format!("timestamp >= {}", ts))
        .into_iter()
        .chain(to_timestamp.map(|ts| format!("timestamp <= {}", ts)))
        .collect();
    if !conditions.is_empty() {
        query.push_str(" WHERE ");
        query.push_str(&conditions.join(" AND "));
    }

    if !group_cols.is_empty() {
        query.push_str(" GROUP BY ");
        query.push_str(&group_cols.join(", "));
    }

    query
}