use alloc::borrow::Cow;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use spg_sql::ast::{Expr, FromClause, JoinKind, SelectItem, SelectStatement, TableRef};
use spg_storage::{ColumnSchema, DataType, Row, Table, Value};
use crate::eval::EvalContext;
use crate::{
ByteBudget, CancelToken, Engine, EngineError, QueryResult, aggregate, apply_offset_and_limit,
approx_row_bytes, approx_rows_bytes, approx_value_bytes, build_order_keys, build_projection,
collect_column_qualifiers, collect_qualified_refs, eval, expr_has_subquery, memoize, reorder,
value_cmp, value_to_literal_expr,
};
/// One JOIN peer (the right-hand side of a join step) as prepared by
/// `Engine::build_join_peers`, before the join pipeline consumes it.
pub(crate) struct JoinedPeer<'a> {
/// Rows materialised up front, or `None` when the peer is read lazily from
/// its catalog table or (for LATERAL) produced per outer row.
pub(crate) eager_rows: Option<Vec<Row>>,
/// Column schema of the peer (unqualified column names).
pub(crate) cols: Vec<ColumnSchema>,
/// Alias used to qualify the peer's columns (the table name when no alias).
pub(crate) alias: String,
/// Join kind from the FROM clause; `JoinKind::Left` makes the stages
/// null-extend unmatched left tuples.
pub(crate) kind: JoinKind,
/// The ON condition, if any.
pub(crate) on: Option<&'a Expr>,
/// The subquery body when this peer is `LATERAL (...)`.
pub(crate) lateral: Option<&'a SelectStatement>,
/// Catalog table name for non-LATERAL peers; `None` for LATERAL peers.
pub(crate) join_table: Option<String>,
/// WHERE conjuncts pushed down to this peer that are applied while joining
/// (only populated for lazily-read catalog tables).
pub(crate) where_preds: Vec<Expr>,
}
/// Backing storage for one input of the join pipeline. Join tuples address
/// rows in a source by index, so every variant exposes the same `get`/`len`.
pub(crate) enum JoinSrc<'a> {
/// Rows owned by the pipeline (materialised peers, LATERAL output, ...).
Owned(Vec<Row>),
/// A borrowed slice of rows.
Eager(&'a [Row]),
/// Rows read in place from a catalog table's stored row vector.
Stored(&'a spg_storage::persistent::PersistentVec<Row>),
}
impl JoinSrc<'_> {
    /// Returns the row at index `i`, or `None` when `i` is out of range.
    pub(crate) fn get(&self, i: usize) -> Option<&Row> {
        match self {
            Self::Owned(rows) => rows.as_slice().get(i),
            Self::Eager(rows) => rows.get(i),
            Self::Stored(store) => store.get(i),
        }
    }

    /// Number of index-addressable rows in this source.
    pub(crate) fn len(&self) -> usize {
        let count = match self {
            Self::Owned(rows) => rows.as_slice().len(),
            Self::Eager(rows) => rows.len(),
            Self::Stored(store) => store.len(),
        };
        count
    }
}
/// Looks up the value at combined-schema position `pos` for one join tuple.
///
/// `offsets` holds the starting column of every source followed by the total
/// width (`[0, w0, w0 + w1, ...]`); `tuple[k]` is the row index into
/// `sources[k]`, where `usize::MAX` marks a null-extended slot. Returns `None`
/// for null-extended slots and for any out-of-range position or index.
/// Column masks are not consulted here.
pub(crate) fn tuple_value<'s>(
    sources: &'s [JoinSrc<'_>],
    offsets: &[usize],
    tuple: &[usize],
    pos: usize,
) -> Option<&'s Value> {
    // The owning source is the one with the last start offset <= pos.
    let owner = offsets
        .partition_point(|&start| start <= pos)
        .checked_sub(1)?;
    match tuple.get(owner).copied()? {
        usize::MAX => None,
        row_idx => {
            let row = sources.get(owner)?.get(row_idx)?;
            row.values.get(pos - offsets[owner])
        }
    }
}
/// A borrowed view of one row flowing through the join. It is either a real
/// `Row` or a join tuple resolved lazily through `tuple_value`.
pub(crate) enum RowRef<'a> {
/// A materialised row.
Owned(&'a Row),
/// A join tuple: per-source row indices plus the sources/offsets they index.
Tuple {
sources: &'a [JoinSrc<'a>],
offsets: &'a [usize],
tuple: &'a [usize],
},
}
impl RowRef<'_> {
    /// Returns the value at combined-schema position `pos`, or `None` when the
    /// position is out of range or falls into a null-extended join slot.
    #[inline]
    pub(crate) fn get(&self, pos: usize) -> Option<&Value> {
        match *self {
            RowRef::Owned(row) => row.values.get(pos),
            RowRef::Tuple {
                sources,
                offsets,
                tuple,
            } => tuple_value(sources, offsets, tuple, pos),
        }
    }

    /// Produces a full `Row` for this view. `Owned` rows are borrowed as-is.
    /// `Tuple` views are assembled column by column, and unresolvable values
    /// become NULL.
    pub(crate) fn as_row(&self) -> Cow<'_, Row> {
        let (sources, offsets, tuple) = match *self {
            RowRef::Owned(row) => return Cow::Borrowed(row),
            RowRef::Tuple {
                sources,
                offsets,
                tuple,
            } => (sources, offsets, tuple),
        };
        let width = offsets.last().map_or(0, |&w| w);
        let values: Vec<Value> = (0..width)
            .map(|pos| {
                tuple_value(sources, offsets, tuple, pos)
                    .cloned()
                    .unwrap_or(Value::Null)
            })
            .collect();
        Cow::Owned(Row::new(values))
    }
}
/// Appends `row`'s values to `vals`.
///
/// With a mask, every column whose mask entry is not `true` becomes NULL;
/// this includes columns past the end of the mask. Without a mask, all
/// values are copied unchanged.
pub(crate) fn extend_masked(vals: &mut Vec<Value>, row: &Row, mask: Option<&[bool]>) {
    let Some(keep) = mask else {
        vals.extend(row.values.iter().cloned());
        return;
    };
    vals.extend(row.values.iter().enumerate().map(|(i, v)| {
        if keep.get(i) == Some(&true) {
            v.clone()
        } else {
            Value::Null
        }
    }));
}
/// Builds the flat value vector for one join tuple, source by source.
///
/// A source contributes `widths[k]` NULLs when its slot is null-extended
/// (`usize::MAX`) or its index does not resolve. Otherwise it contributes its
/// row values with `masks[k]` applied via `extend_masked`. `cap` is only an
/// initial capacity hint.
pub(crate) fn materialise_tuple_vals(
    sources: &[JoinSrc<'_>],
    widths: &[usize],
    masks: &[Option<Vec<bool>>],
    tuple: &[usize],
    cap: usize,
) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::with_capacity(cap);
    for (slot, &row_idx) in tuple.iter().enumerate() {
        let resolved = match row_idx {
            usize::MAX => None,
            idx => sources[slot].get(idx),
        };
        if let Some(row) = resolved {
            extend_masked(&mut out, row, masks[slot].as_deref());
        } else {
            let padded_len = out.len() + widths[slot];
            out.resize(padded_len, Value::Null);
        }
    }
    out
}
/// Result of `Engine::build_joined_filtered_rows`. The surviving join tuples
/// are kept as per-source row indices instead of materialised rows.
pub(crate) struct DeferredJoin<'a> {
/// One row source per joined input, in pipeline order.
pub(crate) sources: Vec<JoinSrc<'a>>,
/// Start column of each source in the combined schema, followed by the total width.
pub(crate) offsets: Vec<usize>,
/// Column count of each source.
pub(crate) widths: Vec<usize>,
/// Optional per-source keep-mask; masked-out columns materialise as NULL.
pub(crate) masks: Vec<Option<Vec<bool>>>,
/// Flattened surviving tuples of `stride` indices each (`usize::MAX` = null-extended).
pub(crate) survivors: Vec<usize>,
/// Number of sources (indices) per tuple.
pub(crate) stride: usize,
/// Qualified (`alias.column`) schema of the combined row.
pub(crate) combined_schema: Vec<ColumnSchema>,
}
impl DeferredJoin<'_> {
    /// Number of surviving tuples (zero when `stride` is zero).
    pub(crate) fn len(&self) -> usize {
        self.survivors.len().checked_div(self.stride).unwrap_or(0)
    }

    /// Borrowed, lazily-resolved views of every surviving tuple.
    pub(crate) fn row_refs(&self) -> Vec<RowRef<'_>> {
        match self.stride {
            0 => Vec::new(),
            stride => self
                .survivors
                .chunks(stride)
                .map(|tuple| RowRef::Tuple {
                    sources: &self.sources,
                    offsets: &self.offsets,
                    tuple,
                })
                .collect(),
        }
    }

    /// Materialises every surviving tuple into an owned `Row`. Per-source
    /// masks are applied and null-extended slots are filled with NULL.
    pub(crate) fn materialise(&self) -> Vec<Row> {
        if self.stride == 0 {
            return Vec::new();
        }
        let width = self.offsets.last().copied().unwrap_or(0);
        let mut rows = Vec::with_capacity(self.len());
        for tuple in self.survivors.chunks(self.stride) {
            let vals =
                materialise_tuple_vals(&self.sources, &self.widths, &self.masks, tuple, width);
            rows.push(Row::new(vals));
        }
        rows
    }
}
/// Estimates the memory footprint of materialising one join tuple.
///
/// The estimate is one `Value` slot per combined column, plus
/// `approx_value_bytes` for every value that survives its source's mask.
/// Null-extended or unresolvable slots contribute only their slot size.
pub(crate) fn approx_tuple_bytes(
    sources: &[JoinSrc<'_>],
    offsets: &[usize],
    masks: &[Option<Vec<bool>>],
    tuple: &[usize],
) -> usize {
    let slot_bytes = offsets.last().copied().unwrap_or(0) * core::mem::size_of::<Value>();
    let payload_bytes: usize = tuple
        .iter()
        .enumerate()
        .filter(|&(_, &ri)| ri != usize::MAX)
        .filter_map(|(k, &ri)| {
            let row = sources.get(k)?.get(ri)?;
            let mask = masks.get(k).and_then(|m| m.as_deref());
            let kept: usize = row
                .values
                .iter()
                .enumerate()
                .filter(|(i, _)| mask.map_or(true, |m| m.get(*i).copied().unwrap_or(false)))
                .map(|(_, v)| approx_value_bytes(v))
                .sum();
            Some(kept)
        })
        .sum();
    slot_bytes + payload_bytes
}
/// A candidate row for the bounded top-N heap in
/// `Engine::try_streamed_inner_join_topn`.
///
/// Entries order by their (direction-adjusted) sort keys. `seq` (arrival
/// order) breaks ties, so earlier rows win.
/// NOTE(review): keys compare via `partial_cmp`, and incomparable pairs (NaN)
/// are treated as equal. This is therefore not a strict total order when NaN
/// keys occur.
struct TopNEntry {
    /// Sort keys; DESC columns are stored negated by the caller.
    keys: Vec<f64>,
    /// Arrival sequence number, the final tie-breaker.
    seq: u64,
    /// The combined row this entry stands for.
    row: Row,
}

impl TopNEntry {
    /// Lexicographic comparison over the common prefix of `a` and `b`.
    fn cmp_keys(a: &[f64], b: &[f64]) -> core::cmp::Ordering {
        use core::cmp::Ordering;
        a.iter()
            .zip(b)
            .map(|(x, y)| x.partial_cmp(y).unwrap_or(Ordering::Equal))
            .find(|ord| *ord != Ordering::Equal)
            .unwrap_or(Ordering::Equal)
    }
}

impl Ord for TopNEntry {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        Self::cmp_keys(&self.keys, &other.keys).then_with(|| self.seq.cmp(&other.seq))
    }
}

