use std::collections::BTreeSet;
use serde_json::{Map, Value};
use crate::{
Db,
database::SchemaProvider,
executor::{eval::Eval, helpers::Helpers},
parser::{
aggregators_helper::{Accumulator as AggAcc, AggregateRegistry},
analyzer::AnalyzerError,
ast::{Column, JoinType, ScalarExpr, Truth},
},
planner::{aggregate_call::AggregateCall, logical_plan::LogicalPlan},
};
/// Anything that can run against a [`Db`] and produce result rows as JSON values.
pub trait Executor {
    /// Execute against `db`, returning one JSON object per result row.
    fn execute(&self, db: &Db) -> Result<Vec<Value>, AnalyzerError>;
}
/// An [`Executor`] that interprets a [`LogicalPlan`] tree node by node.
pub struct PlanExecutor {
    // Root of the plan tree to interpret.
    plan: LogicalPlan,
}
impl Executor for PlanExecutor {
fn execute(&self, db: &Db) -> Result<Vec<Value>, AnalyzerError> {
Self::run_plan(&self.plan, db)
}
}
// Per-group state: the evaluated group-by key values plus one accumulator per aggregate call.
type GroupEntry = (Vec<Value>, Vec<Box<dyn AggAcc>>);
impl PlanExecutor {
pub fn new(plan: LogicalPlan) -> Self {
Self { plan }
}
/// Recursively evaluate a logical plan node against `db` and return the
/// resulting rows as JSON objects.
///
/// `Scan` namespaces every column key as `"<visible>.<column>"`; all
/// downstream nodes operate on that flat key space. Predicates use
/// three-valued logic: only `Truth::True` keeps a row (`False` and
/// `Unknown` both drop it).
///
/// # Errors
/// Returns [`AnalyzerError::Other`] for an unknown backing collection, and
/// propagates any error produced by aggregate accumulation.
pub fn run_plan(plan: &LogicalPlan, db: &Db) -> Result<Vec<Value>, AnalyzerError> {
    match plan {
        LogicalPlan::Scan { backing, visible } => {
            let coll = db
                .get(backing)
                .ok_or_else(|| AnalyzerError::Other(format!("unknown collection {backing}")))?;
            let mut out = Vec::new();
            for v in coll.get_all() {
                // Non-object values are silently skipped; only objects become rows.
                if let Value::Object(map) = v {
                    // Re-key every field under the visible (aliased) collection name.
                    let mut m = Map::new();
                    for (k, vv) in map {
                        m.insert(format!("{}.{}", visible, k), vv);
                    }
                    out.push(Value::Object(m));
                }
            }
            Ok(out)
        }
        LogicalPlan::Filter { input, predicate } => {
            let rows = Self::run_plan(input, db)?;
            let mut out = Vec::new();
            for v in rows {
                if let Value::Object(m) = &v {
                    // Three-valued logic: only a definite True passes the filter.
                    if matches!(Eval::eval_predicate3(predicate, m), Truth::True) {
                        out.push(v);
                    }
                }
            }
            Ok(out)
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggs,
        } => {
            let rows = Self::run_plan(input, db)?;
            Self::aggregate_rows(rows, group_keys, aggs)
        }
        LogicalPlan::Project { input, exprs } => {
            let rows = Self::run_plan(input, db)?;
            let mut out = Vec::new();
            for v in rows {
                let mut proj = Map::new();
                // Upstream nodes only ever emit objects, so this cannot fail.
                let obj = v.as_object().unwrap();
                for id in exprs {
                    let val = Eval::eval_scalar(&id.expression, obj);
                    proj.insert(id.output_name.clone(), val);
                }
                out.push(Value::Object(proj));
            }
            Ok(out)
        }
        LogicalPlan::Sort { input, keys } => {
            let mut rows = Self::run_plan(input, db)?;
            rows.sort_by(|a, b| {
                let ao = a.as_object().unwrap();
                let bo = b.as_object().unwrap();
                // Lexicographic comparison over the sort keys: the first
                // non-equal ordering decides.
                for k in keys {
                    let av = Eval::eval_scalar(&k.expr, ao);
                    let bv = Eval::eval_scalar(&k.expr, bo);
                    let ord = Helpers::cmp_json_for_sort(&av, &bv, k.ascending);
                    if !ord.is_eq() {
                        return ord;
                    }
                }
                std::cmp::Ordering::Equal
            });
            Ok(rows)
        }
        LogicalPlan::Limit {
            input,
            limit,
            offset,
        } => {
            let rows = Self::run_plan(input, db)?;
            // Negative offsets/limits are clamped to zero.
            let start = offset.unwrap_or(0).max(0) as usize;
            let mut end = rows.len();
            if let Some(lim) = limit {
                end = (start + (*lim).max(0) as usize).min(rows.len());
            }
            // An out-of-range start yields an empty result instead of panicking.
            Ok(rows.get(start..end).unwrap_or(&[]).to_vec())
        }
        LogicalPlan::Join {
            left,
            right,
            join_type,
            on,
        } => {
            let left_rows = Self::run_plan(left, db)?;
            let right_rows = Self::run_plan(right, db)?;
            // Key sets are needed to null-extend the missing side of outer joins,
            // even when that side produced zero rows.
            let left_keys = Self::keyset_for_side(left, &left_rows, db);
            let right_keys = Self::keyset_for_side(right, &right_rows, db);
            // Merge two row objects into one; on key collision the right side wins.
            let merge_objs = |lo: &Map<String, Value>, ro: &Map<String, Value>| -> Value {
                let mut out = Map::new();
                for (k, v) in lo {
                    out.insert(k.clone(), v.clone());
                }
                for (k, v) in ro {
                    out.insert(k.clone(), v.clone());
                }
                Value::Object(out)
            };
            // Copy of `obj` carrying every key in `all_keys`, with missing
            // keys filled in as null.
            let null_extended =
                |obj: &Map<String, Value>, all_keys: &BTreeSet<String>| -> Map<String, Value> {
                    let mut out = Map::new();
                    for k in all_keys {
                        if let Some(v) = obj.get(k) {
                            out.insert(k.clone(), v.clone());
                        } else {
                            out.insert(k.clone(), Value::Null);
                        }
                    }
                    out
                };
            let mut out: Vec<Value> = Vec::new();
            match join_type {
                JoinType::Inner => {
                    // Nested-loop join: emit only combinations whose ON
                    // predicate is definitely True.
                    for l in &left_rows {
                        let lo = l.as_object().unwrap();
                        for r in &right_rows {
                            let ro = r.as_object().unwrap();
                            let merged = merge_objs(lo, ro);
                            let mref = merged.as_object().unwrap();
                            if matches!(Eval::eval_predicate3(on, mref), Truth::True) {
                                out.push(merged);
                            }
                        }
                    }
                }
                JoinType::Left => {
                    for l in &left_rows {
                        let lo = l.as_object().unwrap();
                        let mut matched = false;
                        for r in &right_rows {
                            let ro = r.as_object().unwrap();
                            let merged = merge_objs(lo, ro);
                            let mref = merged.as_object().unwrap();
                            if matches!(Eval::eval_predicate3(on, mref), Truth::True) {
                                out.push(merged);
                                matched = true;
                            }
                        }
                        // Unmatched left rows are emitted once with a
                        // null-extended right side.
                        if !matched {
                            let right_nulls = null_extended(&Map::new(), &right_keys);
                            out.push(Value::Object(
                                merge_objs(lo, &right_nulls).as_object().unwrap().clone(),
                            ));
                        }
                    }
                }
                JoinType::Right => {
                    // Mirror image of the Left case.
                    for r in &right_rows {
                        let ro = r.as_object().unwrap();
                        let mut matched = false;
                        for l in &left_rows {
                            let lo = l.as_object().unwrap();
                            let merged = merge_objs(lo, ro);
                            let mref = merged.as_object().unwrap();
                            if matches!(Eval::eval_predicate3(on, mref), Truth::True) {
                                out.push(merged);
                                matched = true;
                            }
                        }
                        if !matched {
                            let left_nulls = null_extended(&Map::new(), &left_keys);
                            out.push(Value::Object(
                                merge_objs(&left_nulls, ro).as_object().unwrap().clone(),
                            ));
                        }
                    }
                }
                JoinType::Full => {
                    // Left pass emits matches and null-extended unmatched left
                    // rows, while recording which right rows ever matched.
                    let mut right_matched: Vec<bool> = vec![false; right_rows.len()];
                    for l in &left_rows {
                        let lo = l.as_object().unwrap();
                        let mut matched_any = false;
                        for (i, r) in right_rows.iter().enumerate() {
                            let ro = r.as_object().unwrap();
                            let merged = merge_objs(lo, ro);
                            let mref = merged.as_object().unwrap();
                            if matches!(Eval::eval_predicate3(on, mref), Truth::True) {
                                out.push(merged);
                                right_matched[i] = true;
                                matched_any = true;
                            }
                        }
                        if !matched_any {
                            let right_nulls = null_extended(&Map::new(), &right_keys);
                            out.push(Value::Object(
                                merge_objs(lo, &right_nulls).as_object().unwrap().clone(),
                            ));
                        }
                    }
                    // Second pass: emit right rows that never matched,
                    // null-extended on the left.
                    for (i, r) in right_rows.iter().enumerate() {
                        if !right_matched[i] {
                            let ro = r.as_object().unwrap();
                            let left_nulls = null_extended(&Map::new(), &left_keys);
                            out.push(Value::Object(
                                merge_objs(&left_nulls, ro).as_object().unwrap().clone(),
                            ));
                        }
                    }
                }
            }
            Ok(out)
        }
    }
}
/// Compute the set of column keys contributed by one side of a join.
///
/// For a `Scan` whose backing collection has a known schema, the keys come
/// from the schema prefixed with the visible name — so they are available
/// even when that side produced zero rows. Otherwise the keys observed on
/// the actual rows are used as a fallback.
fn keyset_for_side(side_plan: &LogicalPlan, rows: &[Value], db: &Db) -> BTreeSet<String> {
    let mut keys: BTreeSet<String> = BTreeSet::new();
    if let LogicalPlan::Scan { backing, visible } = side_plan {
        if let Some(schema) = db.schema_of(backing) {
            for (col, _fi) in schema.fields {
                keys.insert(format!("{}.{}", visible, col));
            }
            return keys;
        }
    }
    // No schema available (not a Scan, or unknown collection): collect the
    // keys that actually appear in the materialized rows.
    for v in rows {
        if let Some(m) = v.as_object() {
            for k in m.keys() {
                keys.insert(k.clone());
            }
        }
    }
    keys
}
fn aggregate_rows(
rows: Vec<Value>,
group_keys: &[Column],
calls: &[AggregateCall],
) -> Result<Vec<Value>, AnalyzerError> {
use std::collections::{HashMap, HashSet};
let mut groups: HashMap<String, GroupEntry> = HashMap::new();
let registry = AggregateRegistry::default_aggregate_registry();
let mut distinct: HashMap<(String, usize), HashSet<String>> = HashMap::new();
for v in rows {
let obj = v.as_object().unwrap();
let gb_vals: Vec<Value> = group_keys
.iter()
.map(|c| {
let expr = ScalarExpr::Column(c.clone());
Eval::eval_scalar(&expr, obj)
})
.collect();
let gk = Helpers::canonical_tuple(&gb_vals);
let entry = groups.entry(gk.clone()).or_insert_with(|| {
let accs: Vec<Box<dyn AggAcc>> = calls
.iter()
.map(|call| registry.get(&call.func).unwrap().create_accumulator())
.collect();
(gb_vals.clone(), accs)
});
for (i, call) in calls.iter().enumerate() {
let args: Vec<Value> = if call.func.eq_ignore_ascii_case("count")
&& call.args.len() == 1
&& matches!(call.args[0], ScalarExpr::WildCard)
{
vec![Value::Bool(true)]
} else {
call.args
.iter()
.map(|a| Eval::eval_scalar(a, obj))
.collect()
};
if call.distinct {
let key = Helpers::canonical_tuple(&args);
let set = distinct.entry((gk.clone(), i)).or_default();
if set.insert(key) {
entry.1[i].update(&args)?;
}
} else {
entry.1[i].update(&args)?;
}
}
}
let mut out = Vec::new();
for (_gk, (gb_vals, accs)) in groups.into_iter() {
let mut m = Map::new();
for (idx, c) in group_keys.iter().enumerate() {
let key = match c {
Column::WithCollection { collection, name } => {
format!("{}.{}", collection, name)
}
Column::Name { name } => name.clone(),
};
m.insert(key, gb_vals[idx].clone());
}
let mut used: HashSet<String> = m.keys().cloned().collect();
for (call, acc) in calls.iter().zip(accs.iter()) {
let base = call.func.to_ascii_lowercase();
let mut name = base.clone();
let mut k = 1usize;
while used.contains(&name) {
name = format!("{}_{}", base, k);
k += 1;
}
used.insert(name.clone());
m.insert(name, acc.finalize());
}
out.push(Value::Object(m));
}
Ok(out)
}
/// Derive the default output column name for a scalar expression when the
/// query supplies no alias.
pub fn default_name_for_expr(e: &ScalarExpr) -> String {
    match e {
        ScalarExpr::Column(col) => match col {
            Column::WithCollection { collection, name } => {
                format!("{}.{}", collection, name)
            }
            Column::Name { name } => name.clone(),
        },
        ScalarExpr::Function(f) => f.name.to_ascii_lowercase(),
        ScalarExpr::Literal(_) => String::from("_lit"),
        ScalarExpr::WildCard | ScalarExpr::WildCardWithCollection(_) => String::from("*"),
        ScalarExpr::Parameter => String::from("?"),
        ScalarExpr::Args(_) => String::from("(...)"),
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::JsonPrimitive;
use crate::database::{Db, DbConfig, IdType};
use crate::parser::analyzer::AnalyzedIdentifier;
use crate::parser::analyzer::AnalyzedQuery;
use crate::parser::ast::{
Column, ComparatorOp, Function, JoinType, Literal, OrderBy, Predicate, ScalarExpr,
};
use crate::planner::logical_plan::LogicalPlan;
use crate::planner::plan_builder::PlanBuilder;
use serde_json::json;
/// Test database with one collection `t`: five rows in categories "a"/"b",
/// one of them with a null amount.
fn mk_db() -> Db {
    let db = Db::new_with_config(DbConfig {
        id_type: IdType::None,
        id_key: String::from("id"),
    });
    let coll = db.create("t");
    coll.add_batch(json!([
        { "id": 1, "cat": "a", "amt": 10.0 },
        { "id": 2, "cat": "a", "amt": 15.0 },
        { "id": 3, "cat": "b", "amt": 7.5 },
        { "id": 4, "cat": "b", "amt": null },
        { "id": 5, "cat": "a", "amt": 22.5 }
    ]));
    db
}
/// Analyzed query equivalent to:
/// `SELECT cat, SUM(amt) AS total FROM t WHERE id > 1 GROUP BY cat
///  HAVING SUM(amt) > 20.0 ORDER BY cat LIMIT 10`.
fn analyzed_sum_by_cat() -> AnalyzedQuery {
    // Builders for the expressions that repeat throughout the query.
    let t_col = |name: &str| {
        ScalarExpr::Column(Column::WithCollection {
            collection: "t".into(),
            name: name.into(),
        })
    };
    let sum_amt = || {
        ScalarExpr::Function(Function {
            name: "sum".into(),
            distinct: false,
            args: vec![t_col("amt")],
        })
    };
    AnalyzedQuery {
        projection: vec![
            AnalyzedIdentifier {
                expression: t_col("cat"),
                alias: Some("cat".into()),
                ty: JsonPrimitive::String,
                nullable: false,
                output_name: "cat".into(),
            },
            AnalyzedIdentifier {
                expression: sum_amt(),
                alias: Some("total".into()),
                ty: JsonPrimitive::Float,
                nullable: true,
                output_name: "total".into(),
            },
        ],
        collections: vec![("t".into(), "t".into())],
        criteria: Some(Predicate::Compare {
            left: t_col("id"),
            op: ComparatorOp::Gt,
            right: ScalarExpr::Literal(Literal::Int(1)),
        }),
        group_by: vec![Column::WithCollection {
            collection: "t".into(),
            name: "cat".into(),
        }],
        having: Some(Predicate::Compare {
            left: sum_amt(),
            op: ComparatorOp::Gt,
            right: ScalarExpr::Literal(Literal::Float(
                ordered_float::NotNan::new(20.0).unwrap(),
            )),
        }),
        order_by: vec![OrderBy {
            expr: ScalarExpr::Column(Column::Name { name: "cat".into() }),
            ascending: true,
        }],
        limit: Some(10),
        offset: None,
        joins: vec![],
    }
}
#[test]
fn execute_group_by_sum_having_sort_limit() {
    let db = mk_db();
    let analyzed = analyzed_sum_by_cat();
    let plan = PlanBuilder::from_analyzed(&analyzed).unwrap();
    let rows = PlanExecutor::new(plan).execute(&db).unwrap();
    // Only category "a" survives HAVING sum(amt) > 20 after WHERE id > 1.
    assert_eq!(rows.len(), 1);
    let row = rows[0].as_object().unwrap();
    assert_eq!(row.get("cat").unwrap(), "a");
    let total = row.get("total").unwrap().as_f64().unwrap();
    assert!((total - 37.5).abs() < 1e-9);
}
/// Empty database with no automatic id generation.
fn mk_db_simple() -> Db {
    let cfg = DbConfig {
        id_type: IdType::None,
        id_key: "id".into(),
    };
    Db::new_with_config(cfg)
}
/// Database with collection `t` holding two rows, one with a null `val`.
fn mk_db_for_scan() -> Db {
    let db = mk_db_simple();
    let coll = db.create("t");
    coll.add_batch(json!([
        { "id": 1, "name": "Ana", "k": 2, "val": 10.0 },
        { "id": 2, "name": "Bob", "k": 1, "val": null }
    ]));
    db
}
#[test]
fn scan_prefixes_columns_with_visible_name() {
    let db = mk_db_for_scan();
    let scan = LogicalPlan::Scan {
        backing: "t".into(),
        visible: "t".into(),
    };
    let rows = PlanExecutor::run_plan(&scan, &db).unwrap();
    // Every emitted row must carry keys namespaced by the visible name.
    for row in &rows {
        let obj = row.as_object().unwrap();
        assert!(obj.contains_key("t.id"));
        assert!(obj.contains_key("t.name"));
    }
}
#[test]
fn filter_only_truth_rows_pass() {
    let db = mk_db_for_scan();
    let predicate = Predicate::Compare {
        left: ScalarExpr::Column(Column::WithCollection {
            collection: "t".into(),
            name: "val".into(),
        }),
        op: ComparatorOp::Gt,
        right: ScalarExpr::Literal(Literal::Float(ordered_float::NotNan::new(5.0).unwrap())),
    };
    let plan = LogicalPlan::Filter {
        input: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        predicate,
    };
    let rows = PlanExecutor::run_plan(&plan, &db).unwrap();
    // Only the non-null 10.0 satisfies `val > 5.0`; the null row does not pass.
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0]["t.id"], json!(1));
}
#[test]
fn project_uses_alias_or_default_names() {
    let db = mk_db_for_scan();
    let scan = LogicalPlan::Scan {
        backing: "t".into(),
        visible: "t".into(),
    };
    // Aliased function expression -> output key "uname".
    let upper_name = crate::parser::analyzer::AnalyzedIdentifier {
        expression: ScalarExpr::Function(Function {
            name: "upper".into(),
            distinct: false,
            args: vec![ScalarExpr::Column(Column::WithCollection {
                collection: "t".into(),
                name: "name".into(),
            })],
        }),
        alias: Some("uname".into()),
        ty: crate::JsonPrimitive::String,
        nullable: false,
        output_name: "uname".into(),
    };
    // Unaliased column -> output key falls back to the plain name "k".
    let k_col = crate::parser::analyzer::AnalyzedIdentifier {
        expression: ScalarExpr::Column(Column::WithCollection {
            collection: "t".into(),
            name: "k".into(),
        }),
        alias: None,
        ty: crate::JsonPrimitive::Int,
        nullable: false,
        output_name: "k".into(),
    };
    let plan = LogicalPlan::Project {
        input: Box::new(scan),
        exprs: vec![upper_name, k_col],
    };
    let rows = PlanExecutor::run_plan(&plan, &db).unwrap();
    let row = rows[0].as_object().unwrap();
    assert!(row.contains_key("uname"));
    assert!(row.contains_key("k"));
}
#[test]
fn sort_ascending_and_descending_nulls_last() {
    let db = mk_db_for_scan();
    let scan = LogicalPlan::Scan {
        backing: "t".into(),
        visible: "t".into(),
    };
    // Helper to wrap the scan in a Sort over t.val in either direction.
    let sort_by_val = |input: LogicalPlan, ascending: bool| LogicalPlan::Sort {
        input: Box::new(input),
        keys: vec![OrderBy {
            expr: ScalarExpr::Column(Column::WithCollection {
                collection: "t".into(),
                name: "val".into(),
            }),
            ascending,
        }],
    };
    let rows_asc = PlanExecutor::run_plan(&sort_by_val(scan.clone(), true), &db).unwrap();
    assert_eq!(rows_asc[0]["t.id"], json!(1));
    assert_eq!(rows_asc[1]["t.id"], json!(2));
    // The null `val` sorts last in both directions, so id 1 stays first
    // even when descending.
    let rows_desc = PlanExecutor::run_plan(&sort_by_val(scan, false), &db).unwrap();
    assert_eq!(rows_desc[0]["t.id"], json!(1));
    assert_eq!(rows_desc[1]["t.id"], json!(2));
}
#[test]
fn limit_and_offset_bounds() {
    let db = mk_db_for_scan();
    let sorted = LogicalPlan::Sort {
        input: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        keys: vec![OrderBy {
            expr: ScalarExpr::Column(Column::WithCollection {
                collection: "t".into(),
                name: "id".into(),
            }),
            ascending: true,
        }],
    };
    let limited = LogicalPlan::Limit {
        input: Box::new(sorted),
        limit: Some(5),
        offset: Some(1),
    };
    // Offset 1 skips the first row; a limit larger than the remainder is clamped.
    let rows = PlanExecutor::run_plan(&limited, &db).unwrap();
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0]["t.id"], json!(2));
}
/// Database with collection `t` covering groups "a", "b" and a null-only "c".
fn mk_db_for_agg() -> Db {
    let db = mk_db_simple();
    let coll = db.create("t");
    coll.add_batch(json!([
        { "id": 1, "cat": "a", "amt": 10.0 },
        { "id": 2, "cat": "a", "amt": 15.0 },
        { "id": 3, "cat": "b", "amt": 7.5 },
        { "id": 4, "cat": "b", "amt": null },
        { "id": 5, "cat": "a", "amt": 22.5 },
        { "id": 6, "cat": "c", "amt": null }
    ]));
    db
}
#[test]
fn aggregate_group_by_with_multiple_aggs_and_having_and_order() {
    let db = mk_db_for_agg();
    // WHERE id > 1
    let filter = LogicalPlan::Filter {
        input: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        predicate: Predicate::Compare {
            left: ScalarExpr::Column(Column::WithCollection {
                collection: "t".into(),
                name: "id".into(),
            }),
            op: ComparatorOp::Gt,
            right: ScalarExpr::Literal(Literal::Int(1)),
        },
    };
    // GROUP BY cat with SUM(amt) and COUNT(*)
    let agg = LogicalPlan::Aggregate {
        input: Box::new(filter),
        group_keys: vec![Column::WithCollection {
            collection: "t".into(),
            name: "cat".into(),
        }],
        aggs: vec![
            AggregateCall {
                func: "sum".into(),
                args: vec![ScalarExpr::Column(Column::WithCollection {
                    collection: "t".into(),
                    name: "amt".into(),
                })],
                distinct: false,
            },
            AggregateCall {
                func: "count".into(),
                args: vec![ScalarExpr::WildCard],
                distinct: false,
            },
        ],
    };
    // HAVING sum > 20.0 (post-aggregate filter on the "sum" output column)
    let having = LogicalPlan::Filter {
        input: Box::new(agg),
        predicate: Predicate::Compare {
            left: ScalarExpr::Column(Column::Name { name: "sum".into() }),
            op: ComparatorOp::Gt,
            right: ScalarExpr::Literal(Literal::Float(
                ordered_float::NotNan::new(20.0).unwrap(),
            )),
        },
    };
    // ORDER BY t.cat
    let sorted = LogicalPlan::Sort {
        input: Box::new(having),
        keys: vec![OrderBy {
            expr: ScalarExpr::Column(Column::Name {
                name: "t.cat".into(),
            }),
            ascending: true,
        }],
    };
    // Projection helper: output column with alias == output name.
    let ident = |expr: ScalarExpr, out: &str, ty: crate::JsonPrimitive, nullable: bool| {
        crate::parser::analyzer::AnalyzedIdentifier {
            expression: expr,
            alias: Some(out.into()),
            ty,
            nullable,
            output_name: out.into(),
        }
    };
    let projected = LogicalPlan::Project {
        input: Box::new(sorted),
        exprs: vec![
            ident(
                ScalarExpr::Column(Column::Name {
                    name: "t.cat".into(),
                }),
                "cat",
                crate::JsonPrimitive::String,
                false,
            ),
            ident(
                ScalarExpr::Column(Column::Name { name: "sum".into() }),
                "total",
                crate::JsonPrimitive::Float,
                true,
            ),
            ident(
                ScalarExpr::Column(Column::Name {
                    name: "count".into(),
                }),
                "n",
                crate::JsonPrimitive::Int,
                false,
            ),
        ],
    };
    let rows = PlanExecutor::run_plan(&projected, &db).unwrap();
    assert_eq!(rows.len(), 1);
    let row = rows[0].as_object().unwrap();
    assert_eq!(row["cat"], json!("a"));
    assert!((row["total"].as_f64().unwrap() - 37.5).abs() < 1e-9);
    assert_eq!(row["n"], json!(2));
}
#[test]
fn aggregate_distinct_count_and_sum_distinct() {
    let db = mk_db_simple();
    let coll = db.create("t");
    coll.add_batch(json!([
        { "id": 1, "x": 1, "y": 10.0 },
        { "id": 2, "x": 1, "y": 10.0 },
        { "id": 3, "x": 2, "y": 10.0 },
        { "id": 4, "x": 2, "y": null }
    ]));
    drop(coll);
    let y_col = || {
        ScalarExpr::Column(Column::WithCollection {
            collection: "t".into(),
            name: "y".into(),
        })
    };
    let plan = LogicalPlan::Aggregate {
        input: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        group_keys: vec![Column::WithCollection {
            collection: "t".into(),
            name: "x".into(),
        }],
        aggs: vec![
            AggregateCall {
                func: "count".into(),
                args: vec![y_col()],
                distinct: true,
            },
            AggregateCall {
                func: "sum".into(),
                args: vec![y_col()],
                distinct: true,
            },
        ],
    };
    let rows = PlanExecutor::run_plan(&plan, &db).unwrap();
    let mut by_x = std::collections::HashMap::new();
    for row in rows {
        let obj = row.as_object().unwrap();
        by_x.insert(obj["t.x"].clone(), (obj["count"].clone(), obj["sum"].clone()));
    }
    // Duplicate 10.0 values collapse under DISTINCT in both groups.
    let (c1, s1) = by_x.get(&json!(1)).unwrap();
    assert_eq!(*c1, json!(1));
    assert!((s1.as_f64().unwrap() - 10.0).abs() < 1e-9);
    let (c2, s2) = by_x.get(&json!(2)).unwrap();
    assert_eq!(*c2, json!(1));
    assert!((s2.as_f64().unwrap() - 10.0).abs() < 1e-9);
}
#[test]
fn aggregate_avg_min_max_and_null_only_group() {
    let db = mk_db_simple();
    let coll = db.create("t");
    coll.add_batch(json!([
        { "id": 1, "g": "a", "v": 2.0 },
        { "id": 2, "g": "a", "v": 4.0 },
        { "id": 3, "g": "b", "v": null }
    ]));
    drop(coll);
    // All three aggregates operate on the same t.v column.
    let v_col = || {
        ScalarExpr::Column(Column::WithCollection {
            collection: "t".into(),
            name: "v".into(),
        })
    };
    let call = |func: &str| AggregateCall {
        func: func.into(),
        args: vec![v_col()],
        distinct: false,
    };
    let plan = LogicalPlan::Aggregate {
        input: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        group_keys: vec![Column::WithCollection {
            collection: "t".into(),
            name: "g".into(),
        }],
        aggs: vec![call("avg"), call("min"), call("max")],
    };
    let rows = PlanExecutor::run_plan(&plan, &db).unwrap();
    let mut by_g = std::collections::HashMap::new();
    for row in rows {
        let obj = row.as_object().unwrap();
        by_g.insert(
            obj["t.g"].clone(),
            (obj["avg"].clone(), obj["min"].clone(), obj["max"].clone()),
        );
    }
    let (a_avg, a_min, a_max) = by_g.get(&json!("a")).unwrap();
    assert!((a_avg.as_f64().unwrap() - 3.0).abs() < 1e-9);
    assert!((a_min.as_f64().unwrap() - 2.0).abs() < 1e-9);
    assert!((a_max.as_f64().unwrap() - 4.0).abs() < 1e-9);
    // Group "b" contains only nulls, so every aggregate finalizes to null.
    let (b_avg, b_min, b_max) = by_g.get(&json!("b")).unwrap();
    assert!(b_avg.is_null());
    assert!(b_min.is_null());
    assert!(b_max.is_null());
}
#[test]
fn join_node_executes_inner_cross_join() {
    // NOTE: the previous local `use serde_json::json;` was redundant —
    // the macro is already imported at module level.
    let db = Db::new_with_config(DbConfig {
        id_type: IdType::None,
        id_key: "id".into(),
    });
    let t = db.create("t");
    let u = db.create("u");
    t.add_batch(json!([
        { "id": 1, "x": "A" },
        { "id": 2, "x": "B" }
    ]));
    u.add_batch(json!([
        { "id": 10, "y": true },
        { "id": 20, "y": false }
    ]));
    let plan = LogicalPlan::Join {
        left: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        right: Box::new(LogicalPlan::Scan {
            backing: "u".into(),
            visible: "u".into(),
        }),
        join_type: JoinType::Inner,
        on: Predicate::Const3(Truth::True),
    };
    let rows = PlanExecutor::run_plan(&plan, &db).expect("join should execute");
    // An always-true ON predicate makes the inner join a 2x2 cross product.
    assert_eq!(rows.len(), 4);
    let r0 = rows[0].as_object().expect("object row");
    assert!(r0.contains_key("t.id"));
    assert!(r0.contains_key("t.x"));
    assert!(r0.contains_key("u.id"));
    assert!(r0.contains_key("u.y"));
}
#[test]
fn left_join_emits_unmatched_left_rows_with_null_right_side() {
    let db = Db::new_with_config(DbConfig {
        id_type: IdType::None,
        id_key: "id".into(),
    });
    let left_coll = db.create("t");
    let _right_coll = db.create("u");
    left_coll.add_batch(json!([
        { "id": 1, "x": "A" },
        { "id": 2, "x": "B" }
    ]));
    let plan = LogicalPlan::Join {
        left: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        right: Box::new(LogicalPlan::Scan {
            backing: "u".into(),
            visible: "u".into(),
        }),
        join_type: JoinType::Left,
        on: Predicate::Const3(Truth::True),
    };
    let rows = PlanExecutor::run_plan(&plan, &db).expect("left join should execute");
    // With an empty right side, each left row is still emitted exactly once.
    assert_eq!(rows.len(), 2);
    for row in rows {
        let obj = row.as_object().unwrap();
        assert!(obj.contains_key("t.id"));
        assert!(obj.contains_key("t.x"));
        assert!(obj.contains_key("u.id"));
    }
}
#[test]
fn left_join_null_ext_uses_schema_even_when_right_is_empty() {
    let db = Db::new_with_config(DbConfig {
        id_type: IdType::None,
        id_key: "id".into(),
    });
    let t = db.create("t");
    let u = db.create("u");
    t.add_batch(json!([
        { "id": 1, "x": "A" },
        { "id": 2, "x": "B" }
    ]));
    // Populate `u` once so its schema is recorded, then empty it again.
    u.add_batch(json!([
        { "id": 999, "y": true }
    ]));
    u.clear();
    let plan = LogicalPlan::Join {
        left: Box::new(LogicalPlan::Scan {
            backing: "t".into(),
            visible: "t".into(),
        }),
        right: Box::new(LogicalPlan::Scan {
            backing: "u".into(),
            visible: "u".into(),
        }),
        join_type: JoinType::Left,
        on: Predicate::Const3(Truth::True),
    };
    let rows = PlanExecutor::run_plan(&plan, &db).expect("left join should execute");
    assert_eq!(rows.len(), 2);
    // Right-side columns come from the schema and are null-extended.
    for row in rows {
        let obj = row.as_object().unwrap();
        assert!(obj.contains_key("t.id"));
        assert!(obj.contains_key("t.x"));
        assert_eq!(obj.get("u.id"), Some(&Value::Null));
        assert_eq!(obj.get("u.y"), Some(&Value::Null));
    }
}
#[test]
fn keyset_for_side_scan_uses_schema_with_visible_prefix() {
    // Use an empty db: mk_db() already creates a populated "t", which this
    // test would otherwise re-create a second time.
    let db = mk_db_simple();
    let t = db.create("t");
    t.add_batch(json!([
        { "id": 1, "x": "A" },
        { "id": 2, "x": "B" }
    ]));
    let plan = LogicalPlan::Scan {
        backing: "t".into(),
        visible: "tt".into(),
    };
    // Even with zero observed rows, the schema supplies the key set,
    // prefixed with the visible alias rather than the backing name.
    let rows: Vec<serde_json::Value> = vec![];
    let keys = PlanExecutor::keyset_for_side(&plan, &rows, &db);
    let expected = BTreeSet::from_iter(["tt.id".to_string(), "tt.x".to_string()]);
    assert_eq!(keys, expected);
}
#[test]
fn keyset_for_side_scan_empty_rows_but_schema_known_still_returns_prefixed_keys() {
    let db = mk_db();
    let coll = db.create("u");
    // Record a schema for "u", then remove all rows again.
    coll.add_batch(json!([{ "id": 99, "y": true }]));
    let _ = coll.clear();
    let scan = LogicalPlan::Scan {
        backing: "u".into(),
        visible: "uuu".into(),
    };
    let no_rows: Vec<serde_json::Value> = vec![];
    let keys = PlanExecutor::keyset_for_side(&scan, &no_rows, &db);
    let expected: BTreeSet<String> =
        ["uuu.id", "uuu.y"].iter().map(|s| s.to_string()).collect();
    assert_eq!(keys, expected);
}
#[test]
fn keyset_for_side_non_scan_falls_back_to_observed_row_keys() {
    // Use an empty db: mk_db() already creates a populated "t", which this
    // test would otherwise re-create a second time.
    let db = mk_db_simple();
    let t = db.create("t");
    t.add_batch(json!([{ "id": 1, "x": "A" }]));
    let scan = LogicalPlan::Scan {
        backing: "t".into(),
        visible: "t".into(),
    };
    let observed = PlanExecutor::run_plan(&scan, &db).unwrap();
    // A Project node carries no backing schema, so the key set must be
    // derived from the rows that were actually observed.
    let non_scan = LogicalPlan::Project {
        input: Box::new(scan),
        exprs: vec![],
    };
    let keys = PlanExecutor::keyset_for_side(&non_scan, &observed, &db);
    let expected = BTreeSet::from_iter(["t.id".to_string(), "t.x".to_string()]);
    assert_eq!(keys, expected);
}
#[test]
fn keyset_for_side_no_schema_and_no_rows_returns_empty_set() {
    let db = mk_db();
    // A scan over a collection that does not exist has no schema; with no
    // observed rows either, the key set must be empty.
    let scan = LogicalPlan::Scan {
        backing: "missing".into(),
        visible: "m".into(),
    };
    let no_rows: Vec<Value> = Vec::new();
    assert!(PlanExecutor::keyset_for_side(&scan, &no_rows, &db).is_empty());
}
}