use crate::types::Atom;
use serde::{Deserialize, Serialize};
use std::fmt;
/// Root of a parsed query: one variant per top-level statement kind.
///
/// The type is serde-serializable (the tests below round-trip it through
/// `bincode`). Bincode encodes enum variants by index, so reordering variants
/// changes the binary format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum QueryAst {
    /// Non-aggregating `SELECT`.
    Select(SelectQuery),
    /// `FIND` query whose filter/projection/sort are raw JSON documents.
    Find(FindQuery),
    /// `SELECT` of aggregation functions, optionally grouped.
    Aggregate(AggregateQuery),
    /// Join of two key patterns, rendered as `<left> TEMPORAL JOIN <right> <join_type>`.
    TemporalJoin(TemporalJoinQuery),
    /// Named stream definition, rendered as `CREATE STREAM <name> AS <body>`.
    Stream(StreamQuery),
    /// `EXPLAIN` wrapper around another query.
    Explain(Box<QueryAst>),
    /// `CREATE MACRO` definition.
    Macro(MacroDefinition),
    /// `LINEAGE(<key>)` lookup.
    Lineage(LineageQuery),
}
/// A non-aggregating `SELECT` query.
///
/// `Display` renders the clauses in the order FROM, WHERE, ORDER BY, LIMIT,
/// OFFSET, omitting any that are `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SelectQuery {
    /// Selected columns; an empty list is rendered as `*` by `Display`.
    pub projections: Vec<Projection>,
    /// Keys to read from; `None` when no `FROM` clause was given.
    pub from: Option<KeyPattern>,
    /// Optional `WHERE` filter.
    pub where_clause: Option<WhereClause>,
    /// Optional single-field ordering.
    pub order_by: Option<OrderBy>,
    /// `LIMIT` value, if any.
    pub limit: Option<u64>,
    /// `OFFSET` value, if any.
    pub offset: Option<u64>,
}
/// A `FIND` query whose filter, projection and sort are kept as raw JSON.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FindQuery {
    /// JSON filter document.
    pub filter: FilterDocument,
    /// Optional JSON projection document.
    pub projection: Option<ProjectionDocument>,
    /// Optional JSON sort document.
    pub sort: Option<SortDocument>,
    /// Optional result limit.
    pub limit: Option<u64>,
    /// Optional number of results to skip; presumably the `FIND` counterpart of
    /// `SelectQuery::offset` — confirm.
    pub skip: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct FilterDocument {
pub raw: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct ProjectionDocument {
pub raw: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct SortDocument {
pub raw: serde_json::Value,
}
/// A `SELECT` whose projections are aggregation functions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AggregateQuery {
    /// Aggregations to compute, in the order written.
    pub aggregations: Vec<AggregateFunction>,
    /// Keys to aggregate over; `None` when no `FROM` clause was given.
    pub from: Option<KeyPattern>,
    /// `WHERE` filter (presumably applied before grouping, as in SQL — confirm).
    pub where_clause: Option<WhereClause>,
    /// Optional `GROUP BY`.
    pub group_by: Option<GroupBy>,
    /// Optional `HAVING` filter, using the same condition type as `WHERE`.
    pub having: Option<WhereClause>,
    /// Optional single-field ordering.
    pub order_by: Option<OrderBy>,
    /// `LIMIT` value, if any.
    pub limit: Option<u64>,
}
/// Aggregation function named in an aggregate `SELECT` list.
///
/// Variants carry no argument; `AggregateQuery`'s `Display` prints them by
/// their `Debug` names (e.g. `Count`).
// NOTE(review): the ordering that defines `First`/`Last` (timestamp?) is not
// specified here — confirm in the executor.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AggregateFunction {
    Count,
    Sum,
    Avg,
    Min,
    Max,
    First,
    Last,
}
/// Grouping for an aggregate query.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum GroupBy {
    /// One group per key.
    Key,
    /// One group per time bucket of the given width.
    TimeBucket(TimeBucket),
}
/// Bucket width used by `GroupBy::TimeBucket`.
// NOTE(review): how `Week`/`Month` boundaries are aligned (calendar vs fixed
// length) is not defined by this type — confirm in the executor.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum TimeBucket {
    Minute,
    Hour,
    Day,
    Week,
    Month,
}
/// Join between the data matched by two key patterns.
///
/// `QueryAst`'s `Display` renders it as `<left> TEMPORAL JOIN <right> <join_type>`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TemporalJoinQuery {
    /// Left-hand key pattern.
    pub left: KeyPattern,
    /// Right-hand key pattern.
    pub right: KeyPattern,
    /// Join kind as free-form text; this type does not validate it.
    // NOTE(review): a dedicated enum would reject unknown join kinds at parse time.
    pub join_type: String,
    /// Optional matching window; presumably the maximum timestamp distance in
    /// microseconds (per the `_micros` suffix) — confirm.
    pub within_micros: Option<u64>,
}
/// A named stream definition, rendered as `CREATE STREAM <name> AS <body>`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StreamQuery {
    /// Stream name.
    pub name: String,
    /// Query that defines the stream.
    pub body: Box<QueryAst>,
}
/// A macro definition, rendered as `CREATE MACRO <name>(<params>) AS <body>`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MacroDefinition {
    /// Macro name.
    pub name: String,
    /// Parameter names, in declaration order.
    pub params: Vec<String>,
    /// Macro body stored as unparsed text.
    pub body: String,
}
/// A `LINEAGE(<key>)` query for a single key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct LineageQuery {
    /// The key to trace; a plain string, not a `KeyPattern`.
    pub key: String,
}
/// Selects which keys a query or condition applies to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum KeyPattern {
    /// A single literal key.
    Exact(String),
    /// Keys starting with the given prefix (rendered with a trailing `*`).
    Prefix(String),
    /// Glob pattern, rendered verbatim inside quotes.
    Glob(String),
    /// Regular expression over keys, rendered as `REGEX("...")`.
    Regex(String),
    /// Any of the listed patterns (rendered joined by ` OR `).
    Union(Vec<KeyPattern>),
}
/// Comparison operator used by `Condition::Comparison` and
/// `FreshnessCondition::Compare`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComparisonOp {
    Eq,
    Ne,
    Gt,
    Gte,
    Lt,
    Lte,
    /// Membership; presumably paired with `ValueFilter::List` — confirm.
    In,
    /// Non-membership; presumably paired with `ValueFilter::List` — confirm.
    Nin,
    /// Pattern match (wildcard syntax not defined here).
    Like,
    /// Regular-expression match.
    Regex,
}
/// Boolean combinator over nested conditions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BooleanOp {
    /// Conjunction of any number of operands.
    And(Vec<Condition>),
    /// Disjunction of any number of operands.
    Or(Vec<Condition>),
    /// Negation of a single operand.
    Not(Box<Condition>),
}
/// A record field.
///
/// Used both as the `ORDER BY` field and as the left-hand side of
/// `Condition::Comparison`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum OrderField {
    Key,
    Value,
    Timestamp,
}
/// Sort direction of an `ORDER BY`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum Direction {
    Asc,
    Desc,
}
/// A single-field ordering.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OrderBy {
    /// Field to sort by.
    pub field: OrderField,
    /// Ascending or descending.
    pub direction: Direction,
}
/// Timestamp bounds for `Condition::TimeRange`; `Default` leaves both unset.
// NOTE(review): `None` presumably means "unbounded on that side", and units are
// presumably microseconds (matching the `_micros` fields elsewhere). Whether
// `end` is inclusive is not defined here — confirm all three.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct TimeRange {
    /// Lower bound, if any.
    pub start: Option<u64>,
    /// Upper bound, if any.
    pub end: Option<u64>,
}
/// Right-hand side of a `Condition::Comparison`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ValueFilter {
    /// A single operand.
    Single(Atom),
    /// A list of operands (presumably for `In`/`Nin` — confirm).
    List(Vec<Atom>),
}
/// Holds the root condition of a `WHERE` clause (also reused for `HAVING`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WhereClause {
    /// Top of the condition tree.
    pub root: Condition,
}
/// A filter predicate.
///
/// Used as the root of a `WHERE`/`HAVING` clause and nested inside
/// [`BooleanOp`] combinators.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Condition {
    /// `field op rhs`.
    Comparison {
        /// Left-hand field.
        field: OrderField,
        /// Operator.
        op: ComparisonOp,
        /// Right-hand operand(s).
        rhs: ValueFilter,
    },
    /// Key test, rendered as `key MATCHES <pattern>`.
    Key(KeyPattern),
    /// Timestamp bounds, rendered as `timestamp BETWEEN ..`.
    TimeRange(TimeRange),
    /// Anomaly test on the value, rendered as `ANOMALY(value, ..)`.
    Anomaly(AnomalyMethod),
    /// Shape test on the value, rendered as `MATCHES_PATTERN(value, ..)`.
    Pattern(TimeSeriesPattern),
    /// Freshness predicate.
    Freshness(FreshnessCondition),
    /// Vector-similarity predicate, rendered as `SIMILAR_TO(<vec>, k)`.
    Similarity(SimilarityCondition),
    /// `AND` / `OR` / `NOT` over nested conditions.
    Boolean(BooleanOp),
}
/// Anomaly-detection method used by `Condition::Anomaly`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum AnomalyMethod {
    /// Z-score test; the number is presumably the threshold in standard
    /// deviations — confirm.
    ZScore(f64),
    /// Interquartile-range test; the number is presumably the IQR multiplier —
    /// confirm.
    Iqr(f64),
    /// Deviation from a moving average.
    MovingAverage {
        // NOTE(review): unit of `window` (sample count vs time) is not defined here.
        window: u64,
        threshold: f64,
    },
}
/// Shape a series must exhibit for `Condition::Pattern` to hold.
///
/// Durations are carried in fields suffixed `_micros`; presumably microseconds.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TimeSeriesPattern {
    /// Upward excursion beyond `threshold`, optionally bounded in duration.
    Spike {
        threshold: f64,
        max_duration_micros: Option<u64>,
    },
    /// Downward excursion beyond `threshold`, optionally bounded in duration.
    Dip {
        threshold: f64,
        max_duration_micros: Option<u64>,
    },
    /// Increase sustained for at least `min_duration_micros`.
    Rising {
        min_duration_micros: u64,
    },
    /// Decrease sustained for at least `min_duration_micros`.
    Falling {
        min_duration_micros: u64,
    },
    /// Flat stretch (within `tolerance`) of at least `min_duration_micros`.
    Plateau {
        min_duration_micros: u64,
        tolerance: f64,
    },
}
/// Freshness predicate used by `Condition::Freshness`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum FreshnessCondition {
    /// Compares a freshness measure against `value`.
    // NOTE(review): the unit of `value` (seconds? a 0..1 score?) is not defined here.
    Compare {
        op: ComparisonOp,
        value: f64,
    },
    /// Data considered stale (threshold defined elsewhere, if at all).
    Stale,
    /// Data considered fresh (threshold defined elsewhere, if at all).
    Fresh,
}
/// Vector-similarity predicate used by `Condition::Similarity`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SimilarityCondition {
    /// Query embedding to compare against.
    pub query_vector: Vec<f32>,
    /// Presumably the number of nearest neighbours to match — confirm.
    pub k: usize,
    /// Optional name of the index to use.
    pub index_hint: Option<String>,
}
/// One entry of a `SELECT` list.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Projection {
    /// `*`.
    All,
    /// The record key.
    Key,
    /// The record value.
    Value,
    /// The record timestamp.
    Timestamp,
    /// A named function call.
    Function(FunctionCall),
    /// A `PREDICT(..)` expression.
    Predict(PredictExpr),
    /// Another projection renamed with `AS`.
    Aliased {
        /// The projection being renamed.
        inner: Box<Projection>,
        /// The output name.
        alias: String,
    },
}
/// A function call in a projection, rendered as `name(arg, ...)`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FunctionCall {
    /// Function name.
    pub name: String,
    /// Arguments kept as raw, unparsed strings.
    pub args: Vec<String>,
}
/// A `PREDICT(..)` projection.
///
/// `Projection`'s `Display` shows only `method` and `horizon`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PredictExpr {
    /// Forecasting method name.
    pub method: String,
    /// Forecast horizon (unit not defined here — presumably steps of `interval`).
    pub horizon: u64,
    /// Optional interval, as unparsed text.
    pub interval: Option<String>,
    /// Extra arguments as (name, value) string pairs — presumably; confirm.
    pub args: Vec<(String, String)>,
}
impl fmt::Display for QueryAst {
    /// Renders the query back into query-language-like text.
    ///
    /// `Select` and `Aggregate` delegate to their own `Display` impls. The other
    /// variants are rendered inline here.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            QueryAst::Select(q) => write!(f, "{}", q),
            // `serde_json::Value`'s `Display` emits compact JSON text. The previous
            // `{:?}` leaked Rust debug syntax (`Object`, `Number(..)` wrappers).
            // NOTE(review): projection/sort/limit/skip are still not rendered.
            QueryAst::Find(q) => write!(f, "FIND {}", q.filter.raw),
            QueryAst::Aggregate(q) => write!(f, "{}", q),
            // NOTE(review): `within_micros` is not rendered; confirm the surface
            // syntax before adding it.
            QueryAst::TemporalJoin(q) => {
                write!(f, "{} TEMPORAL JOIN {} {}", q.left, q.right, q.join_type)
            }
            QueryAst::Stream(q) => write!(f, "CREATE STREAM {} AS {}", q.name, q.body),
            QueryAst::Explain(inner) => write!(f, "EXPLAIN {}", inner),
            QueryAst::Macro(m) => {
                write!(
                    f,
                    "CREATE MACRO {}({}) AS {}",
                    m.name,
                    m.params.join(", "),
                    m.body
                )
            }
            QueryAst::Lineage(l) => write!(f, "LINEAGE({})", l.key),
        }
    }
}
impl fmt::Display for SelectQuery {
    /// Renders `SELECT <projections> [FROM ..] [WHERE ..] [ORDER BY ..]
    /// [LIMIT ..] [OFFSET ..]`, skipping clauses that are `None`.
    ///
    /// An empty projection list is shown as `*`. `ORDER BY` prints the field
    /// and direction using their `Debug` names (e.g. `Timestamp Asc`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SELECT ")?;
        match self.projections.split_first() {
            None => f.write_str("*")?,
            Some((first, others)) => {
                write!(f, "{}", first)?;
                for projection in others {
                    write!(f, ", {}", projection)?;
                }
            }
        }
        if let Some(pattern) = &self.from {
            write!(f, " FROM {}", pattern)?;
        }
        if let Some(clause) = &self.where_clause {
            write!(f, " WHERE {}", clause.root)?;
        }
        if let Some(ordering) = &self.order_by {
            write!(f, " ORDER BY {:?} {:?}", ordering.field, ordering.direction)?;
        }
        if let Some(count) = self.limit {
            write!(f, " LIMIT {}", count)?;
        }
        match self.offset {
            Some(skip) => write!(f, " OFFSET {}", skip),
            None => Ok(()),
        }
    }
}
impl fmt::Display for AggregateQuery {
    /// Renders `SELECT <aggs> [FROM ..] [WHERE ..] [GROUP BY ..] [HAVING ..]
    /// [ORDER BY ..] [LIMIT ..]`, skipping clauses that are `None`.
    ///
    /// Aggregation functions, the grouping and the ordering are printed using
    /// their `Debug` names. This matches how `SelectQuery` prints `ORDER BY`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "SELECT ")?;
        for (i, a) in self.aggregations.iter().enumerate() {
            if i > 0 {
                write!(f, ", ")?;
            }
            write!(f, "{:?}", a)?;
        }
        if let Some(from) = &self.from {
            write!(f, " FROM {}", from)?;
        }
        if let Some(w) = &self.where_clause {
            write!(f, " WHERE {}", w.root)?;
        }
        if let Some(g) = &self.group_by {
            write!(f, " GROUP BY {:?}", g)?;
        }
        // HAVING / ORDER BY / LIMIT used to be dropped. The rendered text
        // (including `EXPLAIN` output) therefore silently described a different
        // query.
        if let Some(h) = &self.having {
            write!(f, " HAVING {}", h.root)?;
        }
        if let Some(order) = &self.order_by {
            write!(f, " ORDER BY {:?} {:?}", order.field, order.direction)?;
        }
        if let Some(lim) = self.limit {
            write!(f, " LIMIT {}", lim)?;
        }
        Ok(())
    }
}
impl fmt::Display for KeyPattern {
    /// Renders a key pattern in query-text form.
    ///
    /// - Exact keys and globs print as a double-quoted string.
    /// - Prefixes print as a quoted string with a trailing `*`.
    /// - Regexes print as `REGEX("...")`.
    /// - Unions print their members separated by ` OR `; an empty union prints
    ///   nothing.
    ///
    /// Quote characters inside a pattern are not escaped.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KeyPattern::Exact(text) | KeyPattern::Glob(text) => write!(f, "\"{}\"", text),
            KeyPattern::Prefix(stem) => write!(f, "\"{}*\"", stem),
            KeyPattern::Regex(expr) => write!(f, "REGEX(\"{}\")", expr),
            KeyPattern::Union(members) => {
                let mut rest = members.iter();
                if let Some(head) = rest.next() {
                    write!(f, "{}", head)?;
                    for member in rest {
                        write!(f, " OR {}", member)?;
                    }
                }
                Ok(())
            }
        }
    }
}
impl fmt::Display for Condition {
    /// Renders a condition as the text that follows `WHERE`.
    ///
    /// `AND`/`OR` wrap every operand in parentheses and `NOT` wraps its single
    /// operand, so nested trees print unambiguously. An empty `AND`/`OR` prints
    /// nothing. Comparison, time-range, anomaly, pattern and freshness leaves
    /// fall back to their `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Writes each operand as `(operand)`, with `sep` between neighbours.
        fn write_joined(
            f: &mut fmt::Formatter<'_>,
            operands: &[Condition],
            sep: &str,
        ) -> fmt::Result {
            for (pos, operand) in operands.iter().enumerate() {
                if pos != 0 {
                    f.write_str(sep)?;
                }
                write!(f, "({})", operand)?;
            }
            Ok(())
        }