impl PartialOrd for TopNEntry {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for TopNEntry {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for TopNEntry {}
/// Cap on the number of intermediate join tuples. It is checked before each
/// join stage and on nested-loop output; exceeding it fails the query with
/// `EngineError::Unsupported`.
const MAX_JOIN_INTERMEDIATE_ROWS: usize = 4_000_000;
/// Working state while folding JOIN peers in from left to right.
///
/// `working` is a flat list of tuples with `stride` row indices each: one per
/// source so far, where `usize::MAX` means null-extended. `offsets`, `widths`
/// and `masks` describe each source's columns within the combined schema.
/// `consumed_cols` is the combined width covered so far.
struct JoinPipeline<'a> {
    sources: Vec<JoinSrc<'a>>,
    masks: Vec<Option<Vec<bool>>>,
    widths: Vec<usize>,
    offsets: Vec<usize>,
    working: Vec<usize>,
    stride: usize,
    consumed_cols: usize,
}

impl<'a> JoinPipeline<'a> {
    /// Starts a pipeline with the primary source alone; `working` lists the
    /// primary row indices that are live.
    fn new(
        primary: JoinSrc<'a>,
        mask: Option<Vec<bool>>,
        width: usize,
        working: Vec<usize>,
    ) -> Self {
        Self {
            working,
            stride: 1,
            consumed_cols: width,
            offsets: Vec::from([0, width]),
            widths: Vec::from([width]),
            masks: Vec::from([mask]),
            sources: Vec::from([primary]),
        }
    }

    /// Number of tuples currently in `working`.
    fn rows(&self) -> usize {
        let Self { working, stride, .. } = self;
        working.len() / *stride
    }

