use std::collections::HashSet;
use crate::bridge::envelope::{PhysicalPlan, Response};
use crate::bridge::physical_plan::GraphOp;
use crate::control::scatter_gather;
use crate::control::state::SharedState;
use crate::engine::graph::traversal_options::GraphTraversalOptions;
use crate::types::{Lsn, RequestId, TenantId};
pub async fn cross_core_bfs(
shared: &SharedState,
tenant_id: TenantId,
start_nodes: Vec<String>,
edge_label: Option<String>,
direction: crate::engine::graph::edge_store::Direction,
max_depth: usize,
) -> crate::Result<Response> {
cross_core_bfs_with_options(
shared,
tenant_id,
start_nodes,
edge_label,
direction,
max_depth,
&GraphTraversalOptions::default(),
)
.await
}
pub async fn cross_core_bfs_with_options(
shared: &SharedState,
tenant_id: TenantId,
start_nodes: Vec<String>,
edge_label: Option<String>,
direction: crate::engine::graph::edge_store::Direction,
max_depth: usize,
options: &GraphTraversalOptions,
) -> crate::Result<Response> {
let cluster_mode = shared.cluster_routing.is_some();
let mut visited: HashSet<String> = HashSet::new();
let mut all_discovered: Vec<String> = Vec::new();
let mut frontier: Vec<String> = start_nodes.clone();
for node in &start_nodes {
visited.insert(node.clone());
all_discovered.push(node.clone());
}
for depth in 0..max_depth {
if frontier.is_empty() {
break;
}
let mut local_hop_results: Vec<String> = Vec::new();
for node in &frontier {
let plan = PhysicalPlan::Graph(GraphOp::Neighbors {
node_id: node.clone(),
edge_label: edge_label.clone(),
direction,
rls_filters: Vec::new(),
});
let resp = super::broadcast::broadcast_to_all_cores(shared, tenant_id, plan, 0).await?;
if !resp.payload.is_empty() {
let json_text =
crate::data::executor::response_codec::decode_payload_to_json(&resp.payload);
if let Ok(arr) = serde_json::from_str::<Vec<serde_json::Value>>(&json_text) {
for item in arr {
if let Some(neighbor) = item.get("node").and_then(|v| v.as_str()) {
local_hop_results.push(neighbor.to_string());
}
}
}
}
}
let merged_hop_results = if cluster_mode {
let (local_nodes, cross_shard_envelope) = {
let routing = shared
.cluster_routing
.as_ref()
.expect("cluster_routing checked above");
let rt = routing.read().unwrap_or_else(|p| p.into_inner());
scatter_gather::partition_local_remote(&local_hop_results, shared.node_id, &rt)
};
if cross_shard_envelope.is_empty() {
local_nodes
} else {
let remaining_depth = max_depth.saturating_sub(depth + 1);
let (merged, _meta) = scatter_gather::coordinate_cross_shard_hop(
shared,
tenant_id,
scatter_gather::CrossShardHopParams {
local_nodes,
envelope: cross_shard_envelope,
options,
edge_label: edge_label.as_deref(),
direction,
remaining_depth,
},
)
.await?;
merged
}
} else {
local_hop_results
};
let mut next_frontier: Vec<String> = Vec::new();
for node in merged_hop_results {
if visited.insert(node.clone()) {
next_frontier.push(node.clone());
all_discovered.push(node);
}
}
frontier = next_frontier;
if all_discovered.len() >= options.max_visited {
break;
}
}
let payload = match serde_json::to_vec(&all_discovered) {
Ok(v) => v,
Err(e) => {
return Err(crate::Error::Serialization {
format: "json".into(),
detail: e.to_string(),
});
}
};
Ok(Response {
request_id: RequestId::new(0),
status: crate::bridge::envelope::Status::Ok,
attempt: 1,
partial: false,
payload: crate::bridge::envelope::Payload::from_vec(payload),
watermark_lsn: Lsn::ZERO,
error_code: None,
})
}