        match self {
            Condition::Comparison { field, op, rhs } => {
                write!(f, "{:?} {:?} {:?}", field, op, rhs)
            }
            Condition::Key(pattern) => write!(f, "key MATCHES {}", pattern),
            Condition::TimeRange(range) => {
                write!(f, "timestamp BETWEEN {:?} AND {:?}", range.start, range.end)
            }
            Condition::Anomaly(method) => write!(f, "ANOMALY(value, {:?})", method),
            Condition::Pattern(shape) => write!(f, "MATCHES_PATTERN(value, {:?})", shape),
            Condition::Freshness(freshness) => write!(f, "{:?}", freshness),
            Condition::Similarity(sim) => write!(f, "SIMILAR_TO(<vec>, {})", sim.k),
            Condition::Boolean(combinator) => match combinator {
                BooleanOp::And(operands) => write_joined(f, operands, " AND "),
                BooleanOp::Or(operands) => write_joined(f, operands, " OR "),
                BooleanOp::Not(operand) => write!(f, "NOT ({})", operand),
            },
        }
    }
}
impl fmt::Display for Projection {
    /// Renders one entry of a `SELECT` list.
    ///
    /// - Built-in columns print as lowercase names.
    /// - Function calls print as `name(arg, ...)`.
    /// - Predictions print as `PREDICT(method, horizon=N)`; the interval and
    ///   extra arguments are not shown.
    /// - Aliases print as `<inner> AS <alias>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Projection::All => f.write_str("*"),
            Projection::Key => f.write_str("key"),
            Projection::Value => f.write_str("value"),
            Projection::Timestamp => f.write_str("timestamp"),
            Projection::Function(call) => {
                write!(f, "{}(", call.name)?;
                for (idx, arg) in call.args.iter().enumerate() {
                    if idx != 0 {
                        f.write_str(", ")?;
                    }
                    f.write_str(arg)?;
                }
                f.write_str(")")
            }
            Projection::Predict(pred) => {
                write!(f, "PREDICT({}, horizon={})", pred.method, pred.horizon)
            }
            Projection::Aliased { inner, alias } => write!(f, "{} AS {}", inner, alias),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `ast` with bincode and asserts it decodes back unchanged.
    fn assert_bincode_roundtrip(ast: &QueryAst) {
        let bytes = bincode::serialize(ast).expect("QueryAst should serialize with bincode");
        let decoded: QueryAst =
            bincode::deserialize(&bytes).expect("bincode output should decode into QueryAst");
        assert_eq!(ast, &decoded);
    }

