pub(crate) mod raw;
use super::{
err_generic, error_no_locals,
helper::Scope,
node_id::NodeId,
visitors::{ArgsRewriter, ConstFolder},
walkers::QueryWalker,
EventPath, HashMap, Helper, Ident, ImutExpr, InvokeAggrFn, NodeMeta, Path, Result, Script,
Serialize, Stmts, Upable, Value,
};
use super::{raw::BaseExpr, Consts};
use crate::ast::optimizer::Optimizer;
use crate::ast::Literal;
use crate::{errors::error_generic, impl_expr};
use raw::WindowName;
use simd_json::{Builder, Mutable, ValueAccess};
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Query<'script> {
pub config: HashMap<String, Value<'script>>,
pub from: Vec<Ident<'script>>,
pub into: Vec<Ident<'script>>,
pub stmts: Stmts<'script>,
pub params: DefinitionalArgs<'script>,
pub scope: Scope<'script>,
pub(crate) mid: Box<NodeMeta>,
}
impl_expr!(Query);
#[derive(Clone, Debug, PartialEq, Serialize)]
pub enum Stmt<'script> {
WindowDefinition(Box<WindowDefinition<'script>>),
OperatorDefinition(OperatorDefinition<'script>),
ScriptDefinition(Box<ScriptDefinition<'script>>),
PipelineDefinition(Box<PipelineDefinition<'script>>),
StreamCreate(StreamCreate),
OperatorCreate(OperatorCreate<'script>),
ScriptCreate(ScriptCreate<'script>),
PipelineCreate(PipelineCreate<'script>),
SelectStmt(SelectStmt<'script>),
}
impl<'script> BaseExpr for Stmt<'script> {
fn meta(&self) -> &NodeMeta {
match self {
Stmt::WindowDefinition(s) => s.meta(),
Stmt::StreamCreate(s) => s.meta(),
Stmt::OperatorDefinition(s) => s.meta(),
Stmt::ScriptDefinition(s) => s.meta(),
Stmt::PipelineDefinition(s) => s.meta(),
Stmt::PipelineCreate(s) => s.meta(),
Stmt::OperatorCreate(s) => s.meta(),
Stmt::ScriptCreate(s) => s.meta(),
Stmt::SelectStmt(s) => s.meta(),
}
}
}
pub type Aggregates<'f> = Vec<InvokeAggrFn<'f>>;
pub type AggrSlice<'f> = [InvokeAggrFn<'f>];
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct SelectStmt<'script> {
pub stmt: Box<Select<'script>>,
pub aggregates: Aggregates<'script>,
pub consts: Consts<'script>,
pub locals: usize,
}
impl<'script> BaseExpr for SelectStmt<'script> {
fn meta(&self) -> &NodeMeta {
self.stmt.meta()
}
}
pub enum SelectType {
Passthrough,
Simple,
Normal,
}
impl SelectStmt<'_> {
#[must_use]
pub fn complexity(&self) -> SelectType {
if matches!(
&self.stmt.target,
ImutExpr::Path(Path::Event(EventPath {
segments, ..
})) if segments.is_empty()
) && self.stmt.maybe_group_by.is_none()
&& self.stmt.windows.is_empty()
{
if self.stmt.maybe_having.is_none() && self.stmt.maybe_where.is_none() {
SelectType::Passthrough
} else {
SelectType::Simple
}
} else {
SelectType::Normal
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Eq)]
pub struct OperatorKind {
pub(crate) mid: Box<NodeMeta>,
pub module: String,
pub operation: String,
}
impl BaseExpr for OperatorKind {
fn meta(&self) -> &NodeMeta {
&self.mid
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct OperatorDefinition<'script> {
pub id: String,
pub(crate) mid: Box<NodeMeta>,
pub kind: OperatorKind,
pub params: DefinitionalArgsWith<'script>,
}
impl_expr!(OperatorDefinition);
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct OperatorCreate<'script> {
pub id: String,
pub(crate) mid: Box<NodeMeta>,
pub target: NodeId,
pub params: CreationalWith<'script>,
}
impl_expr!(OperatorCreate);
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ScriptDefinition<'script> {
pub(crate) mid: Box<NodeMeta>,
pub id: String,
pub params: DefinitionalArgs<'script>,
pub script: Script<'script>,
pub named: HashMap<String, Script<'script>>,
}
impl_expr!(ScriptDefinition);
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ScriptCreate<'script> {
pub id: String,
pub(crate) mid: Box<NodeMeta>,
pub target: NodeId,
pub params: CreationalWith<'script>,
}
impl_expr!(ScriptCreate);
pub type Config<'script> = Vec<(String, ImutExpr<'script>)>;
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct PipelineDefinition<'script> {
pub id: String,
pub(crate) mid: Box<NodeMeta>,
pub params: DefinitionalArgs<'script>,
pub from: Vec<Ident<'script>>,
pub into: Vec<Ident<'script>>,
pub config: Config<'script>,
pub stmts: Stmts<'script>,
pub scope: Scope<'script>,
}
impl_expr!(PipelineDefinition);
impl<'script> PipelineDefinition<'script> {
pub fn to_query<'registry>(
&self,
create: &CreationalWith<'script>,
helper: &mut Helper<'script, 'registry>,
) -> Result<Query<'script>> {
let mut create = create.clone();
Optimizer::new(helper).walk_creational_with(&mut create)?;
let mut args = create.render()?;
let mut config = HashMap::new();
for (k, v) in &self.config {
let v = v.clone();
let v = v.try_into_value(helper)?;
config.insert(k.to_string(), v);
}
let scope = self.scope.clone();
helper.set_scope(scope);
let mut params = self.params.clone();
for (k, v) in &mut params.args.0 {
if let Some(new) = args.remove(k.as_str())? {
*v = Some(*Literal::boxed_expr(Box::new(k.meta().clone()), new));
}
}
if let Some(k) = args.as_object().and_then(|o| o.keys().next()) {
return err_generic(&create, &create, &format!("Unknown parameter {k}"));
}
Optimizer::new(helper).walk_definitional_args(&mut params)?;
let inner_args = params.render()?;
let stmts = self
.stmts
.iter()
.cloned()
.map(|s| s.apply_args(&inner_args, helper, params.meta()))
.collect::<Result<_>>()?;
Ok(Query {
config,
stmts,
from: self.from.clone(),
into: self.into.clone(),
params: self.params.clone(),
scope: helper.leave_scope()?,
mid: self.mid.clone(),
})
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct PipelineCreate<'script> {
pub(crate) mid: Box<NodeMeta>,
pub target: NodeId,
pub port_stream_map: HashMap<String, String>,
pub params: CreationalWith<'script>,
pub alias: String,
}
impl<'script> BaseExpr for PipelineCreate<'script> {
fn meta(&self) -> &NodeMeta {
&self.mid
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Eq)]
pub enum WindowKind {
Sliding,
Tumbling,
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WindowDefinition<'script> {
pub id: String,
pub(crate) mid: Box<NodeMeta>,
pub kind: WindowKind,
pub params: CreationalWith<'script>,
pub script: Option<Script<'script>>,
pub tick_script: Option<Script<'script>>,
pub state: Option<Value<'script>>,
}
impl_expr!(WindowDefinition);
impl<'script> WindowDefinition<'script> {
pub const EMIT_EMPTY_WINDOWS: &'static str = "emit_empty_windows";
pub const MAX_GROUPS: &'static str = "max_groups";
pub const INTERVAL: &'static str = "interval";
pub const SIZE: &'static str = "size";
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Select<'script> {
pub mid: Box<NodeMeta>,
pub from: (Ident<'script>, Ident<'script>),
pub into: (Ident<'script>, Ident<'script>),
pub target: ImutExpr<'script>,
pub maybe_where: Option<ImutExpr<'script>>,
pub maybe_having: Option<ImutExpr<'script>>,
pub maybe_group_by: Option<GroupBy<'script>>,
pub windows: Vec<WindowName>,
}
impl_expr!(Select);
#[derive(Clone, Debug, PartialEq, Serialize)]
pub enum GroupBy<'script> {
Expr {
mid: Box<NodeMeta>,
expr: ImutExpr<'script>,
},
Set {
mid: Box<NodeMeta>,
items: Vec<GroupBy<'script>>,
},
Each {
mid: Box<NodeMeta>,
expr: ImutExpr<'script>,
},
}
#[derive(Clone, Debug, PartialEq, Serialize, Eq)]
pub struct StreamCreate {
pub(crate) mid: Box<NodeMeta>,
pub id: String,
}
impl BaseExpr for StreamCreate {
fn meta(&self) -> &NodeMeta {
&self.mid
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CreationalWith<'script> {
pub with: WithExprs<'script>,
pub(crate) mid: Box<NodeMeta>,
}
impl_expr!(CreationalWith);
impl<'script> CreationalWith<'script> {
pub(crate) fn substitute_args<'registry>(
&mut self,
args: &Value<'script>,
helper: &mut Helper<'script, 'registry>,
) -> Result<()> {
self.with.substitute_args(args, helper, &self.mid)
}
pub fn render(&self) -> Result<Value<'script>> {
let mut res = Value::object();
for (k, v) in &self.with.0 {
res.try_insert(k.id.clone(), v.try_as_lit()?.clone());
}
Ok(res)
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct DefinitionalArgsWith<'script> {
pub args: ArgsExprs<'script>,
pub with: WithExprs<'script>,
pub mid: Box<NodeMeta>,
}
impl<'script> DefinitionalArgsWith<'script> {
pub fn ingest_creational_with(&mut self, creational: &CreationalWith<'script>) -> Result<()> {
for (k, v) in &creational.with.0 {
if let Some((_, arg_v)) = self
.args
.0
.iter_mut()
.find(|(arg_key, _)| arg_key.id == k.id)
{
*arg_v = Some(v.clone());
} else {
return err_generic(creational, k, &"Unknown key");
}
}
if let Some((k, _)) = self.args.0.iter_mut().find(|(_, v)| v.is_none()) {
err_generic(creational, k, &"Missing key")
} else {
Ok(())
}
}
pub fn generate_config<'registry>(
&self,
helper: &mut Helper<'script, 'registry>,
) -> Result<Value<'script>> {
let args = self
.args
.0
.iter()
.map(|(k, expr)| {
let expr = expr
.clone()
.ok_or_else(|| format!("Missing configuration variable {k}"))?;
Ok((k.id.clone(), ConstFolder::reduce_to_val(helper, expr)?))
})
.collect::<Result<Value>>()?;
let config = self
.with
.0
.iter()
.map(|(k, v)| {
let mut expr = v.clone();
ArgsRewriter::new(args.clone(), helper, &self.mid).rewrite_expr(&mut expr)?;
Ok((k.id.to_string(), ConstFolder::reduce_to_val(helper, expr)?))
})
.collect();
config
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct DefinitionalArgs<'script> {
pub(crate) args: ArgsExprs<'script>,
pub(crate) mid: Box<NodeMeta>,
}
impl_expr!(DefinitionalArgs);
impl<'script> DefinitionalArgs<'script> {
pub fn ingest_creational_with(&mut self, creational: &CreationalWith<'script>) -> Result<()> {
for (k, v) in &creational.with.0 {
if let Some((_, arg_v)) = self
.args
.0
.iter_mut()
.find(|(arg_key, _)| arg_key.id == k.id)
{
*arg_v = Some(v.clone());
} else {
return err_generic(creational, k, &format!("Unknown argument: {}", k.as_str()));
}
}
if let Some((k, _)) = self.args.0.iter_mut().find(|(_, v)| v.is_none()) {
let k = k.clone();
err_generic(self, &k, &format!("Missing required argument: {k}"))
} else {
Ok(())
}
}
pub fn render(&self) -> Result<Value<'script>> {
let mut res = Value::object();
for (k, v) in &self.args.0 {
let v = v
.as_ref()
.ok_or_else(|| error_generic(k, k, &"Required key not provided"))?
.try_as_lit()?
.clone();
res.try_insert(k.id.clone(), v);
}
Ok(res)
}
}
pub type WithExpr<'script> = (Ident<'script>, ImutExpr<'script>);
#[derive(Clone, Debug, PartialEq, Serialize, Default)]
pub struct WithExprs<'script>(pub Vec<WithExpr<'script>>);
impl<'script> WithExprs<'script> {
pub(crate) fn substitute_args<'registry>(
&mut self,
args: &Value<'script>,
helper: &mut Helper<'script, 'registry>,
mid: &NodeMeta,
) -> Result<()> {
let mut old = Vec::new();
std::mem::swap(&mut old, &mut self.0);
self.0 = old
.into_iter()
.map(|(name, mut value_expr)| {
ArgsRewriter::new(args.clone(), helper, mid).rewrite_expr(&mut value_expr)?;
Optimizer::new(helper).walk_imut_expr(&mut value_expr)?;
Ok((name, value_expr))
})
.collect::<Result<_>>()?;
Ok(())
}
}
pub type ArgsExpr<'script> = (Ident<'script>, Option<ImutExpr<'script>>);
#[derive(Clone, Debug, PartialEq, Serialize, Default)]
pub struct ArgsExprs<'script>(pub Vec<ArgsExpr<'script>>);
impl<'script> Stmt<'script> {
fn apply_args(
mut self,
args: &Value<'script>,
helper: &mut Helper<'script, '_>,
mid: &NodeMeta,
) -> Result<Self> {
match &mut self {
Stmt::WindowDefinition(_)
| Stmt::OperatorDefinition(_)
| Stmt::ScriptDefinition(_)
| Stmt::PipelineDefinition(_)
| Stmt::StreamCreate(_) => (),
Stmt::SelectStmt(s) => {
ArgsRewriter::new(args.clone(), helper, mid).walk_select_stmt(s)?;
Optimizer::new(helper).walk_select_stmt(s)?;
}
Stmt::OperatorCreate(d) => d.params.substitute_args(args, helper)?,
Stmt::ScriptCreate(d) => d.params.substitute_args(args, helper)?,
Stmt::PipelineCreate(d) => d.params.substitute_args(args, helper)?,
};
Ok(self)
}
}