use std::time::Duration;
use crate::expr::Expr;
use crate::plan::{AggregateExpr, AggregateFunction, JoinType, QueryPlan, SinkType, WindowSpec};
pub struct QueryBuilder {
plan: QueryPlan,
}
impl QueryBuilder {
pub fn from_source(name: &str) -> Self {
Self {
plan: QueryPlan::Scan {
source: name.to_string(),
},
}
}
pub fn filter(self, predicate: Expr) -> Self {
Self {
plan: QueryPlan::Filter {
input: Box::new(self.plan),
predicate,
},
}
}
pub fn select(self, expressions: &[Expr]) -> Self {
Self {
plan: QueryPlan::Project {
input: Box::new(self.plan),
expressions: expressions.to_vec(),
},
}
}
pub fn group_by(self, keys: &[Expr]) -> AggregateBuilder {
AggregateBuilder {
input: self.plan,
group_by: keys.to_vec(),
aggregates: vec![],
window: None,
having: None,
}
}
pub fn join(self, right_source: &str, join_type: JoinType, on: Expr) -> JoinBuilder {
JoinBuilder {
left: self.plan,
right: QueryPlan::Scan {
source: right_source.to_string(),
},
join_type,
on,
within: None,
}
}
pub fn as_stream(self, name: &str, topic: &str) -> SinkBuilder {
SinkBuilder {
input: self.plan,
name: name.to_string(),
topic: topic.to_string(),
sink_type: SinkType::Stream,
}
}
pub fn as_table(self, name: &str, topic: &str) -> SinkBuilder {
SinkBuilder {
input: self.plan,
name: name.to_string(),
topic: topic.to_string(),
sink_type: SinkType::Table,
}
}
pub fn build(self) -> QueryPlan {
self.plan
}
}
pub struct AggregateBuilder {
input: QueryPlan,
group_by: Vec<Expr>,
aggregates: Vec<AggregateExpr>,
window: Option<WindowSpec>,
having: Option<Expr>,
}
impl AggregateBuilder {
pub fn count_star(mut self, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::Count,
expr: Expr::Wildcard,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn count(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::Count,
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn count_distinct(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::Count,
expr,
alias: alias.to_string(),
distinct: true,
});
self
}
pub fn sum(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::Sum,
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn avg(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::Avg,
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn min(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::Min,
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn max(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::Max,
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn collect_list(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::CollectList,
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn collect_set(mut self, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::CollectSet,
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn topk(mut self, k: usize, expr: Expr, alias: &str) -> Self {
self.aggregates.push(AggregateExpr {
function: AggregateFunction::TopK(k),
expr,
alias: alias.to_string(),
distinct: false,
});
self
}
pub fn aggregate(mut self, agg: AggregateExpr) -> Self {
self.aggregates.push(agg);
self
}
pub fn tumbling(mut self, size: Duration) -> Self {
self.window = Some(WindowSpec::Tumbling { size, grace: None });
self
}
pub fn tumbling_with_grace(mut self, size: Duration, grace: Duration) -> Self {
self.window = Some(WindowSpec::Tumbling {
size,
grace: Some(grace),
});
self
}
pub fn hopping(mut self, size: Duration, advance: Duration) -> Self {
self.window = Some(WindowSpec::Hopping {
size,
advance,
grace: None,
});
self
}
pub fn hopping_with_grace(
mut self,
size: Duration,
advance: Duration,
grace: Duration,
) -> Self {
self.window = Some(WindowSpec::Hopping {
size,
advance,
grace: Some(grace),
});
self
}
pub fn session(mut self, gap: Duration) -> Self {
self.window = Some(WindowSpec::Session { gap, grace: None });
self
}
pub fn session_with_grace(mut self, gap: Duration, grace: Duration) -> Self {
self.window = Some(WindowSpec::Session {
gap,
grace: Some(grace),
});
self
}
pub fn having(mut self, predicate: Expr) -> Self {
self.having = Some(predicate);
self
}
pub fn as_stream(self, name: &str, topic: &str) -> SinkBuilder {
SinkBuilder {
input: self.build_aggregate(),
name: name.to_string(),
topic: topic.to_string(),
sink_type: SinkType::Stream,
}
}
pub fn as_table(self, name: &str, topic: &str) -> SinkBuilder {
SinkBuilder {
input: self.build_aggregate(),
name: name.to_string(),
topic: topic.to_string(),
sink_type: SinkType::Table,
}
}
pub fn build(self) -> QueryPlan {
self.build_aggregate()
}
fn build_aggregate(self) -> QueryPlan {
QueryPlan::Aggregate {
input: Box::new(self.input),
group_by: self.group_by,
aggregates: self.aggregates,
window: self.window,
having: self.having,
}
}
}
pub struct JoinBuilder {
left: QueryPlan,
right: QueryPlan,
join_type: JoinType,
on: Expr,
within: Option<Duration>,
}
impl JoinBuilder {
pub fn within(mut self, duration: Duration) -> Self {
self.within = Some(duration);
self
}
pub fn select(self, expressions: &[Expr]) -> QueryBuilder {
QueryBuilder {
plan: QueryPlan::Project {
input: Box::new(self.build()),
expressions: expressions.to_vec(),
},
}
}
pub fn filter(self, predicate: Expr) -> QueryBuilder {
QueryBuilder {
plan: QueryPlan::Filter {
input: Box::new(self.build()),
predicate,
},
}
}
pub fn as_stream(self, name: &str, topic: &str) -> SinkBuilder {
SinkBuilder {
input: self.build(),
name: name.to_string(),
topic: topic.to_string(),
sink_type: SinkType::Stream,
}
}
pub fn build(self) -> QueryPlan {
QueryPlan::Join {
left: Box::new(self.left),
right: Box::new(self.right),
join_type: self.join_type,
on: self.on,
within: self.within,
}
}
}
/// Final builder stage: pairs an input plan with the sink's name, topic and
/// sink type, and emits a `QueryPlan::Sink`.
pub struct SinkBuilder {
    /// Plan whose output is written to the sink.
    input: QueryPlan,
    /// Sink name, forwarded unchanged to the plan.
    name: String,
    /// Topic string, forwarded unchanged to the plan.
    topic: String,
    /// Sink variant, forwarded unchanged to the plan.
    sink_type: SinkType,
}

impl SinkBuilder {
    /// Consumes the builder and returns the `QueryPlan::Sink` node, boxing
    /// the input plan and moving the remaining fields across as-is.
    pub fn build(self) -> QueryPlan {
        let Self {
            input,
            name,
            topic,
            sink_type,
        } = self;
        let input = Box::new(input);
        QueryPlan::Sink {
            input,
            name,
            topic,
            sink_type,
        }
    }
}
pub use crate::expr::{ExprExt, col, lit_bool, lit_f64, lit_i64, lit_null, lit_str};