use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use super::{
ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
};
use crate::storage::engine::graph_store::{GraphStore, Namespace, StoredNode};
use crate::storage::engine::graph_table_index::GraphTableIndex;
use crate::storage::query::ast::{
CompareOp, EdgeDirection, EdgePattern, FieldRef, Filter, GraphPattern, GraphQuery, JoinQuery,
JoinType, NodePattern, NodeSelector, PathQuery, Projection, QueryExpr, TableQuery,
};
use crate::storage::query::sql_lowering::{
effective_graph_filter, effective_graph_projections, effective_path_filter,
};
use crate::storage::schema::Value;
/// Executes unified query expressions (graph, path, join, table) against a
/// graph store, using a graph/table index and an optional per-node property
/// overlay.
pub struct UnifiedExecutor {
/// Graph store queried by the `&self` execution entry points.
graph: Arc<GraphStore>,
/// Index consulted to map a `(table id, row id)` pair to a graph node id.
index: Arc<GraphTableIndex>,
/// Extra node properties, keyed by node id and then by property name.
node_properties: Arc<HashMap<String, HashMap<String, Value>>>,
}
impl UnifiedExecutor {
/// Creates an executor over `graph` and `index` with no extra node properties.
pub fn new(graph: Arc<GraphStore>, index: Arc<GraphTableIndex>) -> Self {
Self::new_with_node_properties(graph, index, HashMap::new())
}
/// Creates an executor over `graph` and `index` that additionally exposes
/// `node_properties` (node id -> property name -> value) on matched nodes.
pub fn new_with_node_properties(
    graph: Arc<GraphStore>,
    index: Arc<GraphTableIndex>,
    node_properties: HashMap<String, HashMap<String, Value>>,
) -> Self {
    // Wrap the overlay once so the executor holds it behind a shared pointer.
    let node_properties = Arc::new(node_properties);
    Self {
        graph,
        index,
        node_properties,
    }
}
/// Converts a stored node into a `MatchedNode`, replacing its properties with
/// the overlay registered for that node id when one exists.
fn matched_node(&self, node: &StoredNode) -> MatchedNode {
    let mut matched = MatchedNode::from_stored(node);
    match self.node_properties.get(&matched.id) {
        Some(overlay) => {
            matched.properties = overlay.clone();
            matched
        }
        None => matched,
    }
}
/// Returns the value of a built-in node field (`id`, `label`, `type` /
/// `node_type`) read directly from the stored node, or `None` for any other
/// property name.
fn node_stored_property_value(node: &StoredNode, property: &str) -> Option<Value> {
    match property {
        "id" => Some(Value::text(node.id.clone())),
        "label" => Some(Value::text(node.label.clone())),
        "type" | "node_type" => Some(Value::text(node.node_type.as_str().to_string())),
        _ => None,
    }
}
/// Looks up `property` for `node`: the property overlay wins, and the
/// built-in stored fields are used as a fallback.
fn node_property_value(&self, node: &StoredNode, property: &str) -> Option<Value> {
    let overlay_value = self
        .node_properties
        .get(&node.id)
        .and_then(|properties| properties.get(property));
    if let Some(value) = overlay_value {
        return Some(value.clone());
    }
    Self::node_stored_property_value(node, property)
}
/// Looks up `property` for the node identified by `node_id`.
///
/// `id` is answered from the argument itself; `label` and `type` /
/// `node_type` are read from the executor's graph (yielding `None` when the
/// node is not found); any other name is read from the property overlay only.
fn node_property_value_by_id(&self, node_id: &str, property: &str) -> Option<Value> {
    match property {
        "id" => Some(Value::text(node_id.to_string())),
        "label" => self
            .graph
            .get_node(node_id)
            .map(|node| Value::text(node.label.clone())),
        "type" | "node_type" => self
            .graph
            .get_node(node_id)
            .map(|node| Value::text(node.node_type.as_str().to_string())),
        _ => self
            .node_properties
            .get(node_id)
            .and_then(|properties| properties.get(property).cloned()),
    }
}
/// Executes `query` against a borrowed `graph` with no extra node properties.
///
/// Delegates to [`Self::execute_on_with_node_properties`] with an empty
/// property map.
pub fn execute_on(
graph: &GraphStore,
query: &QueryExpr,
) -> Result<UnifiedResult, ExecutionError> {
Self::execute_on_with_node_properties(graph, query, HashMap::new())
}
/// Executes `query` against a borrowed `graph`, exposing `node_properties`
/// on matched nodes.
///
/// A temporary executor is built around an empty `GraphStore` and an empty
/// `GraphTableIndex`; only the supplied `graph` is actually queried.
///
/// # Errors
///
/// Only `Graph` and `Path` queries are executed. Every other query kind
/// returns an `ExecutionError` with a message describing why it is rejected.
pub fn execute_on_with_node_properties(
graph: &GraphStore,
query: &QueryExpr,
node_properties: HashMap<String, HashMap<String, Value>>,
) -> Result<UnifiedResult, ExecutionError> {
// The temporary executor only carries the property overlay; its own graph
// and index are empty placeholders.
let temp = Self::new_with_node_properties(
Arc::new(GraphStore::new()),
Arc::new(GraphTableIndex::new()),
node_properties,
);
match query {
QueryExpr::Graph(q) => temp.exec_graph_on(graph, q),
QueryExpr::Path(q) => temp.exec_path_on(graph, q),
QueryExpr::Table(_) => Err(ExecutionError::new(
"Table queries require proper executor initialization",
)),
QueryExpr::Join(_) => Err(ExecutionError::new(
"Join queries require proper executor initialization",
)),
QueryExpr::Vector(_) => Err(ExecutionError::new(
"Vector queries require VectorStore integration",
)),
QueryExpr::Hybrid(_) => Err(ExecutionError::new(
"Hybrid queries require VectorStore integration",
)),
// Every remaining variant is listed explicitly (no `_` arm), so adding a
// new `QueryExpr` variant forces a compile error here.
QueryExpr::Insert(_)
| QueryExpr::Update(_)
| QueryExpr::Delete(_)
| QueryExpr::CreateTable(_)
| QueryExpr::CreateCollection(_)
| QueryExpr::CreateVector(_)
| QueryExpr::DropTable(_)
| QueryExpr::DropGraph(_)
| QueryExpr::DropVector(_)
| QueryExpr::DropDocument(_)
| QueryExpr::DropKv(_)
| QueryExpr::DropCollection(_)
| QueryExpr::Truncate(_)
| QueryExpr::AlterTable(_)
| QueryExpr::GraphCommand(_)
| QueryExpr::SearchCommand(_)
| QueryExpr::CreateIndex(_)
| QueryExpr::DropIndex(_)
| QueryExpr::ProbabilisticCommand(_)
| QueryExpr::Ask(_)
| QueryExpr::SetConfig { .. }
| QueryExpr::ShowConfig { .. }
| QueryExpr::SetSecret { .. }
| QueryExpr::DeleteSecret { .. }
| QueryExpr::ShowSecrets { .. }
| QueryExpr::SetTenant(_)
| QueryExpr::ShowTenant
| QueryExpr::CreateTimeSeries(_)
| QueryExpr::DropTimeSeries(_)
| QueryExpr::CreateQueue(_)
| QueryExpr::AlterQueue(_)
| QueryExpr::DropQueue(_)
| QueryExpr::QueueSelect(_)
| QueryExpr::QueueCommand(_)
| QueryExpr::KvCommand(_)
| QueryExpr::ConfigCommand(_)
| QueryExpr::CreateTree(_)
| QueryExpr::DropTree(_)
| QueryExpr::TreeCommand(_)
| QueryExpr::ExplainAlter(_)
| QueryExpr::TransactionControl(_)
| QueryExpr::MaintenanceCommand(_)
| QueryExpr::CreateSchema(_)
| QueryExpr::DropSchema(_)
| QueryExpr::CreateSequence(_)
| QueryExpr::DropSequence(_)
| QueryExpr::CopyFrom(_)
| QueryExpr::CreateView(_)
| QueryExpr::DropView(_)
| QueryExpr::RefreshMaterializedView(_)
| QueryExpr::CreatePolicy(_)
| QueryExpr::DropPolicy(_)
| QueryExpr::CreateServer(_)
| QueryExpr::DropServer(_)
| QueryExpr::CreateForeignTable(_)
| QueryExpr::DropForeignTable(_)
| QueryExpr::Grant(_)
| QueryExpr::Revoke(_)
| QueryExpr::AlterUser(_)
| QueryExpr::CreateIamPolicy { .. }
| QueryExpr::DropIamPolicy { .. }
| QueryExpr::AttachPolicy { .. }
| QueryExpr::DetachPolicy { .. }
| QueryExpr::ShowPolicies { .. }
| QueryExpr::ShowEffectivePermissions { .. }
| QueryExpr::SimulatePolicy { .. }
| QueryExpr::CreateMigration(_)
| QueryExpr::ApplyMigration(_)
| QueryExpr::RollbackMigration(_)
| QueryExpr::ExplainMigration(_)
| QueryExpr::EventsBackfill(_)
| QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
"DML/DDL/Command statements are not supported in UnifiedExecutor",
)),
}
}
/// Runs a graph pattern query against the supplied `graph`.
///
/// Pattern matches are filtered with the query's effective filter, projected
/// with its effective projections, and collected until `query.limit` records
/// have been produced. Scan counters gathered during matching are returned in
/// the result's `stats`.
fn exec_graph_on(
    &self,
    graph: &GraphStore,
    query: &GraphQuery,
) -> Result<UnifiedResult, ExecutionError> {
    let mut stats = QueryStats::default();
    let filter = effective_graph_filter(query);
    let projections = effective_graph_projections(query);
    let candidates = self.match_pattern_on(graph, &query.pattern, &mut stats)?;

    let mut result = UnifiedResult::empty();
    for candidate in candidates {
        // The limit counts emitted records, not examined candidates.
        if Self::graph_limit_reached(result.records.len(), query.limit) {
            break;
        }
        if self.eval_filter_on_match(&filter, &candidate) {
            result
                .records
                .push(self.project_match(&candidate, &projections));
        }
    }
    result.stats = stats;
    Ok(result)
}
/// Runs a path query against the supplied `graph` using a breadth-first
/// search that starts from every node selected by `query.from`.
///
/// A record holding the path is emitted each time the search reaches a node
/// selected by `query.to` through a non-empty path. Only outgoing edges are
/// followed; when `query.via` is non-empty, only edges whose type is listed
/// there are followed. Each node is visited at most once across all start
/// nodes, and paths longer than `query.max_length` are discarded.
///
/// Note: `stats.edges_scanned` is set to the number of visited nodes, which
/// is the counter the original implementation reported.
fn exec_path_on(
    &self,
    graph: &GraphStore,
    query: &PathQuery,
) -> Result<UnifiedResult, ExecutionError> {
    let mut result = UnifiedResult::empty();
    let mut queue: VecDeque<(String, GraphPath)> = VecDeque::new();
    let mut visited: HashSet<String> = HashSet::new();

    // Seed the frontier with every start node.
    for start in self.resolve_selector_on(graph, &query.from) {
        queue.push_back((start.clone(), GraphPath::start(&start)));
        visited.insert(start);
    }

    let target_ids: HashSet<String> = self
        .resolve_selector_on(graph, &query.to)
        .into_iter()
        .collect();
    let max_len = query.max_length as usize;

    while let Some((current, path)) = queue.pop_front() {
        if path.len() > max_len {
            continue;
        }
        // A start node that is also a target only counts once a real edge
        // has been traversed, hence the non-empty check.
        if target_ids.contains(&current) && !path.is_empty() {
            let mut record = UnifiedRecord::new();
            record.paths.push(path);
            result.records.push(record);
            continue;
        }
        for (edge_type, neighbor, weight) in graph.outgoing_edges(&current) {
            if !query.via.is_empty() && !query.via.iter().any(|via| via == edge_type.as_str()) {
                continue;
            }
            if !visited.contains(&neighbor) {
                visited.insert(neighbor.clone());
                let edge = MatchedEdge::from_tuple(&current, edge_type, &neighbor, weight);
                let new_path = path.extend(edge, &neighbor);
                queue.push_back((neighbor, new_path));
            }
        }
    }

    result.stats.edges_scanned = visited.len() as u64;
    Ok(result)
}
/// Resolves a node selector to node ids against the supplied `graph`.
///
/// * `ById` yields the id as given.
/// * `ByType` yields every node returned by `graph.nodes_with_category`;
///   the selector's `filter` is ignored here.
/// * `ByRow` parses `table` as a `u16` table id (an unparsable table yields
///   no ids), asks the executor's index first, and falls back to scanning the
///   graph for nodes whose `table_ref` matches.
fn resolve_selector_on(&self, graph: &GraphStore, selector: &NodeSelector) -> Vec<String> {
    match selector {
        NodeSelector::ById(id) => vec![id.clone()],
        NodeSelector::ByType {
            node_label,
            filter: _,
        } => {
            let mut ids = Vec::new();
            for node in graph.nodes_with_category(node_label) {
                ids.push(node.id);
            }
            ids
        }
        NodeSelector::ByRow { table, row_id } => {
            let Ok(table_id) = table.as_str().parse::<u16>() else {
                return Vec::new();
            };
            let row_id = *row_id;

            // Fast path: the index already knows the node for this row.
            if let Some(node_id) = self.index.get_node_for_row(table_id, row_id) {
                return vec![node_id];
            }

            // Slow path: scan the graph for nodes referencing this row.
            graph
                .iter_nodes()
                .filter_map(|node| {
                    let table_ref = node.table_ref?;
                    if table_ref.table_id == table_id && table_ref.row_id == row_id {
                        Some(node.id)
                    } else {
                        None
                    }
                })
                .collect()
        }
    }
}
/// Executes `query` against this executor's own graph and index.
///
/// `Table`, `Graph`, `Join` and `Path` queries are dispatched to their
/// dedicated handlers.
///
/// # Errors
///
/// `Vector` and `Hybrid` queries, and every DML/DDL/command statement,
/// return an `ExecutionError`; handler errors are propagated unchanged.
pub fn execute(&self, query: &QueryExpr) -> Result<UnifiedResult, ExecutionError> {
match query {
QueryExpr::Table(q) => self.exec_table(q),
QueryExpr::Graph(q) => self.exec_graph(q),
QueryExpr::Join(q) => self.exec_join(q),
QueryExpr::Path(q) => self.exec_path(q),
QueryExpr::Vector(_) => {
Err(ExecutionError::new(
"Vector queries not yet implemented in UnifiedExecutor",
))
}
QueryExpr::Hybrid(_) => {
Err(ExecutionError::new(
"Hybrid queries not yet implemented in UnifiedExecutor",
))
}
// Every remaining variant is listed explicitly (no `_` arm), so adding a
// new `QueryExpr` variant forces a compile error here.
QueryExpr::Insert(_)
| QueryExpr::Update(_)
| QueryExpr::Delete(_)
| QueryExpr::CreateTable(_)
| QueryExpr::CreateCollection(_)
| QueryExpr::CreateVector(_)
| QueryExpr::DropTable(_)
| QueryExpr::DropGraph(_)
| QueryExpr::DropVector(_)
| QueryExpr::DropDocument(_)
| QueryExpr::DropKv(_)
| QueryExpr::DropCollection(_)
| QueryExpr::Truncate(_)
| QueryExpr::AlterTable(_)
| QueryExpr::GraphCommand(_)
| QueryExpr::SearchCommand(_)
| QueryExpr::CreateIndex(_)
| QueryExpr::DropIndex(_)
| QueryExpr::ProbabilisticCommand(_)
| QueryExpr::Ask(_)
| QueryExpr::SetConfig { .. }
| QueryExpr::ShowConfig { .. }
| QueryExpr::SetSecret { .. }
| QueryExpr::DeleteSecret { .. }
| QueryExpr::ShowSecrets { .. }
| QueryExpr::SetTenant(_)
| QueryExpr::ShowTenant
| QueryExpr::CreateTimeSeries(_)
| QueryExpr::DropTimeSeries(_)
| QueryExpr::CreateQueue(_)
| QueryExpr::AlterQueue(_)
| QueryExpr::DropQueue(_)
| QueryExpr::QueueSelect(_)
| QueryExpr::QueueCommand(_)
| QueryExpr::KvCommand(_)
| QueryExpr::ConfigCommand(_)
| QueryExpr::CreateTree(_)
| QueryExpr::DropTree(_)
| QueryExpr::TreeCommand(_)
| QueryExpr::ExplainAlter(_)
| QueryExpr::TransactionControl(_)
| QueryExpr::MaintenanceCommand(_)
| QueryExpr::CreateSchema(_)
| QueryExpr::DropSchema(_)
| QueryExpr::CreateSequence(_)
| QueryExpr::DropSequence(_)
| QueryExpr::CopyFrom(_)
| QueryExpr::CreateView(_)
| QueryExpr::DropView(_)
| QueryExpr::RefreshMaterializedView(_)
| QueryExpr::CreatePolicy(_)
| QueryExpr::DropPolicy(_)
| QueryExpr::CreateServer(_)
| QueryExpr::DropServer(_)
| QueryExpr::CreateForeignTable(_)
| QueryExpr::DropForeignTable(_)
| QueryExpr::Grant(_)
| QueryExpr::Revoke(_)
| QueryExpr::AlterUser(_)
| QueryExpr::CreateIamPolicy { .. }
| QueryExpr::DropIamPolicy { .. }
| QueryExpr::AttachPolicy { .. }
| QueryExpr::DetachPolicy { .. }
| QueryExpr::ShowPolicies { .. }
| QueryExpr::ShowEffectivePermissions { .. }
| QueryExpr::SimulatePolicy { .. }
| QueryExpr::CreateMigration(_)
| QueryExpr::ApplyMigration(_)
| QueryExpr::RollbackMigration(_)
| QueryExpr::ExplainMigration(_)
| QueryExpr::EventsBackfill(_)
| QueryExpr::EventsBackfillStatus { .. } => Err(ExecutionError::new(
"DML/DDL/Command statements are not supported in UnifiedExecutor",
)),
}
}
/// Table query handler. Currently a stub: the query is ignored and an empty
/// result is always returned.
fn exec_table(&self, _query: &TableQuery) -> Result<UnifiedResult, ExecutionError> {
Ok(UnifiedResult::empty())
}
/// Runs a graph pattern query against this executor's own graph.
///
/// Pattern matches are filtered with the query's effective filter, projected
/// with its effective projections, and pushed onto the result until
/// `query.limit` records have been produced.
fn exec_graph(&self, query: &GraphQuery) -> Result<UnifiedResult, ExecutionError> {
    let mut stats = QueryStats::default();
    let candidates = self.match_pattern(&query.pattern, &mut stats)?;
    let filter = effective_graph_filter(query);
    let projections = effective_graph_projections(query);

    let mut result = UnifiedResult::empty();
    for candidate in candidates {
        // The limit counts emitted records, not examined candidates.
        if Self::graph_limit_reached(result.records.len(), query.limit) {
            break;
        }
        if self.eval_filter_on_match(&filter, &candidate) {
            result.push(self.project_match(&candidate, &projections));
        }
    }
    result.stats = stats;
    Ok(result)
}
/// Returns `true` when a limit is set and `row_count` has reached or passed
/// it. With no limit the answer is always `false`.
fn graph_limit_reached(row_count: usize, limit: Option<u64>) -> bool {
    match limit {
        Some(max_rows) => row_count as u64 >= max_rows,
        None => false,
    }
}
/// Matches `pattern` against this executor's own graph.
///
/// Thin wrapper around [`Self::match_pattern_on`]; scan counters are
/// accumulated into `stats`.
fn match_pattern(
&self,
pattern: &GraphPattern,
stats: &mut QueryStats,
) -> Result<Vec<PatternMatch>, ExecutionError> {
self.match_pattern_on(self.graph.as_ref(), pattern, stats)
}
/// Matches `pattern` against `graph`.
///
/// The first node pattern seeds the candidate set; each edge pattern is then
/// applied in order to extend (or discard) the candidates. A pattern without
/// nodes matches nothing.
///
/// # Errors
///
/// Propagates any error raised while seeding or extending matches.
fn match_pattern_on(
    &self,
    graph: &GraphStore,
    pattern: &GraphPattern,
    stats: &mut QueryStats,
) -> Result<Vec<PatternMatch>, ExecutionError> {
    let Some(first) = pattern.nodes.first() else {
        return Ok(Vec::new());
    };
    let seed = self.find_matching_nodes_on(graph, first, stats)?;
    // Stops at the first edge pattern that fails.
    pattern.edges.iter().try_fold(seed, |current, edge_pattern| {
        self.extend_matches_on(graph, current, edge_pattern, &pattern.nodes, stats)
    })
}
/// Scans every node of `graph` and returns one `PatternMatch` per node that
/// satisfies `pattern`, binding the node under `pattern.alias`.
///
/// A node satisfies the pattern when its label id equals the registry id of
/// `pattern.node_label` (if a label is given) and every property filter in
/// `pattern.properties` holds. A label that the registry does not know
/// matches no node. `stats.nodes_scanned` is incremented for every node
/// visited, whether or not it matches.
fn find_matching_nodes_on(
    &self,
    graph: &GraphStore,
    pattern: &NodePattern,
    stats: &mut QueryStats,
) -> Result<Vec<PatternMatch>, ExecutionError> {
    // The registry lookup does not depend on the node, so resolve it once
    // instead of once per scanned node. Outer `None`: no label constraint;
    // inner `None`: label unknown to the registry.
    let expected_label_id = pattern
        .node_label
        .as_ref()
        .map(|label| graph.registry.lookup(Namespace::Node, label));

    let mut matches = Vec::new();
    for node in graph.iter_nodes() {
        stats.nodes_scanned += 1;

        if let Some(expected) = &expected_label_id {
            match expected {
                Some(id) if *id == node.label_id => {}
                _ => continue,
            }
        }

        let properties_match = pattern
            .properties
            .iter()
            .all(|prop_filter| self.eval_node_property_filter(&node, prop_filter));
        if properties_match {
            let mut pm = PatternMatch::new();
            pm.nodes
                .insert(pattern.alias.clone(), self.matched_node(&node));
            matches.push(pm);
        }
    }
    Ok(matches)
}
/// Extends each partial match along `edge_pattern`.
///
/// For every match, the edges of the node bound to `edge_pattern.from` are
/// enumerated in the requested direction. An edge produces a new match when
/// its type equals `edge_pattern.edge_label` (if given), the node on its
/// other end exists in `graph`, and that node satisfies the node pattern
/// whose alias is `edge_pattern.to` (label and property filters). The new
/// match binds that node under the target alias and, when the edge pattern
/// has an alias, the edge under it. Matches without any qualifying edge are
/// dropped. `stats.edges_scanned` is incremented for every enumerated edge.
///
/// # Errors
///
/// Returns an error when `edge_pattern.to` names no node pattern, or when a
/// match has no node bound to `edge_pattern.from`.
fn extend_matches_on(
    &self,
    graph: &GraphStore,
    matches: Vec<PatternMatch>,
    edge_pattern: &EdgePattern,
    node_patterns: &[NodePattern],
    stats: &mut QueryStats,
) -> Result<Vec<PatternMatch>, ExecutionError> {
    let target_pattern = node_patterns
        .iter()
        .find(|n| n.alias == edge_pattern.to)
        .ok_or_else(|| {
            ExecutionError::new(format!(
                "Node alias '{}' not found in pattern",
                edge_pattern.to
            ))
        })?;

    // The registry lookup does not depend on the edge, so resolve it once
    // instead of once per scanned edge. Outer `None`: no label constraint;
    // inner `None`: label unknown to the registry (matches nothing).
    let expected_label_id = target_pattern
        .node_label
        .as_ref()
        .map(|label| graph.registry.lookup(Namespace::Node, label));

    let mut extended = Vec::new();
    for pm in matches {
        let source_node = pm.nodes.get(&edge_pattern.from).ok_or_else(|| {
            ExecutionError::new(format!(
                "Source node '{}' not found in match",
                edge_pattern.from
            ))
        })?;

        // Each entry is (edge type, other endpoint, weight, is_outgoing).
        let edges: Vec<_> = match edge_pattern.direction {
            EdgeDirection::Outgoing => graph
                .outgoing_edges(&source_node.id)
                .into_iter()
                .map(|(et, target, w)| (et, target, w, true))
                .collect(),
            EdgeDirection::Incoming => graph
                .incoming_edges(&source_node.id)
                .into_iter()
                .map(|(et, source, w)| (et, source, w, false))
                .collect(),
            EdgeDirection::Both => {
                let mut all: Vec<_> = graph
                    .outgoing_edges(&source_node.id)
                    .into_iter()
                    .map(|(et, target, w)| (et, target, w, true))
                    .collect();
                all.extend(
                    graph
                        .incoming_edges(&source_node.id)
                        .into_iter()
                        .map(|(et, source, w)| (et, source, w, false)),
                );
                all
            }
        };

        for (etype, other_id, weight, is_outgoing) in edges {
            stats.edges_scanned += 1;

            if let Some(ref expected) = edge_pattern.edge_label {
                if etype.as_str() != expected.as_str() {
                    continue;
                }
            }

            let target_id = &other_id;
            let Some(target_node) = graph.get_node(target_id) else {
                continue;
            };

            if let Some(expected) = &expected_label_id {
                match expected {
                    Some(id) if *id == target_node.label_id => {}
                    _ => continue,
                }
            }

            let properties_match = target_pattern
                .properties
                .iter()
                .all(|prop_filter| self.eval_node_property_filter(&target_node, prop_filter));
            if !properties_match {
                continue;
            }

            let mut new_pm = pm.clone();
            new_pm.nodes.insert(
                target_pattern.alias.clone(),
                self.matched_node(&target_node),
            );
            if let Some(ref alias) = edge_pattern.alias {
                // Record the edge in its stored direction, regardless of the
                // direction in which the pattern traversed it.
                let edge = if is_outgoing {
                    MatchedEdge::from_tuple(&source_node.id, etype, target_id, weight)
                } else {
                    MatchedEdge::from_tuple(target_id, etype, &source_node.id, weight)
                };
                new_pm.edges.insert(alias.clone(), edge);
            }
            extended.push(new_pm);
        }
    }
    Ok(extended)
}
/// Evaluates a property filter against `node`. A node that lacks the
/// filtered property never passes.
fn eval_node_property_filter(
    &self,
    node: &StoredNode,
    filter: &crate::storage::query::ast::PropertyFilter,
) -> bool {
    match self.node_property_value(node, filter.name.as_str()) {
        Some(actual) => self.compare_values(&actual, &filter.op, &filter.value),
        None => false,
    }
}
/// Applies comparison operator `op` to `left` and `right`.
///
/// Equality uses `Value`'s own `==`; ordering uses [`Self::value_lt`], so
/// values that `value_lt` cannot order only satisfy `Le` / `Ge` when they
/// are equal.
fn compare_values(&self, left: &Value, op: &CompareOp, right: &Value) -> bool {
    // Reduce each ordering operator to "strictly ordered" plus "is equality
    // also accepted".
    let (strictly_ordered, accepts_equal) = match op {
        CompareOp::Eq => return left == right,
        CompareOp::Ne => return left != right,
        CompareOp::Lt => (self.value_lt(left, right), false),
        CompareOp::Le => (self.value_lt(left, right), true),
        CompareOp::Gt => (self.value_lt(right, left), false),
        CompareOp::Ge => (self.value_lt(right, left), true),
    };
    strictly_ordered || (accepts_equal && left == right)
}
/// Strict "less than" between two values.
///
/// Integers and floats compare with each other (integers are widened to
/// `f64` for mixed comparisons); text compares with text and timestamps with
/// timestamps. Every other combination yields `false`.
fn value_lt(&self, left: &Value, right: &Value) -> bool {
    match left {
        Value::Integer(a) => match right {
            Value::Integer(b) => a < b,
            Value::Float(b) => (*a as f64) < *b,
            _ => false,
        },
        Value::Float(a) => match right {
            Value::Float(b) => a < b,
            Value::Integer(b) => *a < (*b as f64),
            _ => false,
        },
        Value::Text(a) => match right {
            Value::Text(b) => a < b,
            _ => false,
        },
        Value::Timestamp(a) => match right {
            Value::Timestamp(b) => a < b,
            _ => false,
        },
        _ => false,
    }
}
/// Evaluates an optional filter against a match; an absent filter accepts
/// every match.
fn eval_filter_on_match(&self, filter: &Option<Filter>, matched: &PatternMatch) -> bool {
    filter
        .as_ref()
        .map_or(true, |present| self.eval_filter(present, matched))
}
/// Evaluates `filter` against the nodes and edges bound in `matched`.
///
/// Fields that cannot be resolved make value comparisons fail (`false`);
/// `IsNull` / `IsNotNull` test exactly that resolvability. The text
/// operators (`Like`, `StartsWith`, `EndsWith`, `Contains`) only succeed on
/// text values. `CompareExpr` is not supported here and always evaluates to
/// `false`.
fn eval_filter(&self, filter: &Filter, matched: &PatternMatch) -> bool {
    match filter {
        Filter::Compare { field, op, value } => self
            .get_field_value(field, matched)
            .is_some_and(|actual| self.compare_values(&actual, op, value)),
        Filter::CompareFields { left, op, right } => {
            let lhs = self.get_field_value(left, matched);
            let rhs = self.get_field_value(right, matched);
            lhs.zip(rhs)
                .is_some_and(|(lv, rv)| self.compare_values(&lv, op, &rv))
        }
        Filter::CompareExpr { .. } => false,
        Filter::And(left, right) => {
            self.eval_filter(left, matched) && self.eval_filter(right, matched)
        }
        Filter::Or(left, right) => {
            self.eval_filter(left, matched) || self.eval_filter(right, matched)
        }
        Filter::Not(inner) => !self.eval_filter(inner, matched),
        Filter::IsNull(field) => self.get_field_value(field, matched).is_none(),
        Filter::IsNotNull(field) => self.get_field_value(field, matched).is_some(),
        Filter::In { field, values } => self
            .get_field_value(field, matched)
            .is_some_and(|actual| values.contains(&actual)),
        Filter::Between { field, low, high } => self
            .get_field_value(field, matched)
            // Inclusive on both ends: not below `low` and not above `high`.
            .is_some_and(|actual| {
                !self.value_lt(&actual, low) && !self.value_lt(high, &actual)
            }),
        Filter::Like { field, pattern } => {
            if let Some(Value::Text(s)) = self.get_field_value(field, matched) {
                self.match_like(&s, pattern)
            } else {
                false
            }
        }
        Filter::StartsWith { field, prefix } => {
            if let Some(Value::Text(s)) = self.get_field_value(field, matched) {
                s.starts_with(prefix)
            } else {
                false
            }
        }
        Filter::EndsWith { field, suffix } => {
            if let Some(Value::Text(s)) = self.get_field_value(field, matched) {
                s.ends_with(suffix)
            } else {
                false
            }
        }
        Filter::Contains { field, substring } => {
            if let Some(Value::Text(s)) = self.get_field_value(field, matched) {
                s.contains(substring)
            } else {
                false
            }
        }
    }
}
/// SQL `LIKE`-style matching of `text` against `pattern`.
///
/// `%` matches any sequence of characters (including none) and `_` matches
/// exactly one character; every other character must match literally and
/// case-sensitively. Wildcards may appear anywhere in the pattern, and a
/// pattern consisting only of `%` matches every text. There is no escape
/// syntax, so a literal `%` or `_` cannot be required.
fn match_like(&self, text: &str, pattern: &str) -> bool {
    // Work on characters so that `_` consumes one character rather than one
    // byte of a multi-byte sequence.
    let text: Vec<char> = text.chars().collect();
    let pattern: Vec<char> = pattern.chars().collect();

    let mut t = 0usize;
    let mut p = 0usize;
    // Position of the most recent `%` in the pattern and the text position
    // it is currently assumed to extend to; used to backtrack on mismatch.
    let mut backtrack: Option<(usize, usize)> = None;

    while t < text.len() {
        if p < pattern.len() && pattern[p] == '%' {
            // Tentatively let `%` match the empty sequence.
            backtrack = Some((p, t));
            p += 1;
        } else if p < pattern.len() && (pattern[p] == '_' || pattern[p] == text[t]) {
            t += 1;
            p += 1;
        } else if let Some((wild_p, wild_t)) = backtrack {
            // Mismatch: let the last `%` absorb one more character and retry.
            backtrack = Some((wild_p, wild_t + 1));
            p = wild_p + 1;
            t = wild_t + 1;
        } else {
            return false;
        }
    }

    // Text exhausted: only trailing `%` may remain in the pattern.
    pattern[p..].iter().all(|&c| c == '%')
}
fn get_field_value(&self, field: &FieldRef, matched: &PatternMatch) -> Option<Value> {
match field {
FieldRef::NodeId { alias } => {
matched.nodes.get(alias).map(|n| Value::text(n.id.clone()))
}
FieldRef::NodeProperty { alias, property } => matched
.nodes
.get(alias)
.and_then(|n| match property.as_str() {
"id" => Some(Value::text(n.id.clone())),
"label" => Some(Value::text(n.label.clone())),
"type" | "node_type" => Some(Value::text(n.node_label.clone())),
_ => n.properties.get(property).cloned(),
})
.or_else(|| {
matched
.edges
.get(alias)
.and_then(|e| Self::edge_property_value(e, property))
}),
FieldRef::EdgeProperty { alias, property } => matched
.edges
.get(alias)
.and_then(|e| Self::edge_property_value(e, property)),
FieldRef::TableColumn { table, column } => {
if !table.is_empty() {
if let Some(n) = matched.nodes.get(table) {
return match column.as_str() {
"id" => Some(Value::text(n.id.clone())),
"label" => Some(Value::text(n.label.clone())),
"type" | "node_type" => Some(Value::text(n.node_label.clone())),
other => n.properties.get(other).cloned(),
};
}
if let Some(e) = matched.edges.get(table) {
return Self::edge_property_value(e, column);
}
}
None
}
}
}
/// Returns the value of a built-in edge field, or `None` for any other name.
///
/// Recognised names: `weight` (as a float), `from` / `source`,
/// `to` / `target`, and `label` / `type` / `edge_type`.
fn edge_property_value(edge: &MatchedEdge, property: &str) -> Option<Value> {
match property {
"weight" => Some(Value::Float(edge.weight as f64)),
"from" | "source" => Some(Value::text(edge.from.clone())),
"to" | "target" => Some(Value::text(edge.to.clone())),
"label" | "type" | "edge_type" => Some(Value::text(edge.edge_label.clone())),
_ => None,
}
}
fn get_join_value(&self, field: &FieldRef, record: &UnifiedRecord) -> Option<Value> {
match field {
FieldRef::TableColumn { column, .. } => record.get(column.as_str()).cloned(),
FieldRef::NodeId { alias } => record
.nodes
.get(alias)
.map(|node| Value::text(node.id.clone())),
FieldRef::NodeProperty { alias, property } => {
record
.nodes
.get(alias)
.and_then(|n| match property.as_str() {
"id" => Some(Value::text(n.id.clone())),
"label" => Some(Value::text(n.label.clone())),
"type" | "node_type" => Some(Value::text(n.node_label.clone())),
_ => n.properties.get(property).cloned(),
})
}
FieldRef::EdgeProperty { alias, property } => {
record
.edges
.get(alias)
.and_then(|e| match property.as_str() {
"weight" => Some(Value::Float(e.weight as f64)),
"from" => Some(Value::text(e.from.clone())),
"to" => Some(Value::text(e.to.clone())),
_ => None,
})
}
}
}
/// Builds the output record for one pattern match.
///
/// The record always carries copies of the matched nodes and edges. Each
/// projection then adds named fields:
///
/// * `Field` with an un-aliased `NodeId` expands the whole node (`id`,
///   `label`, `node_type`, and every property) or, when the alias names an
///   edge instead, the whole edge (`from`, `to`, `label`, `weight`), each
///   prefixed with `<alias>.`. Any other `Field` stores the resolved value
///   under the projection alias or the field's string form; unresolved
///   fields are skipped.
/// * `All` adds `<alias>.id` and `<alias>.label` for every matched node.
/// * `Column` / `Alias` only understand `id` and `label` and are applied to
///   every matched node, so with several nodes a later one overwrites the
///   value set by an earlier one.
/// * Any other projection kind is ignored.
fn project_match(&self, matched: &PatternMatch, projections: &[Projection]) -> UnifiedRecord {
let mut record = UnifiedRecord::new();
record.nodes = matched.nodes.clone();
record.edges = matched.edges.clone();
for proj in projections {
match proj {
Projection::Field(field, alias) => {
// Bare alias reference (no output alias): expand the whole entity.
if let (FieldRef::NodeId { alias: node_alias }, None) = (field, alias) {
if let Some(node) = matched.nodes.get(node_alias) {
record.set(&format!("{}.id", node_alias), Value::text(node.id.clone()));
record.set(
&format!("{}.label", node_alias),
Value::text(node.label.clone()),
);
record.set(
&format!("{}.node_type", node_alias),
Value::text(node.node_label.clone()),
);
for (k, v) in &node.properties {
record.set(&format!("{}.{}", node_alias, k), v.clone());
}
continue;
}
// The alias may name an edge rather than a node.
if let Some(edge) = matched.edges.get(node_alias) {
record.set(
&format!("{}.from", node_alias),
Value::text(edge.from.clone()),
);
record.set(&format!("{}.to", node_alias), Value::text(edge.to.clone()));
record.set(
&format!("{}.label", node_alias),
Value::text(edge.edge_label.clone()),
);
record.set(
&format!("{}.weight", node_alias),
Value::Float(edge.weight as f64),
);
continue;
}
}
// Single-value projection; also reached when the bare alias matched
// neither a node nor an edge.
if let Some(value) = self.get_field_value(field, matched) {
let key = alias.clone().unwrap_or_else(|| self.field_to_string(field));
record.set(&key, value);
}
}
Projection::All => {
for (alias, node) in &matched.nodes {
record.set(&format!("{}.id", alias), Value::text(node.id.clone()));
record.set(&format!("{}.label", alias), Value::text(node.label.clone()));
}
}
Projection::Column(col) => {
for node in matched.nodes.values() {
match col.as_str() {
"id" => record.set(col, Value::text(node.id.clone())),
"label" => record.set(col, Value::text(node.label.clone())),
_ => {}
}
}
}
Projection::Alias(col, alias) => {
for node in matched.nodes.values() {
match col.as_str() {
"id" => record.set(alias, Value::text(node.id.clone())),
"label" => record.set(alias, Value::text(node.label.clone())),
_ => {}
}
}
}
_ => {} }
}
record
}
/// Renders a field reference as the default output column name:
/// `<alias>.id`, `<alias>.<property>`, `<table>.<column>`, or just
/// `<column>` when the table qualifier is empty.
fn field_to_string(&self, field: &FieldRef) -> String {
    match field {
        FieldRef::NodeId { alias } => format!("{alias}.id"),
        FieldRef::NodeProperty { alias, property } => format!("{alias}.{property}"),
        FieldRef::EdgeProperty { alias, property } => format!("{alias}.{property}"),
        FieldRef::TableColumn { table, column } if table.is_empty() => column.clone(),
        FieldRef::TableColumn { table, column } => format!("{table}.{column}"),
    }
}
fn exec_join(&self, query: &JoinQuery) -> Result<UnifiedResult, ExecutionError> {
let left_result = self.execute(&query.left)?;
let right_result = self.execute(&query.right)?;
let mut result = UnifiedResult::empty();
for left in &left_result.records {
let left_value = self.get_join_value(&query.on.left_field, left);
for right in &right_result.records {
let right_value = self.get_join_value(&query.on.right_field, right);
if left_value == right_value {
let mut merged = left.clone();
merged.nodes.extend(right.nodes.clone());
merged.edges.extend(right.edges.clone());
for (k, v) in right.iter_fields() {
merged.set_arc(k.clone(), v.clone());
}
result.push(merged);
}
}
if matches!(query.join_type, JoinType::LeftOuter) {
if !right_result
.records
.iter()
.any(|r| self.get_join_value(&query.on.right_field, r) == left_value)
{
result.push(left.clone());
}
}
}
Ok(result)
}
/// Runs a path query against this executor's own graph.
///
/// Start and target nodes are resolved from `query.from` / `query.to`; a
/// breadth-first search is run from every start node and each path found
/// becomes one record holding that path. Scan counters are returned in the
/// result's `stats`.
///
/// # Errors
///
/// Propagates any error from selector resolution or the path search.
fn exec_path(&self, query: &PathQuery) -> Result<UnifiedResult, ExecutionError> {
    // TODO: the effective path filter is computed but not applied yet, so
    // every discovered path is returned regardless of the query's filter.
    // Evaluated once here rather than once per path.
    let _path_filter = effective_path_filter(query);

    let mut result = UnifiedResult::empty();
    let mut stats = QueryStats::default();

    let start_nodes = self.resolve_selector(&query.from, &mut stats)?;
    let target_nodes: HashSet<String> = self
        .resolve_selector(&query.to, &mut stats)?
        .into_iter()
        .collect();

    for start_id in start_nodes {
        let paths = self.bfs_paths(
            &start_id,
            &target_nodes,
            &query.via,
            query.max_length,
            &mut stats,
        )?;
        for path in paths {
            let mut record = UnifiedRecord::new();
            record.paths.push(path);
            result.push(record);
        }
    }

    result.stats = stats;
    Ok(result)
}
/// Resolves a node selector to node ids against this executor's own graph
/// and index.
///
/// * `ById` yields the id as given.
/// * `ByType` scans every node (counting each in `stats.nodes_scanned`) and
///   keeps those whose label id matches the registry id of `node_label` and
///   that pass the optional property filter. A label unknown to the registry
///   selects nothing.
/// * `ByRow` looks the row up in the index under the table id parsed from
///   `table`. A table that does not parse as a `u16` falls back to table
///   id 0, which is the id this lookup always used before.
fn resolve_selector(
    &self,
    selector: &NodeSelector,
    stats: &mut QueryStats,
) -> Result<Vec<String>, ExecutionError> {
    match selector {
        NodeSelector::ById(id) => Ok(vec![id.clone()]),
        NodeSelector::ByType { node_label, filter } => {
            let expected_id = self.graph.registry.lookup(Namespace::Node, node_label);
            let mut nodes = Vec::new();
            for node in self.graph.iter_nodes() {
                stats.nodes_scanned += 1;
                if expected_id.map(|id| node.label_id == id).unwrap_or(false) {
                    let matches_filter = filter
                        .as_ref()
                        .map(|f| self.eval_node_property_filter(&node, f))
                        .unwrap_or(true);
                    if matches_filter {
                        nodes.push(node.id.clone());
                    }
                }
            }
            Ok(nodes)
        }
        NodeSelector::ByRow { table, row_id } => {
            // Honour the selector's table instead of always querying table 0.
            let table_id = table.as_str().parse::<u16>().unwrap_or(0);
            Ok(self
                .index
                .get_node_for_row(table_id, *row_id)
                .into_iter()
                .collect())
        }
    }
}
/// Breadth-first search from `start` over outgoing edges of this executor's
/// graph, returning every path that ends at a node in `targets`.
///
/// * A path is only reported once it contains at least one edge, so `start`
///   itself is never reported as a zero-length hit.
/// * Paths are not extended once their length reaches `max_length`, and not
///   past a target.
/// * When `via` is non-empty, only edges whose type is listed there are
///   followed.
/// * Each node is enqueued at most once, so a target is reached by at most
///   one path per call.
///
/// `stats.edges_scanned` is incremented for every outgoing edge inspected.
fn bfs_paths(
    &self,
    start: &str,
    targets: &HashSet<String>,
    via: &[String],
    max_length: u32,
    stats: &mut QueryStats,
) -> Result<Vec<GraphPath>, ExecutionError> {
    let depth_limit = max_length as usize;
    let mut paths = Vec::new();
    let mut queue: VecDeque<GraphPath> = VecDeque::new();
    let mut visited: HashSet<String> = HashSet::new();

    queue.push_back(GraphPath::start(start));
    visited.insert(start.to_string());

    while let Some(current_path) = queue.pop_front() {
        let Some(current_node) = current_path.nodes.last() else {
            continue;
        };

        if targets.contains(current_node) && !current_path.is_empty() {
            // The path is not needed again after this point, so it is moved
            // into the result instead of cloned.
            paths.push(current_path);
            continue;
        }
        if current_path.len() >= depth_limit {
            continue;
        }

        for (edge_type, target_id, weight) in self.graph.outgoing_edges(current_node) {
            stats.edges_scanned += 1;
            if !via.is_empty() && !via.iter().any(|v| v == edge_type.as_str()) {
                continue;
            }
            if visited.contains(&target_id) {
                continue;
            }
            let edge = MatchedEdge::from_tuple(current_node, edge_type, &target_id, weight);
            let new_path = current_path.extend(edge, &target_id);
            visited.insert(target_id.clone());
            queue.push_back(new_path);
        }
    }
    Ok(paths)
}
}
/// One (possibly partial) binding of a graph pattern: the nodes and edges
/// matched so far, keyed by their pattern aliases.
#[derive(Debug, Clone, Default)]
struct PatternMatch {
/// Matched nodes keyed by node-pattern alias.
nodes: HashMap<String, MatchedNode>,
/// Matched edges keyed by edge-pattern alias (only aliased edges are kept).
edges: HashMap<String, MatchedEdge>,
}
impl PatternMatch {
/// Creates an empty match with no bound nodes or edges.
fn new() -> Self {
Self::default()
}
}