use toasty_core::driver::{ExecResponse, Rows};
use toasty_core::stmt;
use crate::Result;
use crate::engine::eval;
use crate::engine::exec::{Action, Exec, Output, VarId};
/// Execution action that merges several already-computed inputs into a single
/// list of rows, nesting child rows inside the rows they belong to.
///
/// Executed by `Exec::action_nested_merge`.
#[derive(Debug, Clone)]
pub(crate) struct NestedMerge {
/// Variables loaded when the action starts. The loaded values are addressed
/// by position through the `source` fields of `NestedLevel` and `MergeIndex`.
pub(crate) inputs: Vec<VarId>,
/// Destination of the merged rows; its `var` and `num_uses` are passed to
/// the variable store once merging is done.
pub(crate) output: Output,
/// Outermost level of the merge. One merged row is produced per root row.
pub(crate) root: NestedLevel,
/// Descriptions of the hash indexes built before merging. Referenced by
/// position from `MergeQualification::HashLookup::index`.
pub(crate) hash_indexes: Vec<MergeIndex>,
/// Descriptions of the sorted indexes built before merging. Referenced by
/// position from `MergeQualification::SortLookup::index`.
pub(crate) sort_indexes: Vec<MergeIndex>,
}
/// Describes one lookup index to build over a loaded input before merging.
#[derive(Debug, Clone)]
pub(crate) struct MergeIndex {
/// Position in `NestedMerge::inputs` of the input to index. That input must
/// hold row values; building the index panics if it is only a count.
pub(crate) source: usize,
/// Projections handed to the index constructor together with the rows.
/// NOTE(review): presumably these select the key columns of each indexed
/// row — confirm against `stmt::HashIndex` / `stmt::SortedIndex`.
pub(crate) child_projections: Vec<stmt::Projection>,
}
/// One level of the nesting tree: where its rows come from, how each row is
/// shaped, and which child levels are embedded into it.
#[derive(Debug, Clone)]
pub(crate) struct NestedLevel {
/// Position in `NestedMerge::inputs` of the input providing this level's rows.
pub(crate) source: usize,
/// Function producing the final value for a row of this level. It is
/// evaluated with the row as argument 0 and the merged result of the k-th
/// entry of `nested` as argument k + 1.
pub(crate) projection: eval::Func,
/// Child levels merged into every row of this level, in argument order.
pub(crate) nested: Vec<NestedChild>,
}
/// A child level together with the rule that selects which of its rows belong
/// to a given parent row.
#[derive(Debug, Clone)]
pub(crate) struct NestedChild {
/// The child level itself (merged recursively).
pub(crate) level: NestedLevel,
/// How matching child rows are found for a parent row.
pub(crate) qualification: MergeQualification,
/// When `true`, at most one child row may match: the merged result is that
/// row, or `Null` when nothing matched (more than one match trips an
/// assertion). When `false`, the result is a list of all matching rows.
pub(crate) single: bool,
}
/// Strategy used to pick the child rows that belong to a parent row.
#[derive(Debug, Clone)]
pub(crate) enum MergeQualification {
/// Every row of the child input matches every parent row.
All,
/// Look up the child row in a hash index; yields at most one row.
HashLookup {
/// Position of the index among the hash indexes built from
/// `NestedMerge::hash_indexes`.
index: usize,
/// Evaluated against the parent row stack to produce the lookup key. A
/// record result is used as a multi-value key, any other value as a
/// one-value key.
lookup_key: eval::Func,
},
/// Look up child rows in a sorted index; yields every row in the range
/// whose lower and upper bounds are both the key, inclusive.
SortLookup {
/// Position of the index among the sorted indexes built from
/// `NestedMerge::sort_indexes`.
index: usize,
/// Evaluated against the parent row stack to produce the lookup key,
/// interpreted the same way as for `HashLookup`.
lookup_key: eval::Func,
},
/// Test every row of the child input with this predicate. The predicate is
/// evaluated against a row stack whose top entry is the candidate child row.
Scan(eval::Func),
}
/// Linked list of the rows currently being merged, from the innermost level
/// back to the root. Used as evaluation input for qualification functions.
#[derive(Debug)]
struct RowStack<'a> {
/// Entry for the enclosing level, or `None` at the root.
parent: Option<&'a RowStack<'a>>,
/// Row at this level.
row: &'a stmt::Value,
/// Depth of this entry: 0 for the root, parent's position + 1 otherwise.
/// Argument references are matched against this value.
position: usize,
}
/// Evaluation input for a level's projection: the row itself plus the merged
/// results of its nested children.
#[derive(Debug)]
struct RowAndNested<'a> {
/// The row being projected; resolved for argument position 0.
row: &'a stmt::Value,
/// Merged child results; entry `k` is resolved for argument position `k + 1`.
nested: &'a [stmt::Value],
}
/// A fully loaded input of the merge.
#[derive(Debug)]
enum Input {
/// The input carried only a row count, no row data.
Count(u64),
/// The input's rows. A single non-list value is stored as a one-element
/// vector; streams are collected in full.
Value(Vec<stmt::Value>),
}
/// Lookup indexes built over the loaded inputs for the duration of one merge.
struct Indices<'a> {
/// Hash indexes, in the same order as `NestedMerge::hash_indexes`.
hash: Vec<stmt::HashIndex<'a>>,
/// Sorted indexes, in the same order as `NestedMerge::sort_indexes`.
sort: Vec<stmt::SortedIndex<'a>>,
}
impl Exec<'_> {
pub(super) async fn action_nested_merge(&mut self, action: &NestedMerge) -> Result<()> {
let mut inputs = Vec::with_capacity(action.inputs.len());
for var_id in &action.inputs {
let response = self.vars.load(*var_id).await?;
inputs.push(match response.values {
Rows::Count(count) => Input::Count(count),
Rows::Value(value) => Input::Value(match value {
stmt::Value::List(items) => items,
value => vec![value],
}),
Rows::Stream(value_stream) => Input::Value(value_stream.collect().await?),
});
}
let indices = Indices {
hash: action
.hash_indexes
.iter()
.map(|mi| {
let Input::Value(values) = &inputs[mi.source] else {
panic!("HashLookup source must be a Value input, not Count")
};
stmt::HashIndex::new(values, &mi.child_projections)
})
.collect(),
sort: action
.sort_indexes
.iter()
.map(|mi| {
let Input::Value(values) = &inputs[mi.source] else {
panic!("SortLookup source must be a Value input, not Count")
};
stmt::SortedIndex::new(values, &mi.child_projections)
})
.collect(),
};
let mut merged_rows = vec![];
match &inputs[action.root.source] {
Input::Count(count) => {
let row_stack = RowStack {
parent: None,
row: &stmt::Value::Null,
position: 0,
};
for _ in 0..*count {
merged_rows.push(self.merge_nested_row(
&row_stack,
&action.root,
&inputs,
&indices,
)?);
}
}
Input::Value(root_rows) => {
for row in root_rows {
let stack = RowStack {
parent: None,
row,
position: 0,
};
merged_rows.push(self.merge_nested_row(
&stack,
&action.root,
&inputs,
&indices,
)?);
}
}
}
self.vars.store(
action.output.var,
action.output.num_uses,
ExecResponse::from_rows(Rows::value_stream(merged_rows)),
);
Ok(())
}
/// Merges one row of `level` with the rows of all its nested children and
/// returns the level's projection of the result.
///
/// `row_stack` holds the row being merged on top of its ancestors. For each
/// nested child, the matching child rows are selected according to the
/// child's qualification, merged recursively, and then handed to the
/// projection as the arguments that follow the row itself.
///
/// # Errors
///
/// Propagates any error from evaluating a lookup key, a scan predicate, or a
/// projection.
///
/// # Panics
///
/// Panics if a nested child's input carries only a count (not implemented),
/// or if a child marked `single` matches more than one row.
fn merge_nested_row(
    &self,
    row_stack: &RowStack<'_>,
    level: &NestedLevel,
    inputs: &[Input],
    indices: &Indices<'_>,
) -> Result<stmt::Value> {
    let mut nested = Vec::new();

    for nested_child in &level.nested {
        let nested_input = match &inputs[nested_child.level.source] {
            Input::Value(rows) => rows,
            Input::Count(_) => todo!("input={:#?}", inputs[nested_child.level.source]),
        };

        // Recursively merge a single child row, one level below `row_stack`.
        let project = |child_row: &stmt::Value| -> Result<stmt::Value> {
            let child_stack = RowStack {
                parent: Some(row_stack),
                row: child_row,
                position: row_stack.position + 1,
            };
            self.merge_nested_row(&child_stack, &nested_child.level, inputs, indices)
        };

        let mut projected = Vec::new();

        match &nested_child.qualification {
            MergeQualification::All => {
                for child_row in nested_input {
                    projected.push(project(child_row)?);
                }
            }
            MergeQualification::HashLookup { index, lookup_key } => {
                let key = lookup_key.eval(row_stack)?;
                if let Some(hit) = indices.hash[*index].find(key_as_slice(&key)) {
                    projected.push(project(hit)?);
                }
            }
            MergeQualification::SortLookup { index, lookup_key } => {
                let key = lookup_key.eval(row_stack)?;
                let key = key_as_slice(&key);
                // Equal inclusive bounds select exactly the rows whose key
                // equals the lookup key.
                let matches = indices.sort[*index].find_range(
                    std::ops::Bound::Included(key),
                    std::ops::Bound::Included(key),
                );
                for hit in matches {
                    projected.push(project(hit)?);
                }
            }
            MergeQualification::Scan(func) => {
                for candidate in nested_input {
                    // The predicate sees the candidate on top of the stack.
                    let probe = RowStack {
                        parent: Some(row_stack),
                        row: candidate,
                        position: row_stack.position + 1,
                    };
                    if func.eval_bool(&probe)? {
                        projected.push(project(candidate)?);
                    }
                }
            }
        }

        let merged = if nested_child.single {
            assert!(projected.len() <= 1, "TODO: error handling");
            // With at most one element, `pop` yields that element if present.
            projected.pop().unwrap_or(stmt::Value::Null)
        } else {
            stmt::Value::List(projected)
        };
        nested.push(merged);
    }

    let eval_input = RowAndNested {
        row: row_stack.row,
        nested: nested.as_slice(),
    };
    level.projection.eval(&eval_input)
}
}
/// Views a lookup key as a slice of values.
///
/// A record is exposed as the slice of its fields; any other value becomes a
/// one-element slice borrowing that value.
fn key_as_slice(val: &stmt::Value) -> &[stmt::Value] {
    if let stmt::Value::Record(record) = val {
        record.as_slice()
    } else {
        std::slice::from_ref(val)
    }
}
/// Resolves expression arguments against a stack of rows: the argument's
/// position selects the stack entry with the same `position`.
impl stmt::Input for &RowStack<'_> {
    /// Returns the projected entry of the row whose stack position equals
    /// `expr_arg.position`.
    ///
    /// # Panics
    ///
    /// Hits `todo!()` when no entry on the stack has the requested position.
    fn resolve_arg(
        &mut self,
        expr_arg: &stmt::ExprArg,
        projection: &stmt::Projection,
    ) -> Option<stmt::Expr> {
        let mut frame: &RowStack<'_> = self;

        // Walk outwards until the level the argument refers to is reached.
        while frame.position != expr_arg.position {
            frame = match frame.parent {
                Some(parent) => parent,
                None => todo!(),
            };
        }

        Some(frame.row.entry(projection).to_expr())
    }
}
/// Resolves expression arguments for a level's projection: argument 0 is the
/// row itself, argument `k` (for `k >= 1`) is the `k - 1`-th nested result.
impl stmt::Input for &RowAndNested<'_> {
    /// Returns the projected entry of the value selected by
    /// `expr_arg.position`.
    ///
    /// # Panics
    ///
    /// Panics if the position refers to a nested result that does not exist.
    fn resolve_arg(
        &mut self,
        expr_arg: &stmt::ExprArg,
        projection: &stmt::Projection,
    ) -> Option<stmt::Expr> {
        let base = match expr_arg.position {
            0 => self.row,
            nth => &self.nested[nth - 1],
        };

        Some(base.entry(projection).to_expr())
    }
}
impl From<NestedMerge> for Action {
fn from(src: NestedMerge) -> Self {
Self::NestedMerge(src)
}
}