#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_possible_truncation)]
mod index_prefilter;
mod similarity;
mod start_nodes;
mod vector_first;
mod where_eval;
use crate::collection::graph::{concurrent_bfs_stream, StreamingConfig};
use crate::collection::types::Collection;
use crate::error::{Error, Result};
use crate::guardrails::QueryContext;
use crate::storage::LogPayloadStorage;
use crate::velesql::{GraphPattern, MatchClause};
use std::collections::HashMap;
/// One row produced by executing a `MATCH` clause.
#[derive(Debug, Clone)]
pub struct MatchResult {
/// Id of the matched node: the traversal target, or the start node itself
/// for patterns without relationships.
pub node_id: u64,
/// Traversal depth at which the node was reached (`0` for single-node matches).
pub depth: u32,
/// Path copied from the traversal result; empty for single-node matches.
pub path: Vec<u64>,
/// Pattern alias mapped to the node id bound to it.
pub bindings: HashMap<String, u64>,
/// Optional score. Initialised to `None`; nothing in this part of the file sets it.
pub score: Option<f32>,
/// Values produced by projecting the RETURN clause over `bindings`.
pub projected: HashMap<String, serde_json::Value>,
}
impl MatchResult {
    /// Builds a result for `node_id`, reached at `depth` through `path`.
    ///
    /// The bindings and projected maps start out empty and no score is set.
    #[must_use]
    pub fn new(node_id: u64, depth: u32, path: Vec<u64>) -> Self {
        let bindings = HashMap::new();
        let projected = HashMap::new();
        Self {
            node_id,
            depth,
            path,
            bindings,
            score: None,
            projected,
        }
    }

    /// Returns the result with `alias` bound to `node_id`.
    ///
    /// An existing binding for the same alias is overwritten.
    #[must_use]
    pub fn with_binding(self, alias: String, node_id: u64) -> Self {
        let mut updated = self;
        updated.bindings.insert(alias, node_id);
        updated
    }