    /// Appends a new source of `right_arity` columns and replaces the working
    /// tuples with `next`, whose tuples are one index wider than before.
    fn advance(
        &mut self,
        next: Vec<usize>,
        source: JoinSrc<'a>,
        mask: Option<Vec<bool>>,
        right_arity: usize,
    ) {
        self.sources.push(source);
        self.masks.push(mask);
        self.widths.push(right_arity);
        self.consumed_cols += right_arity;
        self.offsets.push(self.consumed_cols);
        self.stride += 1;
        self.working = next;
    }
}
/// Computes which of `cols` (belonging to `alias`) the query references.
///
/// Returns `None` when `needed` is `None` (nothing known) or when every column
/// is referenced, i.e. whenever no masking is required.
fn keep_mask(
    needed: Option<&alloc::collections::BTreeSet<(String, String)>>,
    cols: &[ColumnSchema],
    alias: &str,
) -> Option<Vec<bool>> {
    let referenced = needed?;
    let owner = alias.to_string();
    let mask: Vec<bool> = cols
        .iter()
        .map(|col| referenced.contains(&(owner.clone(), col.name.clone())))
        .collect();
    if mask.contains(&false) {
        Some(mask)
    } else {
        None
    }
}
/// Splits a non-LATERAL peer's ON clause into equi-join keys and residual
/// conjuncts.
///
/// An AND-conjunct of the form `<col> = <col>` becomes a
/// `(left_pos, right_pos)` pair when one side resolves in the already-joined
/// prefix `combined_schema[..consumed_cols]` and the other in `peer`.
/// `left_pos` indexes the combined schema and `right_pos` the peer's own
/// columns. Every other conjunct goes to the residual list. LATERAL peers and
/// peers without ON yield two empty lists.
fn extract_join_keys<'a>(
    peer: &JoinedPeer<'a>,
    combined_schema: &[ColumnSchema],
    consumed_cols: usize,
) -> (Vec<(usize, usize)>, Vec<&'a Expr>) {
    let mut eq_pairs: Vec<(usize, usize)> = Vec::new();
    let mut residual: Vec<&'a Expr> = Vec::new();
    let Some(on_expr) = peer.on.filter(|_| peer.lateral.is_none()) else {
        return (eq_pairs, residual);
    };
    for sub in reorder::split_and_conjunctions(on_expr) {
        let pair = match sub {
            Expr::Binary {
                lhs,
                op: spg_sql::ast::BinOp::Eq,
                rhs,
            } => match (lhs.as_ref(), rhs.as_ref()) {
                (Expr::Column(a), Expr::Column(b)) => {
                    let prefix = &combined_schema[..consumed_cols];
                    // Try `left = peer` first, then the mirrored `peer = left`.
                    match (
                        Engine::composite_col_pos(prefix, a),
                        Engine::peer_col_pos(&peer.alias, &peer.cols, b),
                    ) {
                        (Some(l), Some(r)) => Some((l, r)),
                        _ => match (
                            Engine::composite_col_pos(prefix, b),
                            Engine::peer_col_pos(&peer.alias, &peer.cols, a),
                        ) {
                            (Some(l), Some(r)) => Some((l, r)),
                            _ => None,
                        },
                    }
                }
                _ => None,
            },
            _ => None,
        };
        match pair {
            Some(p) => eq_pairs.push(p),
            None => residual.push(sub),
        }
    }
    (eq_pairs, residual)
}
impl Engine {
/// Resolves every JOIN peer of `from` into a `JoinedPeer` describing how the
/// join stages should read its rows.
///
/// Shapes produced:
/// - LATERAL subquery: only a probe schema is computed here; rows are produced
///   per outer row later (`eager_rows: None`, `lateral: Some`).
/// - Plain catalog table (no UNNEST / AS OF): read lazily (`eager_rows: None`)
///   with its pushed-down predicates carried in `where_preds`. The exception
///   is a small table (<= `SMALL_PEER_EAGER_ROWS` hot + cold rows) that has
///   pushdown predicates, which is materialised eagerly.
/// - Anything else (UNNEST, AS OF, names not found in the active catalog):
///   materialised eagerly with its predicates applied.
///
/// Eager peers have unreferenced columns nulled out when `needed` is known,
/// and their bytes are charged to `budget`. `peer_preds` must hold one entry
/// per element of `from.joins`, indexed by join position.
///
/// # Errors
/// Propagates failures from LATERAL schema probing, materialisation and
/// budget accounting.
#[allow(clippy::type_complexity)]
fn build_join_peers<'a>(
    &self,
    from: &'a FromClause,
    peer_preds: &[Vec<&Expr>],
    needed: Option<&alloc::collections::BTreeSet<(String, String)>>,
    budget: &mut ByteBudget,
) -> Result<Vec<JoinedPeer<'a>>, EngineError> {
    // Catalog peers at or below this many rows (hot + cold) are materialised
    // eagerly when they carry pushed-down predicates.
    const SMALL_PEER_EAGER_ROWS: usize = 256;
    let mut joined: Vec<JoinedPeer<'a>> = Vec::with_capacity(from.joins.len());
    // `enumerate` gives the join position directly; the previous pointer
    // search was O(n^2) and silently fell back to index 0.
    for (pidx, j) in from.joins.iter().enumerate() {
        let alias = j
            .table
            .alias
            .as_deref()
            .unwrap_or(j.table.name.as_str())
            .to_string();
        if let Some(inner_box) = &j.table.lateral_subquery {
            let schema = self.lateral_probe_schema(inner_box)?;
            joined.push(JoinedPeer {
                eager_rows: None,
                cols: schema,
                alias,
                kind: j.kind,
                on: j.on.as_ref(),
                lateral: Some(inner_box.as_ref()),
                join_table: None,
                where_preds: Vec::new(),
            });
            continue;
        }
        let plain = j.table.unnest_expr.is_none() && j.table.as_of_segment.is_none();
        if plain && let Some(t) = self.active_catalog().get(&j.table.name) {
            let has_pushdown = !peer_preds[pidx].is_empty();
            let peer_total = t
                .rows()
                .len()
                .saturating_add(t.count_cold_locators() as usize);
            if !(has_pushdown && peer_total <= SMALL_PEER_EAGER_ROWS) {
                // Read lazily by the join stages, carrying the pushed-down
                // predicates along as residual filters.
                joined.push(JoinedPeer {
                    eager_rows: None,
                    cols: t.schema().columns.clone(),
                    alias,
                    kind: j.kind,
                    on: j.on.as_ref(),
                    lateral: None,
                    join_table: Some(j.table.name.clone()),
                    where_preds: peer_preds[pidx].iter().map(|e| (*e).clone()).collect(),
                });
                continue;
            }
        }
        // Eager path, shared by small filtered catalog tables and every
        // non-plain or unresolved table reference.
        let (mut rows, cols) =
            self.materialise_table_ref_filtered(&j.table, &peer_preds[pidx])?;
        if let Some(needed) = needed {
            Self::null_out_unreferenced(&mut rows, &cols, &alias, needed);
        }
        budget.charge(approx_rows_bytes(&rows))?;
        joined.push(JoinedPeer {
            eager_rows: Some(rows),
            cols,
            alias,
            kind: j.kind,
            on: j.on.as_ref(),
            lateral: None,
            join_table: Some(j.table.name.clone()),
            where_preds: Vec::new(),
        });
    }
    Ok(joined)
}
/// Evaluates `FROM ... [JOIN ...] [WHERE ...]` into a `DeferredJoin` whose
/// surviving tuples reference source rows by index.
///
/// Steps:
/// 1. `analyze_join_pushdown` splits single-alias WHERE conjuncts into
///    primary/peer predicates. It may also return a FROM with the primary and
///    first peer swapped.
/// 2. When there is at least one join and the primary is a plain catalog
///    table, the primary is filtered to row indices in place. Any other
///    primary is materialised: predicates applied, unreferenced columns nulled
///    when `needed` is known, bytes charged to `budget`.
/// 3. Without joins, WHERE is evaluated directly over the primary rows.
/// 4. Otherwise each peer is folded in by one of three stages:
///    - an index nested-loop stage (`join_stage_inl`);
///    - a hash stage, when equi-join keys exist and the peer is not LATERAL;
///    - a nested-loop stage.
///
///    The full WHERE is then applied to the result.
///
/// # Errors
/// Propagates evaluation, materialisation, cancellation and budget errors.
/// Returns `EngineError::Unsupported` when the working set exceeds
/// `MAX_JOIN_INTERMEDIATE_ROWS` tuples.
pub(crate) fn build_joined_filtered_rows(
&self,
from: &FromClause,
where_: Option<&Expr>,
cancel: CancelToken<'_>,
needed: Option<&alloc::collections::BTreeSet<(String, String)>>,
budget: &mut ByteBudget,
) -> Result<DeferredJoin<'_>, EngineError> {
// Pushdown analysis may hand back a rewritten FROM (primary and first peer swapped).
let (swapped_from, primary_preds, peer_preds) = analyze_join_pushdown(from, where_);
let from = swapped_from.as_ref().unwrap_or(from);
let primary_alias = from
.primary
.alias
.as_deref()
.unwrap_or(from.primary.name.as_str())
.to_string();
// Scan the primary in place only for a plain catalog table with at least one join.
let primary_table: Option<&Table> = if !from.joins.is_empty()
&& from.primary.unnest_expr.is_none()
&& from.primary.lateral_subquery.is_none()
&& from.primary.as_of_segment.is_none()
{
self.active_catalog().get(&from.primary.name)
} else {
None
};
let (primary_rows, primary_cols, primary_indices) = match primary_table {
Some(t) => {
// Stored primary: keep only the indices of rows passing the pushed-down predicates.
let idxs = self.filter_table_indices(t, &primary_alias, &primary_preds)?;
(Vec::new(), t.schema().columns.clone(), Some(idxs))
}
None => {
// Materialised primary: predicates applied, unreferenced columns nulled, bytes charged.
let (mut rows, cols) =
self.materialise_table_ref_filtered(&from.primary, &primary_preds)?;
if let Some(needed) = needed {
Self::null_out_unreferenced(&mut rows, &cols, &primary_alias, needed);
}
budget.charge(approx_rows_bytes(&rows))?;
(rows, cols, None)
}
};
let mut joined = self.build_join_peers(from, &peer_preds, needed, budget)?;
let combined_schema = build_combined_schema(&primary_alias, &primary_cols, &joined);
let ctx = EvalContext::new(&combined_schema, None);
// No joins: filter the (materialised) primary rows by WHERE and return them as stride-1 tuples.
if joined.is_empty() {
let mut filtered: Vec<Row> = Vec::new();
let mut memo = memoize::MemoizeCache::default();
for row in primary_rows {
if let Some(where_expr) = where_ {
let cond = self.eval_expr_with_correlated(
where_expr,
&row,
&ctx,
cancel,
Some(&mut memo),
)?;
if !matches!(cond, Value::Bool(true)) {
continue;
}
}
filtered.push(row);
}
let width = combined_schema.len();
let n = filtered.len();
return Ok(DeferredJoin {
sources: alloc::vec![JoinSrc::Owned(filtered)],
offsets: alloc::vec![0, width],
widths: alloc::vec![width],
masks: alloc::vec![None],
survivors: (0..n).collect(),
stride: 1,
combined_schema,
});
}
let primary_width = primary_cols.len();
#[allow(clippy::type_complexity)]
let (primary_source, primary_mask, working): (
JoinSrc<'_>,
Option<Vec<bool>>,
Vec<usize>,
) = match primary_indices {
Some(idxs) => {
// In-place rows are not pruned, so column pruning is expressed as a mask.
let t = primary_table.expect("stored primary");
(
JoinSrc::Stored(t.rows()),
keep_mask(needed, &primary_cols, &primary_alias),
idxs,
)
}
None => {
// Materialised rows were already pruned; every row starts live.
let n = primary_rows.len();
(JoinSrc::Owned(primary_rows), None, (0..n).collect())
}
};
let mut pipe = JoinPipeline::new(primary_source, primary_mask, primary_width, working);
// Fold each peer in, trying index nested-loop, then hash, then nested-loop.
for peer in &mut joined {
// Refuse to continue once the working set is already too large.
if pipe.rows() > MAX_JOIN_INTERMEDIATE_ROWS {
return Err(EngineError::Unsupported(alloc::format!(
"join intermediate result exceeds {MAX_JOIN_INTERMEDIATE_ROWS} rows ({} so far) - add join predicates",
pipe.rows()
)));
}
let right_arity = peer.cols.len();
let peer_mask = keep_mask(needed, &peer.cols, &peer.alias);
let (eq_pairs, residual) =
extract_join_keys(peer, &combined_schema, pipe.consumed_cols);
// Pushed-down predicates of lazily-read peers become extra residual
// filters for the INL/hash stages. The nested-loop stage does not take
// them, but the final WHERE pass below still applies them.
let extra_preds = core::mem::take(&mut peer.where_preds);
let residual: Vec<&Expr> = residual.into_iter().chain(extra_preds.iter()).collect();
if self.join_stage_inl(
&mut pipe,
peer,
&eq_pairs,
&residual,
&peer_mask,
right_arity,
&ctx,
cancel,
)? {
continue;
}
if !eq_pairs.is_empty() && peer.lateral.is_none() {
self.join_stage_hash(
&mut pipe,
peer,
&eq_pairs,
&residual,
&peer_mask,
right_arity,
&combined_schema,
&ctx,
cancel,
)?;
continue;
}
self.join_stage_nested(
&mut pipe,
peer,
right_arity,
&combined_schema,
&ctx,
cancel,
needed,
budget,
)?;
}
// The complete WHERE (including pushed-down conjuncts) is re-applied to every tuple.
let survivors = self.filter_join_survivors(&pipe, where_, &ctx, cancel, budget)?;
Ok(DeferredJoin {
sources: pipe.sources,
offsets: pipe.offsets,
widths: pipe.widths,
masks: pipe.masks,
survivors,
stride: pipe.stride,
combined_schema,
})
}
/// Index nested-loop join stage for a lazily-read catalog peer.
///
/// The stage applies only when all of the following hold:
/// - the peer has no eager rows;
/// - at least one equi-join key exists;
/// - the left side has at most `INL_MAX_LEFT` tuples;
/// - the peer table has an index on the first key's column.
///
/// For each left tuple, the index is probed with the first key. Remaining key
/// pairs and `residual` predicates are checked per candidate. LEFT joins
/// null-extend unmatched left tuples.
///
/// Returns `Ok(true)` when the stage ran and advanced `pipe`. Returns
/// `Ok(false)` when it does not apply and another stage should be tried.
///
/// NOTE(review): index hits that point at `RowLocator::Cold` rows are skipped,
/// so cold matches are not produced here. Confirm this is intended, e.g. that
/// tables with cold rows never reach this path.
#[allow(clippy::too_many_arguments)]
fn join_stage_inl<'a, 'p>(
&'a self,
pipe: &mut JoinPipeline<'a>,
peer: &JoinedPeer<'p>,
eq_pairs: &[(usize, usize)],
residual: &[&Expr],
peer_mask: &Option<Vec<bool>>,
right_arity: usize,
ctx: &EvalContext,
cancel: CancelToken<'_>,
) -> Result<bool, EngineError> {
// Index probing is only attempted for a small left side.
const INL_MAX_LEFT: usize = 1024;
let Some(tname) = &peer.join_table else {
return Ok(false);
};
if !(peer.eager_rows.is_none() && !eq_pairs.is_empty() && pipe.rows() <= INL_MAX_LEFT) {
return Ok(false);
}
let Some(table) = self.active_catalog().get(tname) else {
return Ok(false);
};
// Look up an index on the peer column of the first equi-join pair (by column name).
let Some(idx) = peer
.cols
.iter()
.position(|c| c.name == peer.cols[eq_pairs[0].1].name)
.and_then(|pos| table.index_on(pos))
else {
return Ok(false);
};
let stored = table.rows();
let (lpos0, _) = eq_pairs[0];
let mut next: Vec<usize> = Vec::new();
for tuple in pipe.working.chunks(pipe.stride) {
cancel.check()?;
let mut left_matched = false;
// A NULL (or unindexable) left key never matches.
if let Some(kv) = tuple_value(&pipe.sources, &pipe.offsets, tuple, lpos0)
&& !matches!(kv, Value::Null)
&& let Some(key) = spg_storage::IndexKey::from_value(kv)
{
for loc in idx.lookup_eq(&key) {
let ri = match *loc {
spg_storage::RowLocator::Hot(i) => i,
// Cold rows are not addressable through `stored`; skipped.
spg_storage::RowLocator::Cold { .. } => continue,
};
let right = match stored.get(ri) {
Some(r) => r,
None => continue,
};
// Check the remaining key pairs with SQL equality (NULL never equals).
let mut ok = true;
for (lp, rp) in eq_pairs.iter().skip(1) {
let lv = tuple_value(&pipe.sources, &pipe.offsets, tuple, *lp);
let rv = right.values.get(*rp);
let eq = match (lv, rv) {
(Some(a), Some(b)) => {
!matches!(a, Value::Null)
&& !matches!(b, Value::Null)
&& value_cmp(a, b) == core::cmp::Ordering::Equal
}
_ => false,
};
if !eq {
ok = false;
break;
}
}
if !ok {
continue;
}
// Residual predicates are evaluated against the full combined row.
let keep = if residual.is_empty() {
true
} else {
let mut combined_vals = materialise_tuple_vals(
&pipe.sources,
&pipe.widths,
&pipe.masks,
tuple,
pipe.consumed_cols + right_arity,
);
extend_masked(&mut combined_vals, right, peer_mask.as_deref());
let combined = Row::new(combined_vals);
let mut k = true;
for r in residual {
let cond =
self.eval_expr_with_correlated(r, &combined, ctx, cancel, None)?;
if !matches!(cond, Value::Bool(true)) {
k = false;
break;
}
}
k
};
if keep {
next.extend_from_slice(tuple);
next.push(ri);
left_matched = true;
}
}
}
// LEFT JOIN: keep unmatched left tuples with a null-extended right slot.
if !left_matched && matches!(peer.kind, JoinKind::Left) {
next.extend_from_slice(tuple);
next.push(usize::MAX);
}
}
pipe.advance(
next,
JoinSrc::Stored(stored),
peer_mask.clone(),
right_arity,
);
Ok(true)
}
/// Hash join stage. It builds a hash table over the peer's rows, keyed by
/// their encoded equi-join key values, then probes it with each left tuple.
///
/// Rows on either side with a NULL or missing key value never match.
/// Candidates are further filtered by `residual`. LEFT joins null-extend
/// unmatched left tuples.
///
/// The peer's rows come from, in order of preference:
/// - `peer.eager_rows` (taken);
/// - its catalog table, read in place;
/// - an empty set, when the table cannot be found.
///
/// NOTE(review): keys match by their `encode_key_refs_into` encoding rather
/// than by `value_cmp`. Confirm the encoding equates every value pair that
/// `value_cmp` treats as equal.
#[allow(clippy::too_many_arguments)]
fn join_stage_hash<'a, 'p>(
&'a self,
pipe: &mut JoinPipeline<'a>,
peer: &mut JoinedPeer<'p>,
eq_pairs: &[(usize, usize)],
residual: &[&Expr],
peer_mask: &Option<Vec<bool>>,
right_arity: usize,
combined_schema: &[ColumnSchema],
ctx: &EvalContext,
cancel: CancelToken<'_>,
) -> Result<(), EngineError> {
// Right side: eager rows (taken), else the catalog table in place, else empty.
let rights_src: JoinSrc<'a> = match peer.eager_rows.take() {
Some(rows) => JoinSrc::Owned(rows),
None => match peer
.join_table
.as_deref()
.and_then(|n| self.active_catalog().get(n))
{
Some(t) => JoinSrc::Stored(t.rows()),
None => JoinSrc::Owned(Vec::new()),
},
};
let n_rights = rights_src.len();
let mut table: hashbrown::HashMap<String, Vec<usize>> =
hashbrown::HashMap::with_capacity(n_rights);
let mut keybuf: Vec<&Value> = Vec::with_capacity(eq_pairs.len());
// Reused key buffer. NOTE(review): assumes encode_key_refs_into overwrites
// rather than appends to it — confirm.
let mut keystr = String::new();
// Build phase: index right rows by encoded key, skipping NULL/missing key values.
'build: for ri in 0..n_rights {
let Some(right) = rights_src.get(ri) else {
continue;
};
keybuf.clear();
for (_, rpos) in eq_pairs {
match right.values.get(*rpos) {
Some(v) if !matches!(v, Value::Null) => keybuf.push(v),
_ => continue 'build,
}
}
aggregate::encode_key_refs_into(&keybuf, &mut keystr);
table.entry_ref(keystr.as_str()).or_default().push(ri);
}
let mut next: Vec<usize> = Vec::new();
let mut probebuf: Vec<&Value> = Vec::with_capacity(eq_pairs.len());
// Probe phase: one hash lookup per left tuple.
for tuple in pipe.working.chunks(pipe.stride) {
cancel.check()?;
let mut left_matched = false;
probebuf.clear();
let mut left_has_null = false;
for (lpos, _) in eq_pairs {
match tuple_value(&pipe.sources, &pipe.offsets, tuple, *lpos) {
Some(v) if !matches!(v, Value::Null) => probebuf.push(v),
_ => {
left_has_null = true;
break;
}
}
}
if !left_has_null {
aggregate::encode_key_refs_into(&probebuf, &mut keystr);
}
if !left_has_null && let Some(cands) = table.get(keystr.as_str()) {
for &ri in cands {
// Residual predicates need the full combined row.
let keep = if residual.is_empty() {
true
} else {
let right = rights_src.get(ri).expect("hash candidate row");
let mut combined_vals = materialise_tuple_vals(
&pipe.sources,
&pipe.widths,
&pipe.masks,
tuple,
pipe.consumed_cols + right_arity,
);
extend_masked(&mut combined_vals, right, peer_mask.as_deref());
let combined = Row::new(combined_vals);
let mut ok = true;
for r in residual {
let cond =
self.eval_expr_with_correlated(r, &combined, ctx, cancel, None)?;
if !matches!(cond, Value::Bool(true)) {
ok = false;
break;
}
}
ok
};
if keep {
next.extend_from_slice(tuple);
next.push(ri);
left_matched = true;
}
}
}
// LEFT JOIN: keep unmatched left tuples with a null-extended right slot.
if !left_matched && matches!(peer.kind, JoinKind::Left) {
next.extend_from_slice(tuple);
next.push(usize::MAX);
}
}
pipe.advance(next, rights_src, peer_mask.clone(), right_arity);
debug_assert!(pipe.consumed_cols <= combined_schema.len());
Ok(())
}
/// Nested-loop join stage. Every left tuple in `pipe` is paired with every
/// candidate right row, and a pair is kept when `peer.on` evaluates to `TRUE`
/// (all pairs are kept when there is no ON clause).
///
/// Right rows come from one of three places:
/// - a LATERAL subquery, re-executed per left tuple with the left values bound
///   in; its rows are appended to a stage-local arena and referenced by index;
/// - `peer.eager_rows`, materialised earlier;
/// - otherwise the peer's catalog table. It is cloned here once, with
///   unreferenced columns nulled out when `needed` is known, and charged to
///   `budget`. A table that cannot be found contributes no rows.
///
/// LEFT joins append a null-extended slot (`usize::MAX`) for left tuples with
/// no match. The new source is added without a mask because its rows were
/// already pruned or produced by a subquery. Pushed-down `where_preds` are not
/// applied here.
///
/// # Errors
/// Propagates cancellation, evaluation, LATERAL execution and budget errors.
/// Returns `EngineError::Unsupported` as soon as the stage's output exceeds
/// `MAX_JOIN_INTERMEDIATE_ROWS` tuples.
#[allow(clippy::too_many_arguments)]
fn join_stage_nested<'a, 'p>(
    &'a self,
    pipe: &mut JoinPipeline<'a>,
    peer: &mut JoinedPeer<'p>,
    right_arity: usize,
    combined_schema: &[ColumnSchema],
    ctx: &EvalContext,
    cancel: CancelToken<'_>,
    needed: Option<&alloc::collections::BTreeSet<(String, String)>>,
    budget: &mut ByteBudget,
) -> Result<(), EngineError> {
    // Non-LATERAL peers without eager rows are cloned out of the catalog once.
    let lazy_rows: Option<Vec<Row>> = if peer.eager_rows.is_none() && peer.lateral.is_none() {
        let tname = peer.join_table.as_deref().unwrap_or("");
        let mut rows: Vec<Row> = self
            .active_catalog()
            .get(tname)
            .map(|t| t.rows().iter().cloned().collect())
            .unwrap_or_default();
        if let Some(needed) = needed {
            Self::null_out_unreferenced(&mut rows, &peer.cols, &peer.alias, needed);
        }
        budget.charge(approx_rows_bytes(&rows))?;
        Some(rows)
    } else {
        None
    };
    // LATERAL output rows, referenced by index from the produced tuples.
    let mut arena: Vec<Row> = Vec::new();
    let rights_eager: Option<&[Row]> = peer.eager_rows.as_deref().or(lazy_rows.as_deref());
    let out_stride = pipe.stride + 1;
    let mut next: Vec<usize> = Vec::new();
    for tuple in pipe.working.chunks(pipe.stride) {
        cancel.check()?;
        let mut left_matched = false;
        let left_vals = materialise_tuple_vals(
            &pipe.sources,
            &pipe.widths,
            &pipe.masks,
            tuple,
            pipe.consumed_cols,
        );
        let per_left_rrows: Cow<'_, [Row]> = match peer.lateral {
            Some(inner) => {
                let outer_schema = &combined_schema[..pipe.consumed_cols];
                let left_row = Row::new(left_vals.clone());
                let rows =
                    self.materialise_lateral_for_outer(inner, outer_schema, &left_row)?;
                Cow::Owned(rows)
            }
            None => Cow::Borrowed(rights_eager.expect("non-lateral peer eager")),
        };
        for (ri, right) in per_left_rrows.as_ref().iter().enumerate() {
            let mut combined_vals = left_vals.clone();
            combined_vals.extend(right.values.iter().cloned());
            let combined = Row::new(combined_vals);
            let keep = if let Some(on_expr) = peer.on {
                let cond =
                    self.eval_expr_with_correlated(on_expr, &combined, ctx, cancel, None)?;
                matches!(cond, Value::Bool(true))
            } else {
                true
            };
            if keep {
                next.extend_from_slice(tuple);
                if peer.lateral.is_some() {
                    // Keep only the right-hand part in the arena and point at it.
                    let mut cv = combined.values;
                    let rv = cv.split_off(left_vals.len());
                    arena.push(Row::new(rv));
                    next.push(arena.len() - 1);
                } else {
                    next.push(ri);
                }
                left_matched = true;
            }
        }
        if !left_matched && matches!(peer.kind, JoinKind::Left) {
            next.extend_from_slice(tuple);
            next.push(usize::MAX);
        }
        // Enforce the size cap while the output grows. Checking only after the
        // loop let a runaway cross product exhaust memory before the guard fired.
        let produced = next.len() / out_stride;
        if produced > MAX_JOIN_INTERMEDIATE_ROWS {
            return Err(EngineError::Unsupported(alloc::format!(
                "join intermediate result exceeds {MAX_JOIN_INTERMEDIATE_ROWS} rows ({produced} so far) - add join predicates"
            )));
        }
    }
    let source = if peer.lateral.is_some() {
        JoinSrc::Owned(arena)
    } else if let Some(lz) = lazy_rows {
        JoinSrc::Owned(lz)
    } else {
        JoinSrc::Owned(peer.eager_rows.take().expect("non-lateral peer eager"))
    };
    pipe.advance(next, source, None, right_arity);
    debug_assert!(pipe.consumed_cols <= combined_schema.len());
    Ok(())
}
/// Applies the full WHERE clause to every tuple in `pipe` and returns the
/// surviving tuples, flattened with the pipeline's stride.
///
/// If WHERE is fully compilable, the compiled evaluator runs directly on the
/// lazily-resolved tuple. Otherwise the tuple is materialised and evaluated
/// by the interpreter, with a memo cache that lives for this call. The
/// estimated size of each surviving tuple is charged to `budget`.
///
/// # Errors
/// Propagates cancellation, evaluation and budget errors.
fn filter_join_survivors(
    &self,
    pipe: &JoinPipeline<'_>,
    where_: Option<&Expr>,
    ctx: &EvalContext,
    cancel: CancelToken<'_>,
    budget: &mut ByteBudget,
) -> Result<Vec<usize>, EngineError> {
    let mut memo = memoize::MemoizeCache::default();
    let compiled_where: Option<eval::CompiledExpr> = where_
        .filter(|w| eval::fully_compilable(w))
        .map(|w| eval::compile_expr(w, ctx));
    let mut eval_stack: Vec<Value> = Vec::new();
    let mut survivors: Vec<usize> = Vec::new();
    for tuple in pipe.working.chunks(pipe.stride) {
        // Check per tuple like every join stage does. The compiled path never
        // sees `cancel`, so without this the final filter was uncancellable.
        cancel.check()?;
        let rr = RowRef::Tuple {
            sources: &pipe.sources,
            offsets: &pipe.offsets,
            tuple,
        };
        let pass = if let Some(cw) = &compiled_where {
            matches!(
                eval::eval_compiled_ref(cw, &rr, ctx, &mut eval_stack)
                    .map_err(EngineError::Eval)?,
                Value::Bool(true)
            )
        } else if let Some(where_expr) = where_ {
            let row = rr.as_row();
            matches!(
                self.eval_expr_with_correlated(where_expr, &row, ctx, cancel, Some(&mut memo))?,
                Value::Bool(true)
            )
        } else {
            true
        };
        if !pass {
            continue;
        }
        budget.charge(approx_tuple_bytes(
            &pipe.sources,
            &pipe.offsets,
            &pipe.masks,
            tuple,
        ))?;
        survivors.extend_from_slice(tuple);
    }
    Ok(survivors)
}
/// Determines the output columns of a LATERAL subquery.
///
/// First, `inner` is executed on its own and the reported columns are used.
/// If that errors or yields a non-row result (presumably, e.g., when the body
/// references outer columns), one nullable `Text` column is synthesised per
/// select item instead. Its name is the item's alias if present, otherwise a
/// name from `synth_lateral_col_name`, or `col{i}` for `*`.
fn lateral_probe_schema(
    &self,
    inner: &SelectStatement,
) -> Result<Vec<ColumnSchema>, EngineError> {
    if let Ok(QueryResult::Rows { columns, .. }) =
        self.execute_readonly_select_for_lateral_probe(inner)
    {
        return Ok(columns);
    }
    let fallback: Vec<ColumnSchema> = inner
        .items
        .iter()
        .enumerate()
        .map(|(i, item)| {
            let name = match item {
                SelectItem::Expr { alias: Some(a), .. } => a.clone(),
                SelectItem::Expr { expr, .. } => synth_lateral_col_name(expr, i),
                SelectItem::Wildcard => alloc::format!("col{i}"),
            };
            ColumnSchema::new(name, DataType::Text, true)
        })
        .collect();
    Ok(fallback)
}
/// Runs `inner` as a standalone SELECT with no cancellation support. It is
/// used by `lateral_probe_schema` to discover a LATERAL subquery's columns.
fn execute_readonly_select_for_lateral_probe(
    &self,
    inner: &SelectStatement,
) -> Result<QueryResult, EngineError> {
    let uncancellable = CancelToken::none();
    self.exec_bare_select_cancel(inner, uncancellable)
}
/// Evaluates a LATERAL subquery for one outer row.
///
/// A clone of `inner` gets its resolvable qualified outer column references
/// replaced by literals from `outer_row`, as `substitute_outer_columns_multi`
/// does. The clone is then executed without cancellation support, and its
/// rows are returned.
///
/// # Errors
/// Propagates execution errors. Returns `EngineError::Unsupported` when the
/// statement does not produce rows.
fn materialise_lateral_for_outer(
    &self,
    inner: &SelectStatement,
    outer_schema: &[ColumnSchema],
    outer_row: &Row,
) -> Result<Vec<Row>, EngineError> {
    let mut bound = inner.clone();
    substitute_outer_columns_multi(&mut bound, outer_row, outer_schema);
    if let QueryResult::Rows { rows, .. } =
        self.exec_bare_select_cancel(&bound, CancelToken::none())?
    {
        Ok(rows)
    } else {
        Err(EngineError::Unsupported(
            "LATERAL subquery must be a SELECT (cannot be a write statement)".into(),
        ))
    }
}
/// Fast path for `SELECT ... FROM a JOIN b ON ... [WHERE ...] [ORDER BY ...]
/// LIMIT n [OFFSET m]` over two plain catalog tables joined with INNER JOIN.
///
/// Returns `Ok(None)`, so the caller falls back to the general path, unless
/// all of these hold:
/// - LIMIT is a literal, and OFFSET is absent or a literal;
/// - there is no DISTINCT, GROUP BY, HAVING or aggregate;
/// - there is exactly one INNER join, of two plain catalog tables;
/// - ON contains at least one column equality between the two sides.
///
/// The peer is materialised and hashed on its equi-join keys, and the primary
/// is streamed against it. Combined rows must pass the residual ON conjuncts
/// and WHERE. Without ORDER BY, rows are collected until `limit + offset`
/// exist. With ORDER BY, a bounded max-heap keeps the best `limit + offset`
/// rows by sort key. OFFSET/LIMIT and the projection are applied at the end.
///
/// NOTE(review): the primary is scanned via `rows()` only. Elsewhere in this
/// file cold rows are counted separately (`count_cold_locators`), so confirm
/// cold primary rows cannot be missed here.
pub(crate) fn try_streamed_inner_join_topn(
&self,
stmt: &SelectStatement,
from: &FromClause,
cancel: CancelToken<'_>,
) -> Result<Option<QueryResult>, EngineError> {
// Eligibility checks: anything unsupported falls back to the general path.
let Some(limit) = stmt.limit_literal() else {
return Ok(None);
};
if stmt.offset.is_some() && stmt.offset_literal().is_none() {
return Ok(None);
}
if stmt.distinct
|| stmt.group_by.is_some()
|| stmt.having.is_some()
|| aggregate::uses_aggregate(stmt)
{
return Ok(None);
}
if from.joins.len() != 1 {
return Ok(None);
}
let j = &from.joins[0];
if !matches!(j.kind, JoinKind::Inner) {
return Ok(None);
}
let plain = |t: &TableRef| {
t.unnest_expr.is_none() && t.lateral_subquery.is_none() && t.as_of_segment.is_none()
};
if !plain(&from.primary) || !plain(&j.table) {
return Ok(None);
}
let Some(on_expr) = j.on.as_ref() else {
return Ok(None);
};
let Some(primary_table) = self.active_catalog().get(&from.primary.name) else {
return Ok(None);
};
if self.active_catalog().get(&j.table.name).is_none() {
return Ok(None);
}
let primary_alias = from
.primary
.alias
.as_deref()
.unwrap_or(from.primary.name.as_str())
.to_string();
let peer_alias = j
.table
.alias
.as_deref()
.unwrap_or(j.table.name.as_str())
.to_string();
// Column pruning is disabled when collect_qualified_refs returns None.
let mut needed = alloc::collections::BTreeSet::new();
let prunable = collect_qualified_refs(stmt, &mut needed).is_some();
let mut budget = ByteBudget::new(self.max_query_bytes);
// Materialise the whole peer (no predicates pushed) and account for it.
let (mut peer_rows, peer_cols) = self.materialise_table_ref_filtered(&j.table, &[])?;
if prunable {
Self::null_out_unreferenced(&mut peer_rows, &peer_cols, &peer_alias, &needed);
}
budget.charge(approx_rows_bytes(&peer_rows))?;
// Qualified `alias.column` schema: primary columns first, then the peer's.
let primary_cols = primary_table.schema().columns.clone();
let mut combined_schema: Vec<ColumnSchema> = Vec::new();
for col in &primary_cols {
combined_schema.push(ColumnSchema::new(
alloc::format!("{primary_alias}.{}", col.name),
col.ty,
col.nullable,
));
}
for col in &peer_cols {
combined_schema.push(ColumnSchema::new(
alloc::format!("{peer_alias}.{}", col.name),
col.ty,
col.nullable,
));
}
let ctx = EvalContext::new(&combined_schema, None);
let left_arity = primary_cols.len();
// Split ON into (primary pos, peer pos) column equalities and residual conjuncts.
let mut eq_pairs: Vec<(usize, usize)> = Vec::new();
let mut residual: Vec<&Expr> = Vec::new();
for sub in reorder::split_and_conjunctions(on_expr) {
let mut matched = None;
if let Expr::Binary {
lhs,
op: spg_sql::ast::BinOp::Eq,
rhs,
} = sub
&& let (Expr::Column(a), Expr::Column(b)) = (lhs.as_ref(), rhs.as_ref())
{
let left_slice = &combined_schema[..left_arity];
if let (Some(l), Some(r)) = (
Self::composite_col_pos(left_slice, a),
Self::peer_col_pos(&peer_alias, &peer_cols, b),
) {
matched = Some((l, r));
} else if let (Some(l), Some(r)) = (
Self::composite_col_pos(left_slice, b),
Self::peer_col_pos(&peer_alias, &peer_cols, a),
) {
matched = Some((l, r));
}
}
match matched {
Some(pair) => eq_pairs.push(pair),
None => residual.push(sub),
}
}
// Nothing to hash on without an equi-join key; use the general path.
if eq_pairs.is_empty() {
return Ok(None); }
// Build side: peer rows keyed by encoded join-key values; NULL/missing keys never match.
let mut htable: hashbrown::HashMap<String, Vec<usize>> =
hashbrown::HashMap::with_capacity(peer_rows.len());
let mut keybuf: Vec<Value> = Vec::with_capacity(eq_pairs.len());
'build: for (ri, right) in peer_rows.iter().enumerate() {
keybuf.clear();
for (_, rpos) in &eq_pairs {
let v = right.values.get(*rpos).cloned().unwrap_or(Value::Null);
if matches!(v, Value::Null) {
continue 'build;
}
keybuf.push(v);
}
htable
.entry(aggregate::encode_key(&keybuf))
.or_default()
.push(ri);
}
// Unreferenced primary columns become NULL in combined rows (all kept when not prunable).
let keep_mask: Vec<bool> = primary_cols
.iter()
.map(|c| !prunable || needed.contains(&(primary_alias.clone(), c.name.clone())))
.collect();
// Number of rows to retain before OFFSET/LIMIT are applied.
let keep = (limit as usize).saturating_add(stmt.offset_literal().map_or(0, |o| o as usize));
let descs: Vec<bool> = stmt.order_by.iter().map(|o| o.desc).collect();
let mut where_memo = memoize::MemoizeCache::default();
// Max-heap by TopNEntry order: the top is the worst row currently retained.
let mut heap: alloc::collections::BinaryHeap<TopNEntry> =
alloc::collections::BinaryHeap::new();
let mut plain_sink: Vec<Row> = Vec::new();
let mut seq: u64 = 0;
// Probe side: stream the primary table's rows.
'scan: for left in primary_table.rows().iter() {
cancel.check()?;
if keep == 0 {
break 'scan;
}
keybuf.clear();
let mut left_has_null = false;
for (lpos, _) in &eq_pairs {
let v = left.values.get(*lpos).cloned().unwrap_or(Value::Null);
if matches!(v, Value::Null) {
left_has_null = true;
break;
}
keybuf.push(v);
}
if left_has_null {
continue;
}
let Some(cands) = htable.get(&aggregate::encode_key(&keybuf)) else {
continue;
};
for &ri in cands {
let right = &peer_rows[ri];
let mut combined_vals: Vec<Value> =
Vec::with_capacity(left_arity + peer_cols.len());
for (i, v) in left.values.iter().enumerate() {
combined_vals.push(if keep_mask.get(i).copied().unwrap_or(true) {
v.clone()
} else {
Value::Null
});
}
combined_vals.extend(right.values.iter().cloned());
let combined = Row::new(combined_vals);
let mut ok = true;
for r in &residual {
let cond = self.eval_expr_with_correlated(r, &combined, &ctx, cancel, None)?;
if !matches!(cond, Value::Bool(true)) {
ok = false;
break;
}
}
if !ok {
continue;
}
if let Some(w) = stmt.where_.as_ref() {
let cond = self.eval_expr_with_correlated(
w,
&combined,
&ctx,
cancel,
Some(&mut where_memo),
)?;
if !matches!(cond, Value::Bool(true)) {
continue;
}
}
// Without ORDER BY the first `keep` qualifying rows suffice.
if stmt.order_by.is_empty() {
budget.charge(approx_row_bytes(&combined))?;
plain_sink.push(combined);
if plain_sink.len() >= keep {
break 'scan;
}
} else {
let raw = build_order_keys(&stmt.order_by, &combined, &ctx)?;
// Negate DESC keys so one ascending comparison serves every column.
let keys: Vec<f64> = raw
.into_iter()
.enumerate()
.map(|(i, k)| {
if descs.get(i).copied().unwrap_or(false) {
-k
} else {
k
}
})
.collect();
let entry = TopNEntry {
keys,
seq,
row: combined,
};
seq += 1;
// Fill up to `keep`, then replace the current worst when the new entry sorts before it.
if heap.len() < keep {
budget.charge(approx_row_bytes(&entry.row))?;
heap.push(entry);
} else if let Some(top) = heap.peek()
&& entry < *top
{
if let Some(evicted) = heap.pop() {
budget.release(approx_row_bytes(&evicted.row));
}
budget.charge(approx_row_bytes(&entry.row))?;
heap.push(entry);
}
}
}
}
// into_sorted_vec yields ascending TopNEntry order, i.e. the requested ORDER BY order.
let mut output: Vec<Row> = if stmt.order_by.is_empty() {
plain_sink
} else {
heap.into_sorted_vec().into_iter().map(|e| e.row).collect()
};
apply_offset_and_limit(&mut output, stmt.offset_literal(), stmt.limit_literal());
// Project the select list against the combined schema.
let projection = build_projection(&stmt.items, &combined_schema, "")?;
let mut proj_memo = memoize::MemoizeCache::default();
let mut rows: Vec<Row> = Vec::with_capacity(output.len());
for row in &output {
let mut values = Vec::with_capacity(projection.len());
for p in &projection {
values.push(self.eval_expr_with_correlated(
&p.expr,
row,
&ctx,
cancel,
Some(&mut proj_memo),
)?);
}
rows.push(Row::new(values));
}
let columns: Vec<ColumnSchema> = projection
.into_iter()
.map(|p| ColumnSchema::new(p.output_name, p.ty, p.nullable))
.collect();
Ok(Some(QueryResult::Rows { columns, rows }))
}
}
/// Derives a column name for an unaliased LATERAL select item.
///
/// - Column reference: its column name.
/// - Function call: the function name.
/// - CAST: the name of the expression inside the cast.
/// - Anything else: `column{idx + 1}`.
pub(crate) fn synth_lateral_col_name(expr: &Expr, idx: usize) -> String {
    let mut current = expr;
    loop {
        match current {
            // Look through casts until a nameable expression is found.
            Expr::Cast { expr: inner, .. } => current = &**inner,
            Expr::Column(col) => return col.name.clone(),
            Expr::FunctionCall { name, .. } => return name.clone(),
            _ => return alloc::format!("column{}", idx + 1),
        }
    }
}
/// Binds outer-row values into `stmt`. Every reachable qualified column
/// reference that resolves in `outer_schema` is replaced with a literal taken
/// from `outer_row`. In this file it is used to evaluate LATERAL subqueries.
pub(crate) fn substitute_outer_columns_multi(
    stmt: &mut SelectStatement,
    outer_row: &Row,
    outer_schema: &[ColumnSchema],
) {
    substitute_outer_in_select(stmt, outer_row, outer_schema)
}
/// Applies `substitute_outer_in_expr` to the expression slots of `stmt`:
/// select items, WHERE, GROUP BY, HAVING and ORDER BY. It then recurses into
/// every UNION arm.
/// NOTE(review): the FROM clause (e.g. JOIN ... ON conditions) is not visited.
fn substitute_outer_in_select(
    stmt: &mut SelectStatement,
    outer_row: &Row,
    outer_schema: &[ColumnSchema],
) {
    let item_exprs = stmt.items.iter_mut().filter_map(|item| match item {
        SelectItem::Expr { expr, .. } => Some(expr),
        _ => None,
    });
    for expr in item_exprs {
        substitute_outer_in_expr(expr, outer_row, outer_schema);
    }
    if let Some(cond) = stmt.where_.as_mut() {
        substitute_outer_in_expr(cond, outer_row, outer_schema);
    }
    for key in stmt.group_by.iter_mut().flatten() {
        substitute_outer_in_expr(key, outer_row, outer_schema);
    }
    if let Some(cond) = stmt.having.as_mut() {
        substitute_outer_in_expr(cond, outer_row, outer_schema);
    }
    for ordering in &mut stmt.order_by {
        substitute_outer_in_expr(&mut ordering.expr, outer_row, outer_schema);
    }
    for (_, arm) in &mut stmt.unions {
        substitute_outer_in_select(arm, outer_row, outer_schema);
    }
}
/// Replaces qualified column references in `e` with literals built from
/// `outer_row`.
///
/// A reference is replaced when `qualifier.name` matches a column of
/// `outer_schema` case-insensitively. A missing value becomes NULL. If a
/// value cannot be converted to a literal, the reference is left as is.
///
/// The walk recurses only into binary/unary operators, function arguments,
/// casts and CASE expressions. Other expression kinds, and unqualified
/// columns, are left untouched.
fn substitute_outer_in_expr(e: &mut Expr, outer_row: &Row, outer_schema: &[ColumnSchema]) {
// A resolvable qualified column is replaced in place and needs no further recursion.
if let Expr::Column(c) = e
&& let Some(qual) = &c.qualifier
{
let composite = alloc::format!("{qual}.{}", c.name);
if let Some(idx) = outer_schema
.iter()
.position(|sc| sc.name.eq_ignore_ascii_case(&composite))
{
let v = outer_row.values.get(idx).cloned().unwrap_or(Value::Null);
if let Ok(lit) = value_to_literal_expr(v) {
*e = lit;
return;
}
}
}
// Recurse into the composite expression kinds handled here.
match e {
Expr::Binary { lhs, rhs, .. } => {
substitute_outer_in_expr(lhs, outer_row, outer_schema);
substitute_outer_in_expr(rhs, outer_row, outer_schema);
}
Expr::Unary { expr: inner, .. } => {
substitute_outer_in_expr(inner, outer_row, outer_schema);
}
Expr::FunctionCall { args, .. } => {
for a in args {
substitute_outer_in_expr(a, outer_row, outer_schema);
}
}
Expr::Cast { expr: inner, .. } => {
substitute_outer_in_expr(inner, outer_row, outer_schema);
}
Expr::Case {
operand,
branches,
else_branch,
} => {
if let Some(op) = operand {
substitute_outer_in_expr(op, outer_row, outer_schema);
}
for (cond, val) in branches {
substitute_outer_in_expr(cond, outer_row, outer_schema);
substitute_outer_in_expr(val, outer_row, outer_schema);
}
if let Some(e) = else_branch {
substitute_outer_in_expr(e, outer_row, outer_schema);
}
}
// NOTE(review): other variants (e.g. subqueries) are not traversed.
_ => {}
}
}
/// Classifies top-level WHERE conjuncts for predicate pushdown.
///
/// A conjunct is pushable when it has no subquery or aggregate and all of its
/// column references are qualified with one and the same alias
/// (case-insensitive). A pushable conjunct goes to the primary if the alias is
/// the primary's. Otherwise it goes to the first INNER, non-LATERAL join with
/// that alias. WHERE itself is not modified, and `build_joined_filtered_rows`
/// still applies the full WHERE afterwards.
///
/// A swap can also happen. The conditions are:
/// - the primary received no predicates;
/// - the first join (INNER, non-LATERAL) did receive predicates;
/// - that join's ON references only the primary and that peer.
///
/// In that case a rewritten FROM with the two tables swapped is returned, and
/// the peer's predicates become the primary predicates.
///
/// Returns `(rewritten_from, primary_preds, peer_preds)`, where `peer_preds`
/// is indexed by join position.
fn analyze_join_pushdown<'w>(
from: &FromClause,
where_: Option<&'w Expr>,
) -> (Option<FromClause>, Vec<&'w Expr>, Vec<Vec<&'w Expr>>) {
let primary_alias = from
.primary
.alias
.as_deref()
.unwrap_or(from.primary.name.as_str());
let mut primary_preds: Vec<&Expr> = Vec::new();
let mut peer_preds: Vec<Vec<&Expr>> = alloc::vec![Vec::new(); from.joins.len()];
if let Some(w) = where_ {
for sub in reorder::split_and_conjunctions(w) {
// Subqueries and aggregates are never pushed down.
if expr_has_subquery(sub) || aggregate::contains_aggregate(sub) {
continue;
}
// Require every column to be qualified, all with the same alias.
let mut quals: Vec<&str> = Vec::new();
let mut all_qualified = true;
collect_column_qualifiers(sub, &mut quals, &mut all_qualified);
if !all_qualified || quals.is_empty() {
continue;
}
let q0 = quals[0];
if !quals.iter().all(|q| q.eq_ignore_ascii_case(q0)) {
continue;
}
if q0.eq_ignore_ascii_case(primary_alias) {
primary_preds.push(sub);
continue;
}
// Only INNER, non-LATERAL peers: filtering the right side of a LEFT
// join early would change which rows get null-extended.
for (i, j) in from.joins.iter().enumerate() {
if matches!(j.kind, JoinKind::Inner)
&& j.table.lateral_subquery.is_none()
&& q0.eq_ignore_ascii_case(
j.table.alias.as_deref().unwrap_or(j.table.name.as_str()),
)
{
peer_preds[i].push(sub);
break;
}
}
}
}
// Swap heuristic: when only the first peer is filtered, make it the primary
// (presumably so its filtered rows drive the pipeline).
if primary_preds.is_empty()
&& let Some(j0) = from.joins.first()
&& matches!(j0.kind, JoinKind::Inner)
&& j0.table.lateral_subquery.is_none()
&& !peer_preds[0].is_empty()
{
let peer_alias = j0.table.alias.as_deref().unwrap_or(j0.table.name.as_str());
// Safe only when ON exists and references nothing but these two tables.
let on_safe = j0.on.as_ref().is_some_and(|on| {
let mut quals: Vec<&str> = Vec::new();
let mut all_q = true;
collect_column_qualifiers(on, &mut quals, &mut all_q);
all_q
&& quals.iter().all(|q| {
q.eq_ignore_ascii_case(primary_alias) || q.eq_ignore_ascii_case(peer_alias)
})
});
if on_safe {
let mut from_owned = from.clone();
core::mem::swap(&mut from_owned.primary, &mut from_owned.joins[0].table);
let primary_preds = peer_preds[0].drain(..).collect();
return (Some(from_owned), primary_preds, peer_preds);
}
}
(None, primary_preds, peer_preds)
}
/// Builds the qualified schema of a joined row.
///
/// The primary's columns come first as `primary_alias.col`, followed by each
/// peer's columns as `peer_alias.col`, in peer order. Types and nullability
/// are copied unchanged.
fn build_combined_schema(
    primary_alias: &str,
    primary_cols: &[ColumnSchema],
    joined: &[JoinedPeer<'_>],
) -> Vec<ColumnSchema> {
    fn qualified(alias: &str, col: &ColumnSchema) -> ColumnSchema {
        ColumnSchema::new(alloc::format!("{alias}.{}", col.name), col.ty, col.nullable)
    }
    let peer_columns = joined
        .iter()
        .flat_map(|peer| peer.cols.iter().map(move |col| qualified(&peer.alias, col)));
    primary_cols
        .iter()
        .map(|col| qualified(primary_alias, col))
        .chain(peer_columns)
        .collect()
}