use crate::parser::{ParsedSelect, Predicate, PredicateValue};
use crate::schema::{ColumnName, Schema, TableDef};
use crate::value::Value;
/// Default cap value (ten million) associated with correlated-subquery handling.
///
/// NOTE(review): this constant is not referenced anywhere in this file; it is
/// presumably a limit on how much work a correlated subquery may perform —
/// confirm the exact unit and meaning at the use site.
pub const DEFAULT_CORRELATED_CAP: u64 = 10_000_000;
/// A stack of name-resolution frames used while planning nested queries.
///
/// Each frame is a list of `(alias, table)` bindings. Frames are appended by
/// [`PlannerScope::push`], so the last frame is the innermost one and is
/// reported as depth `0` by [`PlannerScope::resolve`].
#[derive(Debug, Clone, Default)]
pub struct PlannerScope<'a> {
// Frames ordered from outermost (index 0) to innermost (last index).
scopes: Vec<Vec<ScopeBinding<'a>>>,
}
/// One entry of a scope frame: a table made visible under an alias.
#[derive(Debug, Clone)]
struct ScopeBinding<'a> {
// Name used to qualify columns; compared ASCII-case-insensitively in `resolve`.
alias: String,
// Table definition consulted (via `find_column`) when resolving a column.
table: &'a TableDef,
}
impl<'a> PlannerScope<'a> {
pub fn empty() -> Self {
Self::default()
}
#[must_use]
pub fn push(&self, bindings: Vec<(String, &'a TableDef)>) -> Self {
let mut scopes = self.scopes.clone();
scopes.push(
bindings
.into_iter()
.map(|(alias, table)| ScopeBinding { alias, table })
.collect(),
);
Self { scopes }
}
pub fn resolve(
&self,
qualifier: Option<&str>,
column: &ColumnName,
) -> Option<(usize, &'a TableDef)> {
let n = self.scopes.len();
for (i, scope) in self.scopes.iter().enumerate().rev() {
let depth = n - 1 - i;
for binding in scope {
if let Some(q) = qualifier {
if binding.alias.eq_ignore_ascii_case(q)
&& binding.table.find_column(column).is_some()
{
return Some((depth, binding.table));
}
} else if binding.table.find_column(column).is_some() {
return Some((depth, binding.table));
}
}
}
None
}
pub fn is_empty(&self) -> bool {
self.scopes.is_empty()
}
}
/// A qualified column reference inside a subquery that points at an enclosing
/// query's table rather than at one of the subquery's own tables.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OuterRef {
/// The text before the first `.` of the original reference (the alias).
pub qualifier: String,
/// The column part of the reference.
pub column: ColumnName,
/// How many scope frames out from the innermost one the reference resolved;
/// values produced in this file are always `>= 1`.
pub scope_depth: usize,
}
impl OuterRef {
    /// Renders the reference back into its `qualifier.column` textual form.
    pub fn as_column_ref(&self) -> String {
        let Self {
            qualifier, column, ..
        } = self;
        format!("{}.{}", qualifier, column)
    }
}
/// Collects every outer (correlated) column reference found in the
/// predicates of `subquery`.
///
/// A fresh innermost frame is built from the subquery's own table and its
/// joined tables (each bound under its table name; tables missing from
/// `schema` are silently skipped) and pushed on top of `outer_scope`. Each
/// predicate is then scanned against that combined scope.
///
/// The result preserves encounter order and is not de-duplicated.
pub fn collect_outer_refs(
    subquery: &ParsedSelect,
    outer_scope: &PlannerScope<'_>,
    schema: &Schema,
) -> Vec<OuterRef> {
    // Primary table first, then joins, mirroring the order they appear in the query.
    let inner_bindings: Vec<(String, &TableDef)> = std::iter::once(&subquery.table)
        .chain(subquery.joins.iter().map(|join| &join.table))
        .filter_map(|name| {
            schema
                .get_table(&name.clone().into())
                .map(|def| (name.clone(), def))
        })
        .collect();

    let inner_scope = outer_scope.push(inner_bindings);

    let mut found = Vec::new();
    subquery
        .predicates
        .iter()
        .for_each(|pred| collect_from_predicate(pred, &inner_scope, &mut found));
    found
}
/// Appends to `out` every outer reference that appears on the value side of
/// `pred`, recursing into `Or` branches and nested subqueries.
///
/// `inner_scope` is the scope stack whose innermost frame belongs to the
/// query that owns `pred`. References are appended in the order they occur.
fn collect_from_predicate(
    pred: &Predicate,
    inner_scope: &PlannerScope<'_>,
    out: &mut Vec<OuterRef>,
) {
    match pred {
        // Single right-hand value.
        Predicate::Eq(_, value)
        | Predicate::Lt(_, value)
        | Predicate::Le(_, value)
        | Predicate::Gt(_, value)
        | Predicate::Ge(_, value) => {
            out.extend(pv_as_outer_ref(value, inner_scope));
        }
        // List of right-hand values.
        Predicate::In(_, values) | Predicate::NotIn(_, values) => {
            out.extend(
                values
                    .iter()
                    .filter_map(|value| pv_as_outer_ref(value, inner_scope)),
            );
        }
        // Lower bound is inspected before the upper bound.
        Predicate::NotBetween(_, lo, hi) => {
            out.extend(pv_as_outer_ref(lo, inner_scope));
            out.extend(pv_as_outer_ref(hi, inner_scope));
        }
        Predicate::JsonExtractEq { value, .. } | Predicate::JsonContains { value, .. } => {
            out.extend(pv_as_outer_ref(value, inner_scope));
        }
        // Left branch predicates first, then the right branch.
        Predicate::Or(left, right) => {
            for branch in left.iter().chain(right.iter()) {
                collect_from_predicate(branch, inner_scope, out);
            }
        }
        Predicate::InSubquery { subquery, .. } | Predicate::Exists { subquery, .. } => {
            out.extend(collect_outer_refs_nested(subquery, inner_scope));
        }
        // These variants are not inspected for column references here.
        Predicate::Always(_)
        | Predicate::Like(_, _)
        | Predicate::NotLike(_, _)
        | Predicate::ILike(_, _)
        | Predicate::NotILike(_, _)
        | Predicate::IsNull(_)
        | Predicate::IsNotNull(_)
        | Predicate::ScalarCmp { .. } => {}
    }
}
/// Collects outer references from the predicates of a subquery nested inside
/// another predicate.
///
/// The nested subquery's predicates are scanned directly against
/// `outer_scope`; no additional frame is pushed for the nested subquery's own
/// tables.
///
/// NOTE(review): because no frame is pushed, references to the nested
/// subquery's own tables are resolved against the enclosing scopes — confirm
/// this is intended.
fn collect_outer_refs_nested(
    subquery: &ParsedSelect,
    outer_scope: &PlannerScope<'_>,
) -> Vec<OuterRef> {
    let mut found = Vec::new();
    subquery
        .predicates
        .iter()
        .for_each(|pred| collect_from_predicate(pred, outer_scope, &mut found));
    found
}
/// Interprets a predicate value as an outer reference, if it is one.
///
/// Only `ColumnRef` values containing a `.` are considered; the text is split
/// at the first `.` into qualifier and column. The outcome then depends on how
/// the pair resolves in `inner_scope`:
///
/// * resolves at depth `0` (the innermost frame): not an outer ref, `None`;
/// * resolves at depth `>= 1`: an outer ref carrying that depth;
/// * does not resolve at all: reported as an outer ref with depth `1`.
fn pv_as_outer_ref(pv: &PredicateValue, inner_scope: &PlannerScope<'_>) -> Option<OuterRef> {
    let PredicateValue::ColumnRef(raw) = pv else {
        return None;
    };
    // Unqualified references are never treated as outer refs.
    let (qualifier, column_text) = raw.split_once('.')?;
    let column = ColumnName::new(column_text.to_string());

    let scope_depth = match inner_scope.resolve(Some(qualifier), &column) {
        Some((0, _)) => return None,
        Some((depth, _)) => depth,
        // Unknown qualifier/column pair: assumed to belong to the immediate parent.
        None => 1,
    };

    Some(OuterRef {
        qualifier: qualifier.to_string(),
        column,
        scope_depth,
    })
}
/// Returns a copy of `subquery` in which column references that have an entry
/// in `bindings` are replaced by the bound literal value.
///
/// `bindings` is keyed by the exact textual column reference. Every predicate
/// of the subquery is rewritten; all other parts of the query are cloned
/// unchanged.
pub fn substitute_outer_refs<H: std::hash::BuildHasher>(
    subquery: &ParsedSelect,
    bindings: &std::collections::HashMap<String, Value, H>,
) -> ParsedSelect {
    let mut rewritten = subquery.clone();
    // Move the cloned predicates out so each can be rewritten by value.
    let originals = std::mem::take(&mut rewritten.predicates);
    rewritten.predicates = originals
        .into_iter()
        .map(|pred| substitute_in_predicate(pred, bindings))
        .collect();
    rewritten
}
fn substitute_in_predicate<H: std::hash::BuildHasher>(
pred: Predicate,
bindings: &std::collections::HashMap<String, Value, H>,
) -> Predicate {
match pred {
Predicate::Eq(col, v) => Predicate::Eq(col, substitute_pv(v, bindings)),
Predicate::Lt(col, v) => Predicate::Lt(col, substitute_pv(v, bindings)),
Predicate::Le(col, v) => Predicate::Le(col, substitute_pv(v, bindings)),
Predicate::Gt(col, v) => Predicate::Gt(col, substitute_pv(v, bindings)),
Predicate::Ge(col, v) => Predicate::Ge(col, substitute_pv(v, bindings)),
Predicate::In(col, vs) => Predicate::In(
col,
vs.into_iter().map(|v| substitute_pv(v, bindings)).collect(),
),
Predicate::NotIn(col, vs) => Predicate::NotIn(
col,
vs.into_iter().map(|v| substitute_pv(v, bindings)).collect(),
),
Predicate::NotBetween(col, lo, hi) => Predicate::NotBetween(
col,
substitute_pv(lo, bindings),
substitute_pv(hi, bindings),
),
Predicate::JsonExtractEq {
column,
path,
as_text,
value,
} => Predicate::JsonExtractEq {
column,
path,
as_text,
value: substitute_pv(value, bindings),
},
Predicate::JsonContains { column, value } => Predicate::JsonContains {
column,
value: substitute_pv(value, bindings),
},
Predicate::Or(l, r) => Predicate::Or(
l.into_iter()
.map(|p| substitute_in_predicate(p, bindings))
.collect(),
r.into_iter()
.map(|p| substitute_in_predicate(p, bindings))
.collect(),
),
Predicate::InSubquery {
column,
subquery,
negated,
} => Predicate::InSubquery {
column,
subquery: Box::new(substitute_outer_refs(&subquery, bindings)),
negated,
},
Predicate::Exists { subquery, negated } => Predicate::Exists {
subquery: Box::new(substitute_outer_refs(&subquery, bindings)),
negated,
},
other => other,
}
}
/// Replaces a column reference with its bound literal, when one exists.
///
/// A `ColumnRef` whose exact text is a key in `bindings` becomes a `Literal`
/// holding a clone of the bound value. Every other value, including column
/// references without a binding, is returned as-is.
fn substitute_pv<H: std::hash::BuildHasher>(
    pv: PredicateValue,
    bindings: &std::collections::HashMap<String, Value, H>,
) -> PredicateValue {
    match pv {
        PredicateValue::ColumnRef(name) => match bindings.get(&name) {
            Some(bound) => PredicateValue::Literal(bound.clone()),
            None => PredicateValue::ColumnRef(name),
        },
        other => other,
    }
}
pub fn try_semi_join_rewrite(
subquery: &ParsedSelect,
negated: bool,
outer_refs: &[OuterRef],
) -> Option<(ColumnName, ParsedSelect)> {
if !subquery.group_by.is_empty()
|| !subquery.aggregates.is_empty()
|| subquery.limit.is_some()
|| subquery.offset.is_some()
|| !subquery.order_by.is_empty()
|| subquery.distinct
|| !subquery.ctes.is_empty()
|| !subquery.joins.is_empty()
|| !subquery.having.is_empty()
{
return None;
}
let mut eq_idx: Option<usize> = None;
let mut inner_col: Option<ColumnName> = None;
let mut outer_col_ref: Option<String> = None;
for (i, p) in subquery.predicates.iter().enumerate() {
if let Predicate::Eq(col, PredicateValue::ColumnRef(raw)) = p {
if outer_refs.iter().any(|r| &r.as_column_ref() == raw) {
if eq_idx.is_some() {
return None;
}
eq_idx = Some(i);
inner_col = Some(col.clone());
outer_col_ref = Some(raw.clone());
}
}
}
let eq_idx = eq_idx?;
let inner_col = inner_col?;
let outer_col_ref = outer_col_ref?;
if outer_refs
.iter()
.filter(|r| r.as_column_ref() == outer_col_ref)
.count()
!= outer_refs.len()
{
return None;
}
let outer_col_name = outer_col_ref.rsplit_once('.').map_or_else(
|| ColumnName::new(outer_col_ref.clone()),
|(_, c)| ColumnName::new(c.to_string()),
);
let mut rewritten = subquery.clone();
rewritten.predicates.remove(eq_idx);
rewritten.columns = Some(vec![inner_col.clone()]);
rewritten.column_aliases = None;
let _ = negated;
Some((outer_col_name, rewritten))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{ColumnDef, DataType, SchemaBuilder, TableDef};
    use kimberlite_store::TableId;

    /// Builds a two-table schema (`patient`, `consent`) and returns it along
    /// with owned copies of both table definitions.
    fn mini_schema() -> (Schema, TableDef, TableDef) {
        let schema = SchemaBuilder::new()
            .table(
                "patient",
                TableId::new(1),
                vec![
                    ColumnDef::new("id", DataType::BigInt).not_null(),
                    ColumnDef::new("name", DataType::Text),
                ],
                vec!["id".into()],
            )
            .table(
                "consent",
                TableId::new(2),
                vec![
                    ColumnDef::new("id", DataType::BigInt).not_null(),
                    ColumnDef::new("subject_id", DataType::BigInt).not_null(),
                    ColumnDef::new("purpose", DataType::Text),
                ],
                vec!["id".into()],
            )
            .build();

        let patient_def = schema.get_table(&"patient".into()).unwrap().clone();
        let consent_def = schema.get_table(&"consent".into()).unwrap().clone();
        (schema, patient_def, consent_def)
    }

    /// The innermost frame resolves at depth 0, its parent at depth 1, and a
    /// column missing from the aliased table does not resolve.
    #[test]
    fn scope_resolve_innermost_first() {
        let (_schema, patient, consent) = mini_schema();
        let inner = PlannerScope::empty()
            .push(vec![("p".into(), &patient)])
            .push(vec![("c".into(), &consent)]);

        let (own_depth, _) = inner.resolve(Some("c"), &"subject_id".into()).unwrap();
        assert_eq!(own_depth, 0);

        let (parent_depth, _) = inner.resolve(Some("p"), &"id".into()).unwrap();
        assert_eq!(parent_depth, 1);

        assert!(inner.resolve(Some("p"), &"nonexistent".into()).is_none());
    }

    /// `as_column_ref` joins qualifier and column with a dot.
    #[test]
    fn outer_ref_round_trip() {
        let outer = OuterRef {
            qualifier: String::from("p"),
            column: "id".into(),
            scope_depth: 1,
        };
        assert_eq!(outer.as_column_ref(), "p.id");
    }
}