use crate::helpers::expr_to_value;
use crate::types::PendingOp;
use sparrowdb_catalog::catalog::Catalog;
use sparrowdb_common::{col_id_of, NodeId};
use std::collections::{HashMap, HashSet};
pub(crate) fn pending_candidates_for(
pending_ops: &[PendingOp],
label_id: u32,
node_props: &[sparrowdb_cypher::ast::PropEntry],
) -> Vec<NodeId> {
pending_ops
.iter()
.filter_map(|op| {
let PendingOp::NodeCreate {
label_id: op_lid,
slot,
props: op_props,
} = op
else {
return None;
};
if *op_lid != label_id {
return None;
}
let all_match = node_props.iter().all(|pe| {
let wanted_col = col_id_of(&pe.key);
let wanted_val = expr_to_value(&pe.value);
op_props
.iter()
.any(|&(c, ref v)| c == wanted_col && *v == wanted_val)
});
if all_match {
Some(NodeId((label_id as u64) << 32 | *slot as u64))
} else {
None
}
})
.collect()
}
pub(crate) fn augment_rows_with_pending(
patterns: &[sparrowdb_cypher::ast::PathPattern],
pending_ops: &[PendingOp],
catalog: &Catalog,
existing_rows: &[HashMap<String, NodeId>],
) -> crate::Result<Vec<HashMap<String, NodeId>>> {
let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
for pat in patterns {
if pat.rels.is_empty() {
for node_pat in &pat.nodes {
if node_pat.var.is_empty() {
continue;
}
if var_candidates.contains_key(&node_pat.var) {
continue;
}
let label = node_pat.labels.first().cloned().unwrap_or_default();
let label_id: u32 = match catalog.get_label(&label)? {
Some(id) => id as u32,
None => {
var_candidates.insert(node_pat.var.clone(), vec![]);
continue;
}
};
let pending = pending_candidates_for(pending_ops, label_id, &node_pat.props);
var_candidates.insert(node_pat.var.clone(), pending);
}
}
}
if var_candidates.is_empty() {
return Ok(vec![]);
}
let mut already_seen: HashMap<String, HashSet<NodeId>> = HashMap::new();
for row in existing_rows {
for (var, nid) in row {
already_seen.entry(var.clone()).or_default().insert(*nid);
}
}
let vars: Vec<String> = var_candidates.keys().cloned().collect();
let mut new_rows: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
for var in &vars {
let candidates = var_candidates.get(var).map(Vec::as_slice).unwrap_or(&[]);
if candidates.is_empty() {
continue;
}
let mut expanded: Vec<HashMap<String, NodeId>> = Vec::new();
for partial in &new_rows {
for &cand in candidates {
let mut row = partial.clone();
row.insert(var.clone(), cand);
expanded.push(row);
}
}
new_rows = expanded;
}
let added: Vec<HashMap<String, NodeId>> = new_rows
.into_iter()
.filter(|row| {
if row.is_empty() {
return false;
}
row.iter().any(|(var, nid)| {
!already_seen
.get(var)
.map(|s| s.contains(nid))
.unwrap_or(false)
})
})
.collect();
Ok(added)
}