use std::collections::{HashMap, HashSet};
use sonic_rs;
use crate::bridge::envelope::{PhysicalPlan, Response};
use crate::control::scatter_gather;
use crate::control::state::SharedState;
use crate::engine::graph::traversal_options::GraphTraversalOptions;
use crate::types::{TenantId, TraceId};
use nodedb_physical::physical_plan::GraphOp;
use super::helpers::{encode_path, ok_response};
pub async fn cross_core_shortest_path(
shared: &SharedState,
tenant_id: TenantId,
src: String,
dst: String,
edge_label: Option<String>,
max_depth: usize,
) -> crate::Result<Response> {
let options = GraphTraversalOptions::default();
let cluster_mode = shared.cluster_routing.is_some();
let direction = crate::engine::graph::edge_store::Direction::Out;
if src == dst {
let payload = encode_path(&[src])?;
return Ok(ok_response(payload));
}
let mut parent: HashMap<String, String> = HashMap::new();
let mut visited: HashSet<String> = HashSet::new();
visited.insert(src.clone());
let mut frontier: Vec<String> = vec![src.clone()];
for depth in 0..max_depth {
if frontier.is_empty() {
break;
}
let mut discoveries: Vec<(String, String)> = Vec::new();
let remaining_budget = options
.max_visited
.saturating_sub(visited.len())
.min(u32::MAX as usize) as u32;
let plan = PhysicalPlan::Graph(GraphOp::NeighborsMulti {
node_ids: frontier.clone(),
edge_label: edge_label.clone(),
direction,
max_results: remaining_budget,
rls_filters: Vec::new(),
});
let resp = crate::control::server::broadcast::broadcast_to_all_cores(
shared,
tenant_id,
plan,
TraceId::ZERO,
)
.await?;
if !resp.payload.is_empty() {
let json_text =
crate::data::executor::response_codec::decode_payload_to_json(&resp.payload);
if let Ok(arr) = sonic_rs::from_str::<Vec<serde_json::Value>>(&json_text) {
for item in arr {
let src_node = item.get("src").and_then(|v| v.as_str());
let nb = item.get("node").and_then(|v| v.as_str());
if let (Some(s), Some(n)) = (src_node, nb) {
discoveries.push((s.to_string(), n.to_string()));
}
}
}
}
let merged: Vec<(String, String)> = if cluster_mode {
let local_ids: Vec<String> = discoveries.iter().map(|(_, n)| n.clone()).collect();
let (local_nodes, envelope) = {
let routing = shared
.cluster_routing
.as_ref()
.expect("cluster_routing checked above");
let rt = routing.read().unwrap_or_else(|p| p.into_inner());
scatter_gather::partition_local_remote(&local_ids, shared.node_id, &rt)
};
if envelope.is_empty() {
discoveries
} else {
let remaining = max_depth.saturating_sub(depth + 1);
let (remote_hits, _meta) = scatter_gather::coordinate_cross_shard_hop(
shared,
tenant_id,
scatter_gather::CrossShardHopParams {
local_nodes,
envelope,
options: &options,
edge_label: edge_label.as_deref(),
direction,
remaining_depth: remaining,
},
)
.await?;
let mut out = discoveries;
if let Some(attrib_parent) = frontier.first().cloned() {
for n in remote_hits {
out.push((attrib_parent.clone(), n));
}
}
out
}
} else {
discoveries
};
let mut next_frontier: Vec<String> = Vec::new();
for (from, to) in merged {
if !visited.insert(to.clone()) {
continue;
}
parent.insert(to.clone(), from);
if to == dst {
let path = reconstruct(&parent, &src, &dst);
let payload = encode_path(&path)?;
return Ok(ok_response(payload));
}
next_frontier.push(to);
}
frontier = next_frontier;
if visited.len() >= options.max_visited {
break;
}
}
let payload = encode_path::<String>(&[])?;
Ok(ok_response(payload))
}
/// Rebuilds the path from `src` to `dst` by following `parent` links
/// backwards from `dst`.
///
/// `parent` maps each node to the node it was discovered from. The returned
/// vector is ordered from the start of the walk's end point to `dst`:
///
/// * When the chain reaches `src`, the result is the full `src ..= dst` path.
/// * When a node has no parent entry before `src` is reached, the walk stops
///   and the partial path collected so far is returned (it will not start
///   with `src`).
/// * When the parent links form a cycle (including a node that is its own
///   parent), the walk stops at the first node that would be revisited and
///   the partial path is returned, so the function always terminates.
///
/// If `src == dst` the result is `[dst]`.
fn reconstruct(parent: &HashMap<String, String>, src: &str, dst: &str) -> Vec<String> {
    let mut path: Vec<String> = vec![dst.to_string()];

    // Nodes already placed on the path; revisiting one means the parent map
    // is cyclic and following it further would never reach `src`.
    let mut on_path: HashSet<&str> = HashSet::new();
    on_path.insert(dst);

    let mut cursor: &str = dst;
    while cursor != src {
        match parent.get(cursor) {
            // `insert` returns false for a node that is already on the path.
            Some(prev) if on_path.insert(prev.as_str()) => {
                path.push(prev.clone());
                cursor = prev.as_str();
            }
            // Missing parent or cycle: give up with the partial path.
            _ => break,
        }
    }

    // Collected dst-first; callers expect src-first order.
    path.reverse();
    path
}