use super::*;
use crate::datatypes::values::Value;
impl<'a> CypherExecutor<'a> {
/// Entry point for a `CALL { ... }` subquery clause.
///
/// Checks the executor deadline first, then dispatches on `import`: an empty
/// import list runs the body through the uncorrelated path, otherwise the
/// correlated path is used and `declared` is forwarded so imported names can
/// be validated there.
///
/// # Errors
///
/// Propagates the deadline check failure and any error from the selected
/// execution path.
pub(super) fn execute_call_subquery(
    &self,
    import: &[String],
    body: &CypherQuery,
    result_set: ResultSet,
    declared: &std::collections::HashSet<String>,
) -> Result<ResultSet, String> {
    self.check_deadline()?;
    if import.is_empty() {
        self.execute_uncorrelated_call_subquery(body, result_set)
    } else {
        self.execute_correlated_call_subquery(import, body, result_set, declared)
    }
}
/// Runs a correlated `CALL { WITH ... }` subquery once per outer row.
///
/// Steps, in order:
/// 1. Every name in `import` must be present in `declared`; the first one
///    that is not yields an error.
/// 2. With no outer rows the result is an empty row set carrying the outer
///    columns unchanged (the body is never executed).
/// 3. For each outer row the deadline is checked, a seed row holding only the
///    imported bindings is built, and the body is executed and finalized on a
///    sub-executor.
/// 4. The first body result fixes the subquery column list; its columns are
///    checked against that outer row's bindings and any name clash is an
///    error. Later body results are spliced using that same column list.
/// 5. Each subquery row is spliced onto a copy of the outer row. An outer row
///    whose body produced no rows contributes nothing to the output.
///
/// The returned columns are the outer columns followed by any subquery
/// columns not already present.
///
/// # Errors
///
/// Returns an error for an undeclared import, a column collision, a missed
/// deadline, or any failure while executing/finalizing the body.
fn execute_correlated_call_subquery(
    &self,
    import: &[String],
    body: &CypherQuery,
    result_set: ResultSet,
    declared: &std::collections::HashSet<String>,
) -> Result<ResultSet, String> {
    // Report the first imported name the outer scope never declared.
    if let Some(name) = import.iter().find(|candidate| !declared.contains(*candidate)) {
        return Err(format!(
            "CALL {{ }} subquery imports variable `{name}` via its leading WITH, but \
             `{name}` is not bound in the outer scope at the CALL; import only \
             variables introduced by an earlier MATCH / WITH / UNWIND"
        ));
    }

    let ResultSet {
        rows: outer_rows,
        columns: outer_columns,
        ..
    } = result_set;
    if outer_rows.is_empty() {
        return Ok(ResultSet {
            rows: Vec::new(),
            columns: outer_columns,
            lazy_return_items: None,
        });
    }

    let anchor_imports = import_pattern_anchors(body, import);
    let sub = CypherExecutor::with_params(self.graph, self.params, self.deadline)
        .with_streaming(self.streaming);

    let mut joined_rows: Vec<ResultRow> = Vec::new();
    // Filled in from the first body result and reused for every later row.
    let mut body_columns: Option<Vec<String>> = None;

    for outer_row in outer_rows {
        self.check_deadline()?;

        let seed_set = ResultSet {
            rows: vec![self.seed_row_from_imports(&outer_row, import, &anchor_imports)],
            columns: Vec::new(),
            lazy_return_items: None,
        };
        let body_result = sub.finalize_result(sub.execute_clauses(body, seed_set)?)?;

        // The collision check runs only while the column list is still unknown,
        // i.e. against the first outer row.
        if body_columns.is_none() {
            let clash = body_result.columns.iter().find(|col| {
                outer_row.node_bindings.contains_key(*col)
                    || outer_row.edge_bindings.contains_key(*col)
                    || outer_row.path_bindings.contains_key(*col)
                    || outer_row.projected.contains_key(*col)
            });
            if let Some(col) = clash {
                return Err(format!(
                    "CALL {{ }} subquery returns a column `{col}` that already exists in \
                     the outer scope; rename the subquery's RETURN alias (re-returning an \
                     imported variable under the same name is a collision in Neo4j)"
                ));
            }
        }
        let cols = body_columns
            .get_or_insert_with(|| body_result.columns.clone())
            .as_slice();

        // Clone the outer row for all but the last subquery row; the last one
        // takes ownership of the outer row itself.
        if let Some((final_sub_row, leading_sub_rows)) = body_result.rows.split_last() {
            for sub_row in leading_sub_rows {
                let mut joined = outer_row.clone();
                splice_subquery_columns(&mut joined, sub_row, cols);
                joined_rows.push(joined);
            }
            let mut joined = outer_row;
            splice_subquery_columns(&mut joined, final_sub_row, cols);
            joined_rows.push(joined);
        }
    }

    let mut columns = outer_columns;
    for col in body_columns.unwrap_or_default() {
        if !columns.contains(&col) {
            columns.push(col);
        }
    }

    Ok(ResultSet {
        rows: joined_rows,
        columns,
        lazy_return_items: None,
    })
}
/// Builds the seed row for one correlated subquery execution.
///
/// For each name in `import`, the binding is copied from `outer_row` using the
/// first place it is found, in this order: node bindings, edge bindings, path
/// bindings, then projected values. A projected value that is missing or is
/// `Value::Null` is not copied; it is delegated to `seed_null_import`, which
/// decides how the absent value is represented based on `anchor_imports`.
fn seed_row_from_imports(
    &self,
    outer_row: &ResultRow,
    import: &[String],
    anchor_imports: &[String],
) -> ResultRow {
    let mut seed = ResultRow::with_capacity(import.len(), 0, 0);
    for name in import {
        if let Some(node) = outer_row.node_bindings.get(name) {
            seed.node_bindings.insert(name.clone(), *node);
            continue;
        }
        if let Some(edge) = outer_row.edge_bindings.get(name) {
            seed.edge_bindings.insert(name.clone(), *edge);
            continue;
        }
        if let Some(path) = outer_row.path_bindings.get(name) {
            seed.path_bindings.insert(name.clone(), path.clone());
            continue;
        }
        match outer_row.projected.get(name) {
            // Absent and explicit-null imports share the same fallback.
            None | Some(Value::Null) => self.seed_null_import(&mut seed, name, anchor_imports),
            Some(val) => {
                seed.projected.insert(name.clone(), val.clone());
            }
        }
    }
    seed
}
/// Records an imported variable that has no usable value in the outer row.
///
/// If `name` is listed in `anchor_imports`, it is bound as a node whose index
/// equals the graph's current node count; otherwise it is stored as a
/// projected `Value::Null`.
///
/// NOTE(review): the node-count index is presumably meant as an out-of-range
/// sentinel that matches no real node. That holds if indices are dense
/// (`0..node_count`); confirm against the graph type, since a graph with
/// stable indices and removed nodes could have a live node at that index.
fn seed_null_import(&self, seed: &mut ResultRow, name: &str, anchor_imports: &[String]) {
    let is_anchor = anchor_imports.iter().any(|anchor| anchor.as_str() == name);
    if !is_anchor {
        seed.projected.insert(name.to_owned(), Value::Null);
        return;
    }
    let sentinel = petgraph::graph::NodeIndex::new(self.graph.graph.node_count());
    seed.node_bindings.insert(name.to_owned(), sentinel);
}
/// Runs a `CALL { ... }` subquery that imports nothing from the outer scope.
///
/// The body is executed exactly once on a sub-executor. Its columns are then
/// checked against the bindings of the first outer row (when there is one);
/// a name already bound there is an error.
///
/// Row assembly:
/// - with no outer rows, each subquery row becomes an output row on its own;
/// - otherwise every outer row is paired with every subquery row, so an empty
///   subquery result yields no output rows.
///
/// The returned columns are the outer columns followed by any subquery
/// columns not already present.
///
/// # Errors
///
/// Returns an error if executing the body fails or a subquery column collides
/// with an outer binding.
fn execute_uncorrelated_call_subquery(
    &self,
    body: &CypherQuery,
    result_set: ResultSet,
) -> Result<ResultSet, String> {
    let sub = CypherExecutor::with_params(self.graph, self.params, self.deadline)
        .with_streaming(self.streaming);
    let sub_result = sub.execute(body)?;
    let sub_columns = sub_result.columns;
    let sub_rows = sub_result.rows;

    // Only the first outer row is inspected for name clashes.
    let clash = result_set.rows.first().and_then(|first| {
        sub_columns.iter().find(|col| {
            first.node_bindings.contains_key(*col)
                || first.edge_bindings.contains_key(*col)
                || first.path_bindings.contains_key(*col)
                || first.projected.contains_key(*col)
        })
    });
    if let Some(col) = clash {
        return Err(format!(
            "CALL {{ }} subquery returns a column `{col}` that already exists in \
             the outer scope; rename the subquery's RETURN alias (Neo4j errors on \
             shadowing an outer variable)"
        ));
    }

    let ResultSet {
        rows: outer_rows,
        columns: mut columns,
        ..
    } = result_set;

    let combined_rows: Vec<ResultRow> = if outer_rows.is_empty() {
        sub_rows
            .iter()
            .map(|sub_row| subquery_row_to_result_row(sub_row, &sub_columns))
            .collect()
    } else {
        let mut product = Vec::with_capacity(outer_rows.len().saturating_mul(sub_rows.len()));
        // Clone each outer row for all but the last subquery row; the last
        // pairing takes ownership of the outer row itself.
        if let Some((final_sub_row, leading_sub_rows)) = sub_rows.split_last() {
            for outer_row in outer_rows {
                for sub_row in leading_sub_rows {
                    let mut joined = outer_row.clone();
                    splice_subquery_columns(&mut joined, sub_row, &sub_columns);
                    product.push(joined);
                }
                let mut joined = outer_row;
                splice_subquery_columns(&mut joined, final_sub_row, &sub_columns);
                product.push(joined);
            }
        }
        product
    };

    for col in sub_columns {
        if !columns.contains(&col) {
            columns.push(col);
        }
    }

    Ok(ResultSet {
        rows: combined_rows,
        columns,
        lazy_return_items: None,
    })
}
}
/// Converts one subquery output row into a standalone `ResultRow`.
///
/// Column names and values are paired positionally and stored as projected
/// bindings; pairing stops at the shorter of the two slices.
fn subquery_row_to_result_row(sub_row: &[Value], sub_columns: &[String]) -> ResultRow {
    let mut by_column = Bindings::with_capacity(sub_columns.len());
    sub_columns.iter().zip(sub_row).for_each(|(column, value)| {
        by_column.insert(column.clone(), value.clone());
    });
    ResultRow::from_projected(by_column)
}
/// Writes a subquery row's values into `row`'s projected bindings.
///
/// Values and column names are paired positionally (stopping at the shorter
/// slice) and each pair is inserted under its column name.
fn splice_subquery_columns(row: &mut ResultRow, sub_row: &[Value], sub_columns: &[String]) {
    let value_column_pairs = sub_row.iter().zip(sub_columns);
    for (value, column) in value_column_pairs {
        row.projected.insert(column.clone(), value.clone());
    }
}
use crate::graph::languages::cypher::planner::import_pattern_anchors;