use std::collections::HashSet;
use crate::features::storage::api as storage_api;
use plexus_serde::ExpandDir;
use super::super::super::{execution, ExplainError, Result, Row, RowStream};
use super::aggregate::apply_aggregate;
use super::types::{apply_post_ops, AdapterExecutionPlan, ExpandKind, ExpandSpec};
use crate::features::runtime::api::ExecuteParams;
/// Runs an adapter execution plan against storage and returns the resulting rows.
///
/// When the plan carries an expand spec, the whole execution is delegated to
/// `execute_with_expand`. Otherwise the base plan is executed and the optional
/// unwind and aggregate steps are applied, in that order, before the post-ops.
///
/// # Errors
///
/// Propagates any error from the base execution, from the unwind step, or from
/// the expand path.
pub(super) fn execute_adapter_plan(
    plan: &AdapterExecutionPlan,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    match plan.expand {
        Some(expand_spec) => execute_with_expand(plan, params, handle, expand_spec),
        None => {
            let mut result = execution::execute(&plan.explain_plan, params, handle)?;

            // Unwind replaces the base rows with rows built from the unwind values.
            if let Some(values) = &plan.unwind_values {
                let unwound = execute_unwind_values(handle, &result.rows, values)?;
                result.rows = unwound;
            }

            if let Some(aggregate) = plan.aggregate {
                result.rows = apply_aggregate(result.rows, aggregate);
            }

            if plan.path_construct {
                // No additional work is performed for path construction on this path.
            }

            apply_post_ops(&mut result, plan);
            Ok(result)
        }
    }
}
fn execute_with_expand(
plan: &AdapterExecutionPlan,
params: &ExecuteParams,
handle: &mut storage_api::StorageHandle,
expand_spec: ExpandSpec,
) -> Result<RowStream> {
if expand_spec.dir != ExpandDir::Out {
return Err(ExplainError::UnsupportedSerializedOperator(
"Expand execution currently supports only Out direction".to_string(),
));
}
let source_stream = execution::execute(&plan.explain_plan, params, handle)?;
let source_rows = if let Some(values) = &plan.unwind_values {
execute_unwind_values(handle, &source_stream.rows, values)?
} else {
source_stream.rows.clone()
};
let mut rows = Vec::new();
for source in &source_rows {
let logical = storage_api::get_logical_node(handle, source.node_id).map_err(|error| {
ExplainError::InvalidPlan(format!("storage read failed: {:?}", error))
})?;
let neighbors = logical.adjacency();
match expand_spec.kind {
ExpandKind::Expand => {
for neighbor_id in neighbors {
rows.push(neighbor_row(handle, neighbor_id)?);
}
}
ExpandKind::OptionalExpand => {
if neighbors.is_empty() {
rows.push(source.clone());
} else {
for neighbor_id in neighbors {
rows.push(neighbor_row(handle, neighbor_id)?);
}
}
}
ExpandKind::SemiExpand => {
if !neighbors.is_empty() {
rows.push(source.clone());
}
}
ExpandKind::ExpandVarLen { min_hops, max_hops } => {
let mut frontier = vec![source.node_id];
let mut seen = HashSet::new();
for hop in 1..=max_hops {
let mut next = Vec::new();
for node_id in frontier {
let logical =
storage_api::get_logical_node(handle, node_id).map_err(|error| {
ExplainError::InvalidPlan(format!(
"storage read failed: {:?}",
error
))
})?;
for neighbor in logical.adjacency() {
next.push(neighbor);
if hop >= min_hops && seen.insert(neighbor) {
rows.push(neighbor_row(handle, neighbor)?);
}
}
}
frontier = next;
if frontier.is_empty() {
break;
}
}
}
}
}
let mut stream = RowStream {
rows,
scanned_nodes: source_stream.scanned_nodes,
latency_micros: source_stream.latency_micros,
morsels_processed: source_stream.morsels_processed,
rerank_batches: source_stream.rerank_batches,
parallel_workers: source_stream.parallel_workers,
};
if let Some(aggregate) = plan.aggregate {
stream.rows = apply_aggregate(stream.rows, aggregate);
}
apply_post_ops(&mut stream, plan);
Ok(stream)
}
/// Builds the rows produced by unwinding `values` once per source row.
///
/// For every entry in `source_rows`, each value is treated as a node id and
/// turned into a row via `neighbor_row`. The contents of the source rows are
/// not read; only their count determines how often the values are repeated.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by `neighbor_row`.
pub(super) fn execute_unwind_values(
    handle: &mut storage_api::StorageHandle,
    source_rows: &[Row],
    values: &[u64],
) -> Result<Vec<Row>> {
    source_rows
        .iter()
        .flat_map(|_| values.iter().copied())
        .map(|node_id| neighbor_row(handle, node_id))
        .collect()
}
/// Builds a `Row` for `node_id` from the node's row summary in storage.
///
/// When storage has no summary for the node, the row is filled with
/// `has_full = false` and zero counts. `score` and `aggregate_value` are
/// always `None`.
///
/// # Errors
///
/// Returns `ExplainError::InvalidPlan` when the storage read fails.
fn neighbor_row(handle: &mut storage_api::StorageHandle, node_id: u64) -> Result<Row> {
    let lookup = storage_api::get_node_row_summary(handle, node_id);
    let maybe_summary = lookup
        .map_err(|error| ExplainError::InvalidPlan(format!("storage read failed: {:?}", error)))?;

    // Fall back to an "empty" summary when the node has none.
    let (has_full, delta_count, adjacency_degree) = match maybe_summary {
        Some(found) => (found.has_full, found.delta_count, found.adjacency_degree),
        None => (false, 0, 0),
    };

    Ok(Row {
        node_id,
        has_full,
        delta_count,
        adjacency_degree,
        score: None,
        aggregate_value: None,
    })
}