use super::delta::LogicalTime;
use super::molecule::Molecule;
use super::store::TypeRegistry;
use crate::syntax::ast::*;
use crate::value::Value;
use crate::Error;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
/// Evaluation context handed to the expression evaluator.
///
/// Holds the names an expression may refer to: single-molecule bindings
/// (resolved by `name.field` access) and group bindings (resolved by
/// `count(name)` and as the source of a list comprehension).
#[derive(Clone, Default)]
pub struct EvalCtx {
/// Named single molecules, keyed by binding name.
pub bindings: BTreeMap<String, Molecule>,
/// Named groups of molecules, keyed by group-binding name.
pub groups: BTreeMap<String, Vec<Molecule>>,
}
impl EvalCtx {
    /// Consumes the context and returns it with `name` bound to molecule `m`.
    ///
    /// An existing binding under the same name is replaced.
    pub fn bind(self, name: impl Into<String>, m: Molecule) -> Self {
        let mut next = self;
        let key: String = name.into();
        next.bindings.insert(key, m);
        next
    }

    /// Consumes the context and returns it with `name` bound to the group `g`.
    ///
    /// An existing group under the same name is replaced.
    pub fn bind_group(self, name: impl Into<String>, g: Vec<Molecule>) -> Self {
        let mut next = self;
        let key: String = name.into();
        next.groups.insert(key, g);
        next
    }
}
pub fn eval_expr(expr: &Expr, ctx: &EvalCtx) -> Result<Value, Error> {
match expr {
Expr::LitNull => Ok(Value::Null),
Expr::LitBool(b) => Ok(Value::Bool(*b)),
Expr::LitInt(n) => Ok(Value::Int(*n)),
Expr::LitFloat(f) => Ok(Value::Float(*f)),
Expr::LitString(s) => Ok(Value::String(s.clone())),
Expr::LitDuration(ms) => Ok(Value::Duration(*ms)),
Expr::Ident(name) => {
Err(Error::Runtime(format!(
"bare identifier `{name}` not allowed in expression context (use `name.field`)"
)))
}
Expr::FieldAccess(base, field) => {
let name = match base.as_ref() {
Expr::Ident(n) => n,
other => {
return Err(Error::Runtime(format!(
"v0 only supports binding.field, got {other:?}.field"
)))
}
};
let mol = ctx.bindings.get(name).ok_or_else(|| {
Error::Runtime(format!("unbound name `{name}`"))
})?;
mol.fields.get(field).cloned().ok_or_else(|| {
Error::Runtime(format!(
"no field `{field}` on `{}`",
mol.kind_name
))
})
}
Expr::Call(name, args) => match name.as_str() {
"now" if args.is_empty() => Ok(Value::Timestamp(now_ms())),
"uuid" if args.is_empty() => Ok(Value::Uuid(uuid::Uuid::new_v4())),
"count" if args.len() == 1 => {
if let Expr::Ident(n) = &args[0] {
if let Some(g) = ctx.groups.get(n) {
return Ok(Value::Int(g.len() as i64));
}
}
match eval_expr(&args[0], ctx)? {
Value::List(items) => Ok(Value::Int(items.len() as i64)),
other => Err(Error::Runtime(format!(
"count(): expected group binding or List, got {}",
other.type_name()
))),
}
}
"sum" if args.len() == 1 => aggregate_int_list("sum", &args[0], ctx, |xs| {
Ok(xs.iter().sum::<i64>())
}),
"min" if args.len() == 1 => aggregate_int_list("min", &args[0], ctx, |xs| {
xs.iter()
.copied()
.min()
.ok_or_else(|| Error::Runtime("min(): empty list".into()))
}),
"max" if args.len() == 1 => aggregate_int_list("max", &args[0], ctx, |xs| {
xs.iter()
.copied()
.max()
.ok_or_else(|| Error::Runtime("max(): empty list".into()))
}),
"avg" if args.len() == 1 => aggregate_int_list("avg", &args[0], ctx, |xs| {
if xs.is_empty() {
Err(Error::Runtime("avg(): empty list".into()))
} else {
Ok(xs.iter().sum::<i64>() / xs.len() as i64)
}
}),
other => Err(Error::Runtime(format!(
"unknown builtin `{other}`"
))),
},
Expr::ListComp {
var,
src,
body,
where_clause,
} => {
let group = match src.as_ref() {
Expr::Ident(name) => ctx.groups.get(name).cloned(),
_ => None,
}
.ok_or_else(|| {
Error::Runtime(
"list comprehension source must be a group binding (plan 18+ adds Value::List sources)"
.into(),
)
})?;
let mut results = Vec::with_capacity(group.len());
for mol in group {
let mut tmp = ctx.clone();
tmp.bindings.insert(var.clone(), mol);
if let Some(pred) = where_clause {
match eval_expr(pred, &tmp)? {
Value::Bool(true) => {}
Value::Bool(false) => continue,
other => {
return Err(Error::Runtime(format!(
"list comprehension `where` returned {}, expected Bool",
other.type_name()
)))
}
}
}
results.push(eval_expr(body, &tmp)?);
}
Ok(Value::List(results))
}
Expr::BinaryOp(op, lhs, rhs) => {
match op {
BinaryOp::And => {
let l = eval_expr(lhs, ctx)?;
let lb = as_bool(&l)?;
if !lb {
return Ok(Value::Bool(false));
}
let r = eval_expr(rhs, ctx)?;
Ok(Value::Bool(as_bool(&r)?))
}
BinaryOp::Or => {
let l = eval_expr(lhs, ctx)?;
let lb = as_bool(&l)?;
if lb {
return Ok(Value::Bool(true));
}
let r = eval_expr(rhs, ctx)?;
Ok(Value::Bool(as_bool(&r)?))
}
_ => {
let l = eval_expr(lhs, ctx)?;
let r = eval_expr(rhs, ctx)?;
apply_binop(*op, l, r)
}
}
}
Expr::UnaryOp(op, inner) => {
let v = eval_expr(inner, ctx)?;
apply_unaryop(*op, v)
}
Expr::ListLit(items) => {
let mut values = Vec::with_capacity(items.len());
for item in items {
values.push(eval_expr(item, ctx)?);
}
Ok(Value::List(values))
}
Expr::StructLit(_, _) => Err(Error::Runtime(
"nested struct literals not supported in v0".into(),
)),
}
}
fn aggregate_int_list<F>(
name: &str,
arg: &Expr,
ctx: &EvalCtx,
fold: F,
) -> Result<Value, Error>
where
F: FnOnce(&[i64]) -> Result<i64, Error>,
{
let value = eval_expr(arg, ctx)?;
let items = match value {
Value::List(items) => items,
other => {
return Err(Error::Runtime(format!(
"{name}(): expected List, got {}",
other.type_name()
)))
}
};
let mut numbers = Vec::with_capacity(items.len());
for item in items {
match item {
Value::Int(n) => numbers.push(n),
other => {
return Err(Error::Runtime(format!(
"{name}(): non-Int element {}",
other.type_name()
)))
}
}
}
Ok(Value::Int(fold(&numbers)?))
}
fn as_bool(v: &Value) -> Result<bool, Error> {
match v {
Value::Bool(b) => Ok(*b),
other => Err(Error::Runtime(format!(
"expected Bool, got {}",
other.type_name()
))),
}
}
/// Returns the underlying `i64` of an integer-backed value (`Int`,
/// `Timestamp` or `Duration`), or `None` for every other variant.
fn integral(v: &Value) -> Option<i64> {
    let raw = match v {
        Value::Int(n) => n,
        Value::Timestamp(n) => n,
        Value::Duration(n) => n,
        _ => return None,
    };
    Some(*raw)
}
/// Applies a non-short-circuiting binary operator to two evaluated operands.
///
/// - `Add` / `Sub` / `Mul` / `Div` require both operands to be integer-backed
///   (see `integral`); the result is tagged by `promote_arith`.
/// - `Eq` / `Ne` compare via `value_eq` and accept any operand types.
/// - `Lt` / `Le` / `Gt` / `Ge` require integer-backed operands and compare
///   their raw integers.
/// - `And` / `Or` must never reach this function; the caller evaluates them
///   with short-circuiting.
///
/// # Errors
///
/// Returns `Error::Runtime` for unsupported operand types, division by zero,
/// and arithmetic whose result does not fit in an `i64` (including
/// `i64::MIN / -1`, which would otherwise panic).
fn apply_binop(op: BinaryOp, l: Value, r: Value) -> Result<Value, Error> {
    use BinaryOp::*;
    match op {
        Add | Sub | Mul | Div => {
            let (a, b) = (
                integral(&l).ok_or_else(|| arith_err(op, &l, &r))?,
                integral(&r).ok_or_else(|| arith_err(op, &l, &r))?,
            );
            // Checked arithmetic: an overflow becomes a runtime error rather
            // than a panic (debug builds, and `MIN / -1` in every build) or a
            // silently wrapped result (release builds).
            let result = match op {
                Add => a.checked_add(b),
                Sub => a.checked_sub(b),
                Mul => a.checked_mul(b),
                Div => {
                    if b == 0 {
                        return Err(Error::Runtime("division by zero".into()));
                    }
                    a.checked_div(b)
                }
                _ => unreachable!(),
            }
            .ok_or_else(|| {
                Error::Runtime(format!(
                    "integer overflow in binary op {op:?} on {} and {}",
                    l.type_name(),
                    r.type_name()
                ))
            })?;
            Ok(promote_arith(&l, &r, result))
        }
        Eq => Ok(Value::Bool(value_eq(&l, &r))),
        Ne => Ok(Value::Bool(!value_eq(&l, &r))),
        Lt | Le | Gt | Ge => {
            let (a, b) = (
                integral(&l).ok_or_else(|| arith_err(op, &l, &r))?,
                integral(&r).ok_or_else(|| arith_err(op, &l, &r))?,
            );
            let result = match op {
                Lt => a < b,
                Le => a <= b,
                Gt => a > b,
                Ge => a >= b,
                _ => unreachable!(),
            };
            Ok(Value::Bool(result))
        }
        And | Or => unreachable!("handled above with short-circuit"),
    }
}
/// Chooses the variant that wraps the raw arithmetic result `n`.
///
/// Precedence: if either operand is a `Timestamp` the result is a
/// `Timestamp`; otherwise if either is a `Duration` the result is a
/// `Duration`; otherwise it is an `Int`.
///
/// NOTE(review): the operator is not consulted, so e.g. `Timestamp - Timestamp`
/// is tagged `Timestamp` — confirm this is the intended v0 behaviour.
fn promote_arith(l: &Value, r: &Value, n: i64) -> Value {
    let is_timestamp = |v: &Value| matches!(v, Value::Timestamp(_));
    let is_duration = |v: &Value| matches!(v, Value::Duration(_));

    if is_timestamp(l) || is_timestamp(r) {
        Value::Timestamp(n)
    } else if is_duration(l) || is_duration(r) {
        Value::Duration(n)
    } else {
        Value::Int(n)
    }
}
fn arith_err(op: BinaryOp, l: &Value, r: &Value) -> Error {
Error::Runtime(format!(
"unsupported binary op {op:?} on {} and {}",
l.type_name(),
r.type_name()
))
}
/// Equality used by the `==` / `!=` operators.
///
/// Two integer-backed values of the same variant (`Int`, `Timestamp`,
/// `Duration`) are compared by their raw integers; every other pairing is
/// delegated to `Value`'s own `==`.
fn value_eq(l: &Value, r: &Value) -> bool {
    if let (Value::Int(a), Value::Int(b))
    | (Value::Timestamp(a), Value::Timestamp(b))
    | (Value::Duration(a), Value::Duration(b)) = (l, r)
    {
        return a == b;
    }
    l == r
}
fn apply_unaryop(op: UnaryOp, v: Value) -> Result<Value, Error> {
match (op, v) {
(UnaryOp::Not, Value::Bool(b)) => Ok(Value::Bool(!b)),
(UnaryOp::Neg, Value::Int(n)) => Ok(Value::Int(-n)),
(UnaryOp::Neg, Value::Float(f)) => Ok(Value::Float(-f)),
(UnaryOp::Neg, Value::Duration(n)) => Ok(Value::Duration(-n)),
(op, v) => Err(Error::Runtime(format!(
"unsupported unary op {op:?} on {}",
v.type_name()
))),
}
}
pub fn eval_emit(
emit: &EmitClause,
ctx: &EvalCtx,
registry: &Arc<TypeRegistry>,
) -> Result<Molecule, Error> {
let schema = registry
.schema_by_name(&emit.molecule_name)
.ok_or_else(|| Error::Runtime(format!("unknown emit target {}", emit.molecule_name)))?;
let kind = registry
.id_by_name(&emit.molecule_name)
.ok_or_else(|| Error::Runtime(format!("unknown kind {}", emit.molecule_name)))?;
let mut fields: BTreeMap<String, Value> = BTreeMap::new();
for fa in &emit.fields {
fields.insert(fa.name.clone(), eval_expr(&fa.value, ctx)?);
}
for f in &schema.fields {
if !fields.contains_key(&f.name) {
if let Some(default) = &f.default {
fields.insert(f.name.clone(), eval_expr(default, ctx)?);
}
}
}
Ok(Molecule {
kind,
kind_name: emit.molecule_name.clone(),
fields,
ts: LogicalTime(0), })
}
pub fn eval_merge(
merge: &MergeFn,
old: &Molecule,
new: &Molecule,
registry: &Arc<TypeRegistry>,
) -> Result<Molecule, Error> {
let ctx = EvalCtx::default()
.bind(&merge.old_binding, old.clone())
.bind(&merge.new_binding, new.clone());
match &merge.body {
Expr::StructLit(name, fields) => {
let synthetic = EmitClause {
molecule_name: name.clone(),
fields: fields.clone(),
};
let mut out = eval_emit(&synthetic, &ctx, registry)?;
out.kind = old.kind;
out.kind_name = old.kind_name.clone();
Ok(out)
}
Expr::Ident(name) => ctx
.bindings
.get(name)
.cloned()
.ok_or_else(|| Error::Runtime(format!("merge body refers to unbound `{name}`"))),
other => Err(Error::Runtime(format!(
"merge body must be a struct literal or binding name, got {other:?}"
))),
}
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}