use std::collections::{HashMap, HashSet};
use sonic_rs;
use crate::bridge::envelope::Response;
use crate::control::state::SharedState;
use crate::engine::graph::edge_store::Direction;
use crate::engine::graph::traversal_options::GraphTraversalOptions;
use crate::types::{Lsn, RequestId, TenantId};
use super::hop::{NeighborHopParams, execute_neighbor_hop};
/// One node entry of the serialized subgraph payload.
///
/// Borrows its id from the traversal's discovery list so serialization does
/// not copy the string again.
#[derive(serde::Serialize)]
struct WireNode<'a> {
/// Node identifier.
id: &'a str,
/// Hop index at which the node was first discovered: 0 for the start node,
/// `hop + 1` for nodes found in a later hop, saturated at `u8::MAX`.
depth: u8,
}
/// One edge entry of the serialized subgraph payload.
///
/// All three fields borrow from the `(String, String, String)` triples
/// collected during traversal, which are read as `(source, label, destination)`.
#[derive(serde::Serialize)]
struct WireEdge<'a> {
/// First element of the triple.
from: &'a str,
/// Third element of the triple.
to: &'a str,
/// Second element of the triple.
label: &'a str,
}
/// Top-level value serialized into the response payload of
/// `cross_core_traverse_subgraph`.
#[derive(serde::Serialize)]
struct WireSubGraph<'a> {
/// Discovered nodes, in discovery order (start node first).
nodes: Vec<WireNode<'a>>,
/// Collected edges, in the order the hops returned them.
edges: Vec<WireEdge<'a>>,
}
/// Breadth-first subgraph traversal starting at `start`, expanded one hop at a
/// time through [`execute_neighbor_hop`], returned as a JSON payload.
///
/// The traversal keeps:
/// * the set of visited node ids (each node is admitted at most once),
/// * the discovery order of nodes together with the hop depth at which each
///   was first seen (`0` for `start`, saturated at `u8::MAX`),
/// * every triple reported by the hops in `local_triples`.
///
/// It stops when any of the following holds:
/// * `max_depth` hops have been performed,
/// * the frontier is empty,
/// * the number of discovered nodes has reached `options.max_visited`.
///
/// The cap is checked *before* every hop, so when it is already met by the
/// start node alone (`max_visited <= 1`) no hop is dispatched and no further
/// node is admitted. The start node itself is always part of the result.
///
/// # Parameters
/// * `shared`, `tenant_id` – forwarded unchanged to [`execute_neighbor_hop`].
/// * `start` – id of the node the traversal begins at.
/// * `edge_label` – optional label forwarded to each hop.
/// * `direction` – edge direction forwarded to each hop.
/// * `max_depth` – maximum number of hops.
/// * `options` – traversal options; `max_visited` is read here, the whole
///   value is forwarded to each hop.
///
/// # Returns
/// A [`Response`] with status `Ok`, `request_id` 0, `attempt` 1,
/// `partial: false`, `watermark_lsn` `Lsn::ZERO`, and a payload holding the
/// JSON serialization of `{ nodes: [{id, depth}], edges: [{from, to, label}] }`.
///
/// # Errors
/// * Any error returned by [`execute_neighbor_hop`] is propagated.
/// * `crate::Error::Serialization` if the payload cannot be serialized.
pub async fn cross_core_traverse_subgraph(
    shared: &SharedState,
    tenant_id: TenantId,
    start: String,
    edge_label: Option<String>,
    direction: Direction,
    max_depth: usize,
    options: &GraphTraversalOptions,
) -> crate::Result<Response> {
    let mut depth_of: HashMap<String, u8> = HashMap::new();
    let mut visited: HashSet<String> = HashSet::new();
    let mut node_order: Vec<String> = Vec::new();
    let mut edges: Vec<(String, String, String)> = Vec::new();

    // Seed the traversal with the start node at depth 0.
    let mut frontier: Vec<String> = vec![start.clone()];
    visited.insert(start.clone());
    depth_of.insert(start.clone(), 0);
    node_order.push(start);

    for hop_idx in 0..max_depth {
        // Checking the cap here (rather than only after a hop) guarantees that
        // no hop is dispatched once the cap is met, including the case where
        // the start node alone already satisfies it.
        if frontier.is_empty() || node_order.len() >= options.max_visited {
            break;
        }

        let hop = execute_neighbor_hop(
            shared,
            tenant_id,
            NeighborHopParams {
                frontier: &frontier,
                edge_label: edge_label.as_deref(),
                direction,
                options,
                discovered_so_far: node_order.len(),
                // `hop_idx < max_depth`, so this never actually saturates.
                remaining_depth: max_depth.saturating_sub(hop_idx + 1),
            },
        )
        .await?;

        // NOTE(review): all triples of the hop are kept even if the cap below
        // cuts the hop's destinations short, so an edge may reference a node
        // that is absent from `nodes` — confirm this is intended.
        edges.extend(hop.local_triples);

        // Depth is reported as u8; clamp first so the cast cannot wrap.
        let next_depth_tag = (hop_idx + 1).min(u8::MAX as usize) as u8;
        let mut next_frontier: Vec<String> = Vec::new();
        for node in hop.merged_destinations {
            // `insert` returns false for ids that were already visited.
            if !visited.insert(node.clone()) {
                continue;
            }
            depth_of.insert(node.clone(), next_depth_tag);
            node_order.push(node.clone());
            next_frontier.push(node);
            if node_order.len() >= options.max_visited {
                break;
            }
        }
        frontier = next_frontier;
    }

    // Borrowing views over the collected data; nothing is copied for the wire.
    let wire_nodes: Vec<WireNode<'_>> = node_order
        .iter()
        .map(|id| WireNode {
            id: id.as_str(),
            // Every id in `node_order` was inserted into `depth_of` at the same
            // time, so the fallback is not expected to be taken.
            depth: depth_of.get(id).copied().unwrap_or(0),
        })
        .collect();
    let wire_edges: Vec<WireEdge<'_>> = edges
        .iter()
        .map(|(src, label, dst)| WireEdge {
            from: src.as_str(),
            to: dst.as_str(),
            label: label.as_str(),
        })
        .collect();
    let envelope = WireSubGraph {
        nodes: wire_nodes,
        edges: wire_edges,
    };

    let payload = sonic_rs::to_vec(&envelope).map_err(|e| crate::Error::Serialization {
        format: "json".into(),
        detail: e.to_string(),
    })?;

    // NOTE(review): `partial` stays false even when the traversal was cut
    // short by `max_visited` — confirm against the envelope's contract.
    Ok(Response {
        request_id: RequestId::new(0),
        status: crate::bridge::envelope::Status::Ok,
        attempt: 1,
        partial: false,
        payload: crate::bridge::envelope::Payload::from_vec(payload),
        watermark_lsn: Lsn::ZERO,
        error_code: None,
    })
}