    #[test]
    fn serde_roundtrip_simple_select() {
        let ast = QueryAst::Select(SelectQuery {
            projections: vec![Projection::All],
            from: Some(KeyPattern::Glob("sensor/*".into())),
            where_clause: None,
            order_by: Some(OrderBy {
                field: OrderField::Timestamp,
                direction: Direction::Asc,
            }),
            limit: Some(100),
            offset: None,
        });
        assert_bincode_roundtrip(&ast);
    }

    #[test]
    fn serde_roundtrip_explained_aggregate_with_nested_where() {
        let where_clause = WhereClause {
            root: Condition::Boolean(BooleanOp::And(vec![
                Condition::Key(KeyPattern::Union(vec![
                    KeyPattern::Exact("a".into()),
                    KeyPattern::Regex("^b.*".into()),
                ])),
                Condition::TimeRange(TimeRange {
                    start: Some(10),
                    end: None,
                }),
                Condition::Boolean(BooleanOp::Not(Box::new(Condition::Anomaly(
                    AnomalyMethod::ZScore(3.0),
                )))),
            ])),
        };
        let ast = QueryAst::Explain(Box::new(QueryAst::Aggregate(AggregateQuery {
            aggregations: vec![AggregateFunction::Avg, AggregateFunction::Max],
            from: Some(KeyPattern::Prefix("sensor/".into())),
            where_clause: Some(where_clause),
            group_by: Some(GroupBy::TimeBucket(TimeBucket::Hour)),
            having: None,
            order_by: None,
            limit: Some(5),
        })));
        assert_bincode_roundtrip(&ast);
    }

    #[test]
    fn display_pretty_prints_select() {
        let ast = QueryAst::Select(SelectQuery {
            projections: vec![Projection::Key, Projection::Value],
            from: Some(KeyPattern::Prefix("sensor/".into())),
            where_clause: None,
            order_by: None,
            limit: Some(10),
            offset: None,
        });
        // Pin the exact rendering: `contains` checks would also accept badly
        // ordered or duplicated clauses.
        assert_eq!(ast.to_string(), "SELECT key, value FROM \"sensor/*\" LIMIT 10");
    }

    #[test]
    fn display_parenthesizes_nested_conditions() {
        let cond = Condition::Boolean(BooleanOp::Or(vec![
            Condition::Key(KeyPattern::Exact("a".into())),
            Condition::Boolean(BooleanOp::Not(Box::new(Condition::Key(KeyPattern::Prefix(
                "b".into(),
            ))))),
        ]));
        assert_eq!(
            cond.to_string(),
            "(key MATCHES \"a\") OR (NOT (key MATCHES \"b*\"))"
        );
    }
}