    /// Returns the result with its projected values replaced by `projected`.
    #[must_use]
    pub fn with_projected(self, projected: HashMap<String, serde_json::Value>) -> Self {
        Self { projected, ..self }
    }
}
/// Classification of a single RETURN-clause expression, borrowing from the
/// expression text it was parsed from.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProjectionItem<'a> {
/// The expression is exactly `*`.
Wildcard,
/// The expression contains `(`; holds the text before the first `(`.
FunctionCall(&'a str),
/// The expression has the form `alias.property`, split at the first `.`,
/// with both sides non-empty.
PropertyPath {
/// Text before the first `.`.
alias: &'a str,
/// Text after the first `.` (may itself contain further dots).
property: &'a str,
},
/// Anything else; holds the whole expression unchanged.
BareAlias(&'a str),
}
/// Classifies a RETURN-clause expression.
///
/// Rules are tried in this order:
/// 1. exactly `*` yields `Wildcard`;
/// 2. any `(` yields `FunctionCall` carrying the text before the first `(`;
/// 3. a `.` with non-empty text on both sides of its first occurrence yields
///    `PropertyPath`;
/// 4. everything else yields `BareAlias` with the untouched expression.
#[must_use]
pub fn parse_projection_item(expression: &str) -> ProjectionItem<'_> {
    if expression == "*" {
        return ProjectionItem::Wildcard;
    }
    if let Some((name, _arguments)) = expression.split_once('(') {
        return ProjectionItem::FunctionCall(name);
    }
    match expression.split_once('.') {
        Some((alias, property)) if !alias.is_empty() && !property.is_empty() => {
            ProjectionItem::PropertyPath { alias, property }
        }
        // No dot at all, or a leading/trailing dot: treat the text as an alias.
        _ => ProjectionItem::BareAlias(expression),
    }
}
/// Splits an `alias.property` expression into its two parts.
///
/// Returns `None` whenever `parse_projection_item` classifies the expression
/// as anything other than a property path.
#[must_use]
pub fn parse_property_path(expression: &str) -> Option<(&str, &str)> {
    if let ProjectionItem::PropertyPath { alias, property } = parse_projection_item(expression) {
        Some((alias, property))
    } else {
        None
    }
}
/// State shared while collecting results for a pattern that has no
/// relationships (every start node is itself a candidate result).
struct SingleNodeCtx<'a> {
/// The MATCH clause being executed (source of the WHERE and RETURN clauses).
match_clause: &'a MatchClause,
/// Query parameters forwarded to WHERE evaluation.
params: &'a HashMap<String, serde_json::Value>,
/// Payload storage forwarded to WHERE evaluation and property projection.
payload_guard: &'a LogPayloadStorage,
/// `(node, node)` pairs already emitted for this pattern; used to skip duplicates.
seen_pairs: &'a mut std::collections::HashSet<(u64, u64)>,
/// Accumulator that accepted results are pushed onto.
all_results: &'a mut Vec<MatchResult>,
/// Collection stops once `all_results` holds this many entries.
limit: usize,
/// Optional index prefilter passed to `index_prefilter::passes_prefilter`
/// for each candidate node.
prefilter: Option<std::collections::HashSet<u64>>,
}
/// State shared while traversing a pattern that has at least one relationship.
struct TraversalCtx<'a> {
/// The MATCH clause being executed (source of the WHERE and RETURN clauses).
match_clause: &'a MatchClause,
/// Query parameters forwarded to WHERE evaluation.
params: &'a HashMap<String, serde_json::Value>,
/// Payload storage forwarded to WHERE evaluation and property projection.
payload_guard: &'a LogPayloadStorage,
/// Optional guardrail context used for timeout, cardinality and depth checks.
guardrail: Option<&'a QueryContext>,
/// `(start, target)` pairs already emitted for this pattern; used to skip duplicates.
seen_pairs: &'a mut std::collections::HashSet<(u64, u64)>,
/// Accumulator that accepted results are pushed onto.
all_results: &'a mut Vec<MatchResult>,
/// Traversal stops once `all_results` holds this many entries.
limit: usize,
/// Number of traversal results examined so far; incremented once per result.
iteration_count: &'a mut u32,
/// Size of `all_results` at the last cardinality report to the guardrail.
reported_cardinality: &'a mut usize,
}
impl Collection {
pub fn execute_match(
&self,
match_clause: &MatchClause,
params: &HashMap<String, serde_json::Value>,
) -> Result<Vec<MatchResult>> {
self.execute_match_with_context(match_clause, params, None)
}
/// Executes a MATCH clause, optionally enforcing guardrails through `ctx`.
///
/// Patterns are evaluated in order and their results appended to one list.
/// Evaluation stops as soon as the list holds `limit` entries, where `limit`
/// is the RETURN clause's limit or 100 when none is given.
///
/// # Errors
///
/// Returns `Error::Config` when the clause has no patterns, and propagates
/// any error raised while executing an individual pattern.
pub fn execute_match_with_context(
    &self,
    match_clause: &MatchClause,
    params: &HashMap<String, serde_json::Value>,
    ctx: Option<&QueryContext>,
) -> Result<Vec<MatchResult>> {
    // Result cap used when the RETURN clause carries no explicit limit.
    const DEFAULT_LIMIT: usize = 100;

    let patterns = &match_clause.patterns;
    if patterns.is_empty() {
        return Err(Error::Config(
            "MATCH query must have at least one pattern".to_string(),
        ));
    }

    let limit = match match_clause.return_clause.limit {
        Some(requested) => requested as usize,
        None => DEFAULT_LIMIT,
    };

    let mut all_results = Vec::<MatchResult>::new();
    let mut iteration_count = 0_u32;
    let mut reported_cardinality = 0_usize;
    // The read guard is held for the whole query so every pattern sees the
    // same payload storage.
    let payload_guard = self.payload_storage.read();

    for pattern in patterns {
        if all_results.len() >= limit {
            break;
        }
        self.execute_single_pattern(
            pattern,
            match_clause,
            params,
            ctx,
            &payload_guard,
            &self.edge_store,
            limit,
            &mut all_results,
            &mut iteration_count,
            &mut reported_cardinality,
        )?;
    }

    Ok(all_results)
}
/// Executes one graph pattern and appends its matches to `all_results`.
///
/// Start nodes are resolved with `find_start_nodes`; when there are none the
/// pattern contributes nothing. A pattern without relationships is answered
/// directly from the start nodes, otherwise the pattern is traversed through
/// `edge_store`.
///
/// Duplicate detection (`seen_pairs`) is scoped to this single pattern.
///
/// # Errors
///
/// Propagates errors from start-node resolution, WHERE evaluation and
/// guardrail checks.
#[allow(clippy::too_many_arguments)]
fn execute_single_pattern(
    &self,
    pattern: &GraphPattern,
    match_clause: &MatchClause,
    params: &HashMap<String, serde_json::Value>,
    ctx: Option<&QueryContext>,
    payload_guard: &LogPayloadStorage,
    edge_store: &crate::collection::graph::ConcurrentEdgeStore,
    limit: usize,
    all_results: &mut Vec<MatchResult>,
    iteration_count: &mut u32,
    reported_cardinality: &mut usize,
) -> Result<()> {
    let start_nodes = self.find_start_nodes(pattern)?;
    if start_nodes.is_empty() {
        return Ok(());
    }

    let mut seen_pairs: std::collections::HashSet<(u64, u64)> =
        std::collections::HashSet::new();

    if pattern.relationships.is_empty() {
        // The index prefilter is only consumed by the single-node path, so it
        // is computed here rather than up front; traversal patterns would
        // otherwise pay for a result they immediately discard.
        let prefilter = match_clause
            .where_clause
            .as_ref()
            .and_then(|wc| index_prefilter::compute_index_prefilter(self, pattern, wc, params));
        let mut sn_ctx = SingleNodeCtx {
            match_clause,
            params,
            payload_guard,
            seen_pairs: &mut seen_pairs,
            all_results,
            limit,
            prefilter,
        };
        return self.collect_single_node_results(&start_nodes, &mut sn_ctx);
    }

    let mut trav_ctx = TraversalCtx {
        match_clause,
        params,
        payload_guard,
        guardrail: ctx,
        seen_pairs: &mut seen_pairs,
        all_results,
        limit,
        iteration_count,
        reported_cardinality,
    };
    self.traverse_pattern(pattern, &start_nodes, edge_store, &mut trav_ctx)
}
/// Runs a streaming BFS from every start node and feeds each traversal
/// result through the guardrails and `accept_traversal_hit`.
///
/// Stops as soon as the shared result list reaches `ctx.limit`.
///
/// # Errors
///
/// Propagates guardrail violations and errors from accepting a hit.
fn traverse_pattern(
    &self,
    pattern: &GraphPattern,
    start_nodes: &[(u64, HashMap<String, u64>)],
    edge_store: &crate::collection::graph::ConcurrentEdgeStore,
    ctx: &mut TraversalCtx<'_>,
) -> Result<()> {
    let max_depth = Self::compute_max_depth(pattern);
    let rel_types = Self::extract_rel_types(pattern);

    for &(start_id, ref start_bindings) in start_nodes {
        // Zero remaining budget means the limit has been reached.
        let remaining = ctx.limit.saturating_sub(ctx.all_results.len());
        if remaining == 0 {
            break;
        }

        // NOTE(review): the stream is capped at `remaining` before WHERE
        // filtering and de-duplication run, so rejected hits may consume the
        // budget; confirm this is intended.
        let config = StreamingConfig::default()
            .with_limit(remaining)
            .with_max_depth(max_depth)
            .with_rel_types(rel_types.clone());
        let stream = concurrent_bfs_stream(edge_store, start_id, config);

        for hit in stream {
            if ctx.all_results.len() >= ctx.limit {
                break;
            }
            *ctx.iteration_count += 1;
            let iterations = *ctx.iteration_count;
            self.check_periodic_guardrails(
                ctx.guardrail,
                iterations,
                ctx.all_results,
                ctx.reported_cardinality,
            )?;
            self.accept_traversal_hit(start_id, &hit, start_bindings, pattern, ctx)?;
        }
    }

    Ok(())
}
/// Turns one traversal result into a `MatchResult` and records it, unless
/// it fails the WHERE clause or repeats an already-emitted `(start, target)`
/// pair.
///
/// A pair is only marked as seen after it has passed the WHERE clause.
///
/// # Errors
///
/// Propagates errors from building the match result (depth guardrail) and
/// from WHERE evaluation.
fn accept_traversal_hit(
    &self,
    start_id: u64,
    traversal_result: &crate::collection::graph::TraversalResult,
    start_bindings: &HashMap<String, u64>,
    pattern: &GraphPattern,
    ctx: &mut TraversalCtx<'_>,
) -> Result<()> {
    let target_id = traversal_result.target_id;
    let mut candidate = self.build_traversal_match_result(
        traversal_result,
        start_bindings,
        pattern,
        ctx.guardrail,
    )?;

    if let Some(where_clause) = ctx.match_clause.where_clause.as_ref() {
        let keep = self.evaluate_where_condition(
            target_id,
            Some(&candidate.bindings),
            where_clause,
            ctx.params,
            ctx.payload_guard,
        )?;
        if !keep {
            return Ok(());
        }
    }

    // `insert` reports `false` when the pair was emitted before.
    let first_time = ctx.seen_pairs.insert((start_id, target_id));
    if !first_time {
        return Ok(());
    }

    candidate.projected = self.project_properties(
        &candidate.bindings,
        &ctx.match_clause.return_clause,
        ctx.payload_guard,
    );
    ctx.all_results.push(candidate);
    Ok(())
}
/// Emits one result per start node for a pattern without relationships.
///
/// A node is skipped when it is rejected by the index prefilter, fails the
/// WHERE clause, or was already emitted for this pattern. Collection stops
/// once the shared result list reaches `ctx.limit`.
///
/// # Errors
///
/// Propagates errors from WHERE evaluation.
fn collect_single_node_results(
    &self,
    start_nodes: &[(u64, HashMap<String, u64>)],
    ctx: &mut SingleNodeCtx<'_>,
) -> Result<()> {
    for (node_id, bindings) in start_nodes {
        if ctx.all_results.len() >= ctx.limit {
            break;
        }
        if !index_prefilter::passes_prefilter(ctx.prefilter.as_ref(), *node_id) {
            continue;
        }
        if let Some(ref where_clause) = ctx.match_clause.where_clause {
            if !self.evaluate_where_condition(
                *node_id,
                Some(bindings),
                where_clause,
                ctx.params,
                ctx.payload_guard,
            )? {
                continue;
            }
        }
        // A single `insert` both tests and records the pair (it returns
        // `false` when the pair is already present), matching the traversal
        // path and avoiding a second hash lookup.
        if !ctx.seen_pairs.insert((*node_id, *node_id)) {
            continue;
        }
        // Single-node matches have depth 0 and an empty path.
        let mut result = MatchResult::new(*node_id, 0, Vec::new());
        result.bindings.clone_from(bindings);
        result.projected = self.project_properties(
            bindings,
            &ctx.match_clause.return_clause,
            ctx.payload_guard,
        );
        ctx.all_results.push(result);
    }
    Ok(())
}
/// Runs the timeout and cardinality guardrails on every 100th iteration.
///
/// Does nothing when no guardrail context is supplied or when
/// `iteration_count` is not a multiple of 100. Only the results added since
/// the previous report are passed to the cardinality check, and
/// `reported_cardinality` is advanced after that check succeeds.
///
/// # Errors
///
/// Returns `Error::GuardRail` when the timeout or cardinality check fails.
#[allow(clippy::unused_self)]
fn check_periodic_guardrails(
    &self,
    ctx: Option<&QueryContext>,
    iteration_count: u32,
    all_results: &[MatchResult],
    reported_cardinality: &mut usize,
) -> Result<()> {
    let guardrail = match ctx {
        Some(guardrail) if iteration_count % 100 == 0 => guardrail,
        _ => return Ok(()),
    };

    guardrail
        .check_timeout()
        .map_err(|e| Error::GuardRail(e.to_string()))?;

    let total = all_results.len();
    let unreported = total.saturating_sub(*reported_cardinality);
    if unreported > 0 {
        guardrail
            .check_cardinality(unreported)
            .map_err(|e| Error::GuardRail(e.to_string()))?;
        *reported_cardinality = total;
    }

    Ok(())
}
/// Builds the `MatchResult` for one traversal result.
///
/// The result starts from the start node's bindings. If the pattern node at
/// index `depth` carries an alias, that alias is additionally bound to the
/// traversal target. No other pattern aliases are bound here.
///
/// # Errors
///
/// Returns `Error::GuardRail` when a guardrail context is supplied and its
/// depth check rejects the traversal depth.
#[allow(clippy::unused_self)]
fn build_traversal_match_result(
    &self,
    traversal_result: &crate::collection::graph::TraversalResult,
    start_bindings: &HashMap<String, u64>,
    pattern: &GraphPattern,
    ctx: Option<&QueryContext>,
) -> Result<MatchResult> {
    let depth = traversal_result.depth;
    let target_id = traversal_result.target_id;

    // Reject over-deep hits before copying the path and bindings.
    if let Some(guardrail) = ctx {
        guardrail
            .check_depth(depth)
            .map_err(|e| Error::GuardRail(e.to_string()))?;
    }

    let mut built = MatchResult::new(target_id, depth, traversal_result.path.clone());
    built.bindings.clone_from(start_bindings);

    let target_alias = pattern
        .nodes
        .get(depth as usize)
        .and_then(|node| node.alias.as_ref());
    if let Some(alias) = target_alias {
        built.bindings.insert(alias.clone(), target_id);
    }

    Ok(built)
}
}