use rustc_hash::{FxHashMap, FxHashSet};
use selene_core::{NodeId, Value};
use crate::{
BindingId, HiddenBindingId, HopContributor, JoinTree, PathSelector, TailBinding,
runtime::{Binding, ExecutorError},
};
use super::{pattern, visited_set};
type EndpointPair = (NodeId, NodeId);
pub(crate) fn execute(
child: &JoinTree,
selector: PathSelector,
source_binding: TailBinding,
final_binding: TailBinding,
hop_contributors: &[HopContributor],
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Vec<Binding>, ExecutorError> {
let child_rows = pattern::walk_join_tree(child, env)?;
match selector {
PathSelector::All => Ok(child_rows),
PathSelector::Any { paths } => {
select_any(child_rows, source_binding, final_binding, env, paths)
}
PathSelector::AllShortest => select_counted(
child_rows,
source_binding,
final_binding,
hop_contributors,
env,
1,
true,
),
PathSelector::AnyShortest => select_counted(
child_rows,
source_binding,
final_binding,
hop_contributors,
env,
1,
false,
),
PathSelector::CountedShortest { paths } => select_counted(
child_rows,
source_binding,
final_binding,
hop_contributors,
env,
paths,
false,
),
PathSelector::CountedShortestGroup { groups } => select_counted(
child_rows,
source_binding,
final_binding,
hop_contributors,
env,
groups,
true,
),
}
}
fn select_any(
rows: Vec<Binding>,
source_binding: TailBinding,
final_binding: TailBinding,
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
count: u32,
) -> Result<Vec<Binding>, ExecutorError> {
if count == 0 {
return Ok(Vec::new());
}
let mut selected = Vec::new();
let mut per_pair_selected: FxHashMap<EndpointPair, usize> = FxHashMap::default();
let limit = usize::try_from(count).unwrap_or(usize::MAX);
let mut rows_since_check = 0;
for row in rows {
env.ctx
.tx
.check_cancellation_stride(&mut rows_since_check, 1)?;
let Some(pair) = endpoint_pair(&row, source_binding, final_binding, env)? else {
continue;
};
let selected_count = per_pair_selected.entry(pair).or_default();
if *selected_count < limit {
*selected_count += 1;
selected.push(row);
}
}
Ok(selected)
}
fn select_counted(
rows: Vec<Binding>,
source_binding: TailBinding,
final_binding: TailBinding,
hop_contributors: &[HopContributor],
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
count: u32,
group: bool,
) -> Result<Vec<Binding>, ExecutorError> {
let mut per_pair: FxHashMap<EndpointPair, Vec<(usize, u32)>> = FxHashMap::default();
let mut rows_since_check = 0;
for (index, row) in rows.iter().enumerate() {
env.ctx
.tx
.check_cancellation_stride(&mut rows_since_check, 1)?;
if !visited_set::trail_allows_hops(row, hop_contributors, env)? {
continue;
}
let Some(pair) = endpoint_pair(row, source_binding, final_binding, env)? else {
continue;
};
let hops = hop_count(row, hop_contributors, env)?;
per_pair.entry(pair).or_default().push((index, hops));
}
let mut kept: FxHashSet<usize> = FxHashSet::default();
let limit = count as usize;
for entries in per_pair.values() {
if group {
let mut distinct: Vec<u32> = entries.iter().map(|(_, hops)| *hops).collect();
distinct.sort_unstable();
distinct.dedup();
distinct.truncate(limit);
let kept_lengths: FxHashSet<u32> = distinct.into_iter().collect();
for &(index, hops) in entries {
if kept_lengths.contains(&hops) {
kept.insert(index);
}
}
} else if entries.len() <= limit {
for &(index, _) in entries {
kept.insert(index);
}
} else {
let mut ranked = entries.clone();
ranked.sort_by_key(|(_, hops)| *hops);
for &(index, _) in ranked.iter().take(limit) {
kept.insert(index);
}
}
}
let mut selected = Vec::with_capacity(kept.len());
rows_since_check = 0;
for (index, row) in rows.into_iter().enumerate() {
env.ctx
.tx
.check_cancellation_stride(&mut rows_since_check, 1)?;
if kept.contains(&index) {
selected.push(row);
}
}
Ok(selected)
}
fn endpoint_pair(
row: &Binding,
source_binding: TailBinding,
final_binding: TailBinding,
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Option<EndpointPair>, ExecutorError> {
let Some(source) = node_value(row, source_binding, env)? else {
return Ok(None);
};
let Some(final_node) = node_value(row, final_binding, env)? else {
return Ok(None);
};
Ok(Some((source, final_node)))
}
fn node_value(
row: &Binding,
binding: TailBinding,
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Option<NodeId>, ExecutorError> {
match tail_value(row, binding, env)? {
Value::NodeRef(id) => Ok(Some(id)),
Value::Null => Ok(None),
_ => Err(ExecutorError::ImplementationDefined {
detail: "path-search endpoint binding is not a node",
}),
}
}
fn hop_count(
row: &Binding,
contributors: &[HopContributor],
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<u32, ExecutorError> {
let mut total = 0u32;
for contributor in contributors {
let value = match contributor {
HopContributor::Fixed(count) => *count,
HopContributor::EdgeNamed(binding) => {
edge_hop_count(binding_value(row, *binding, env)?)?
}
HopContributor::EdgeHidden(hidden) => edge_hop_count(hidden_value(row, *hidden, env)?)?,
HopContributor::QuestionedNamed(binding) => {
questioned_hop_count(binding_value(row, *binding, env)?)?
}
HopContributor::QuestionedHidden(hidden) => {
questioned_hop_count(hidden_value(row, *hidden, env)?)?
}
HopContributor::GroupNamed(binding) => list_len(binding_value(row, *binding, env)?)?,
HopContributor::GroupHidden(hidden) => list_len(hidden_value(row, *hidden, env)?)?,
};
total = total
.checked_add(value)
.ok_or(ExecutorError::ImplementationDefined {
detail: "path-search hop count overflow",
})?;
}
Ok(total)
}
fn edge_hop_count(value: Value) -> Result<u32, ExecutorError> {
match value {
Value::EdgeRef(_) => Ok(1),
_ => Err(ExecutorError::ImplementationDefined {
detail: "path-search fixed hop contributor is not an edge",
}),
}
}
fn questioned_hop_count(value: Value) -> Result<u32, ExecutorError> {
match value {
Value::Null => Ok(0),
Value::EdgeRef(_) => Ok(1),
_ => Err(ExecutorError::ImplementationDefined {
detail: "path-search questioned hop contributor is not an edge or null",
}),
}
}
fn list_len(value: Value) -> Result<u32, ExecutorError> {
match value {
Value::List(values) => {
u32::try_from(values.len()).map_err(|_| ExecutorError::ImplementationDefined {
detail: "path-search hop list length exceeds u32",
})
}
_ => Err(ExecutorError::ImplementationDefined {
detail: "path-search hop contributor is not a list",
}),
}
}
fn tail_value(
row: &Binding,
binding: TailBinding,
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Value, ExecutorError> {
match binding {
TailBinding::Named(binding) => binding_value(row, binding, env),
TailBinding::Hidden(hidden) => hidden_value(row, hidden, env),
}
}
fn binding_value(
row: &Binding,
binding: BindingId,
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Value, ExecutorError> {
let Some(index) = pattern::binding_index(env.pattern, env.schema, binding) else {
return Err(ExecutorError::ImplementationDefined {
detail: "path-search binding column missing",
});
};
Ok(row.get(index).cloned().unwrap_or(Value::Null))
}
fn hidden_value(
row: &Binding,
hidden: HiddenBindingId,
env: pattern::WalkContext<'_, '_, '_, '_, '_, '_>,
) -> Result<Value, ExecutorError> {
let Some(index) = pattern::hidden_index(env.schema, hidden) else {
return Err(ExecutorError::ImplementationDefined {
detail: "path-search hidden binding column missing",
});
};
Ok(row.get(index).cloned().unwrap_or(Value::Null))
}