use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::expr::Expr;
mod duration_millis {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::Duration;
pub fn serialize<S: Serializer>(d: &Duration, s: S) -> Result<S::Ok, S::Error> {
d.as_millis().serialize(s)
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
let millis = u64::deserialize(d)?;
Ok(Duration::from_millis(millis))
}
}
mod option_duration_millis {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::Duration;
pub fn serialize<S: Serializer>(opt: &Option<Duration>, s: S) -> Result<S::Ok, S::Error> {
match opt {
Some(d) => d.as_millis().serialize(s),
None => s.serialize_none(),
}
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<Duration>, D::Error> {
let opt: Option<u64> = Option::deserialize(d)?;
Ok(opt.map(Duration::from_millis))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AggregateFunction {
Count,
Sum,
Avg,
Min,
Max,
CollectList,
CollectSet,
TopK(usize),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AggregateExpr {
pub function: AggregateFunction,
pub expr: Expr,
pub alias: String,
pub distinct: bool,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WindowSpec {
Tumbling {
#[serde(with = "duration_millis")]
size: Duration,
#[serde(with = "option_duration_millis")]
grace: Option<Duration>,
},
Hopping {
#[serde(with = "duration_millis")]
size: Duration,
#[serde(with = "duration_millis")]
advance: Duration,
#[serde(with = "option_duration_millis")]
grace: Option<Duration>,
},
Session {
#[serde(with = "duration_millis")]
gap: Duration,
#[serde(with = "option_duration_millis")]
grace: Option<Duration>,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum JoinType {
Inner,
Left,
Right,
FullOuter,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SinkType {
Stream,
Table,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum QueryPlan {
Scan { source: String },
Filter {
input: Box<QueryPlan>,
predicate: Expr,
},
Project {
input: Box<QueryPlan>,
expressions: Vec<Expr>,
},
Aggregate {
input: Box<QueryPlan>,
group_by: Vec<Expr>,
aggregates: Vec<AggregateExpr>,
window: Option<WindowSpec>,
having: Option<Expr>,
},
Join {
left: Box<QueryPlan>,
right: Box<QueryPlan>,
join_type: JoinType,
on: Expr,
#[serde(with = "option_duration_millis")]
within: Option<Duration>,
},
Sink {
input: Box<QueryPlan>,
name: String,
topic: String,
sink_type: SinkType,
},
}
impl QueryPlan {
pub fn source_names(&self) -> Vec<String> {
match self {
QueryPlan::Scan { source } => vec![source.clone()],
QueryPlan::Filter { input, .. }
| QueryPlan::Project { input, .. }
| QueryPlan::Aggregate { input, .. }
| QueryPlan::Sink { input, .. } => input.source_names(),
QueryPlan::Join { left, right, .. } => {
let mut sources = left.source_names();
sources.extend(right.source_names());
sources
}
}
}
}