use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::Json;
use petgraph::Direction;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::core::corpus::contrib::{ContribEdge, ContribGraph, ContribNode};
use crate::core::entity::EdgeKind;
use crate::core::registry::IndexId;
use crate::core::symbol_graph::parse_kind_token;
use super::state::SearchAppState;
/// JSON request body accepted by `ingest_graph_handler`.
///
/// Keys are camelCase on the wire (e.g. `producerVersion`, `gitSha`) because of
/// `rename_all`. Every field except `producer` carries `#[serde(default)]` and
/// may therefore be omitted.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(super) struct IngestGraphRequest {
/// Optional schema label; the handler only logs it (as "-" when absent).
#[serde(default)]
pub schema: Option<String>,
/// Identifier of the contributing producer. The handler rejects values that
/// are empty after trimming, but stores the value untrimmed.
pub producer: String,
/// Optional producer version, copied verbatim into the stored `ContribGraph`.
#[serde(default)]
pub producer_version: Option<String>,
/// Optional git SHA, copied verbatim into the stored `ContribGraph`.
#[serde(default)]
pub git_sha: Option<String>,
/// Contributed nodes; empty when omitted.
#[serde(default)]
pub nodes: Vec<ContribNode>,
/// Contributed edges; empty when omitted.
#[serde(default)]
pub edges: Vec<ContribEdge>,
}
/// JSON response returned by `ingest_graph_handler`.
///
/// There is no `rename_all` here, so field names serialize as written (snake_case),
/// unlike the camelCase request body.
#[derive(Debug, Serialize)]
pub(super) struct IngestGraphResponse {
/// Producer identifier echoed back from the request.
pub producer: String,
/// Value returned by `save_contrib_graph`. Presumably `true` when an earlier
/// graph from the same producer was overwritten; NOTE(review): confirm against
/// the corpus store.
pub replaced: bool,
/// Number of nodes in the request body.
pub nodes_received: usize,
/// Number of edges in the request body.
pub edges_received: usize,
/// Node count of the symbol-graph snapshot taken after the rebuild.
pub graph_nodes: usize,
/// Edge count of that same snapshot.
pub graph_edges: usize,
/// `unknown_edge_tags_dropped()` as reported by that same snapshot.
pub unknown_edge_tags_dropped: usize,
}
/// Ingests a graph contributed by an external producer into index `id`.
///
/// The graph is persisted through the index's corpus store on a blocking
/// thread. Afterwards the symbol graph is rebuilt and snapshotted, so the
/// response reports the post-ingest graph size.
///
/// # Errors
/// Returns `(status, {"error": msg})`:
/// - `400 Bad Request` if `producer` is empty or whitespace-only;
/// - `404 Not Found` if no index is registered under `id`;
/// - `503 Service Unavailable` if the index has no corpus store;
/// - `500 Internal Server Error` if persisting fails, or if the blocking persist
///   task does not complete.
pub(super) async fn ingest_graph_handler(
    State(state): State<Arc<SearchAppState>>,
    Path(id): Path<String>,
    Json(req): Json<IngestGraphRequest>,
) -> Result<Json<IngestGraphResponse>, (StatusCode, Json<serde_json::Value>)> {
    let err = |code: StatusCode, msg: String| (code, Json(serde_json::json!({ "error": msg })));
    if req.producer.trim().is_empty() {
        return Err(err(
            StatusCode::BAD_REQUEST,
            "producer must be a non-empty identifier".into(),
        ));
    }
    let index_id = IndexId::new(id);
    let handle = state
        .registry
        .get(&index_id)
        .ok_or_else(|| err(StatusCode::NOT_FOUND, format!("unknown index '{index_id}'")))?;
    let contrib = ContribGraph {
        producer: req.producer.clone(),
        producer_version: req.producer_version,
        git_sha: req.git_sha,
        nodes: req.nodes,
        edges: req.edges,
    };
    let (nodes_received, edges_received) = (contrib.nodes.len(), contrib.edges.len());
    tracing::info!(
        index_id = %index_id,
        producer = %contrib.producer,
        schema = req.schema.as_deref().unwrap_or("-"),
        nodes = nodes_received,
        edges = edges_received,
        "contrib ingest received"
    );

    // Hold the indexer read guard only long enough to fetch the store handle.
    // `spawn_blocking` requires a `'static` closure, so `corpus` cannot borrow
    // from the guard. Releasing the guard before the blocking write means a slow
    // persist no longer stalls writers. With a fair/write-preferring RwLock
    // (e.g. tokio's; confirm which lock type `indexer` uses), it also stops
    // stalling every reader queued behind those writers.
    let corpus = {
        let indexer = handle.indexer.read().await;
        indexer.corpus_store()
    };
    let Some(corpus) = corpus else {
        return Err(err(
            StatusCode::SERVICE_UNAVAILABLE,
            "index has no durable corpus store — contributed graphs require one".into(),
        ));
    };

    // The outer error is the join failure (panic/cancel); the inner one is the store error.
    let replaced = tokio::task::spawn_blocking(move || corpus.save_contrib_graph(&contrib))
        .await
        .map_err(|e| {
            err(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("contrib persist task panicked: {e}"),
            )
        })?
        .map_err(|e| {
            err(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("contrib persist failed: {e}"),
            )
        })?;

    // Rebuild so the contributed graph takes effect, then snapshot it for the stats below.
    let graph = {
        let indexer = handle.indexer.read().await;
        indexer.rebuild_symbol_graph_now().await;
        indexer.snapshot_symbol_graph().await
    };
    Ok(Json(IngestGraphResponse {
        producer: req.producer,
        replaced,
        nodes_received,
        edges_received,
        graph_nodes: graph.node_count(),
        graph_edges: graph.edge_count(),
        unknown_edge_tags_dropped: graph.unknown_edge_tags_dropped(),
    }))
}
/// Query-string parameters for `graph_neighbors_handler`.
///
/// There is no `rename_all` here, so parameter names are snake_case on the wire
/// (e.g. `?node=...&edge_kinds=...&max_hops=3`).
#[derive(Debug, Deserialize)]
pub(super) struct NeighborsParams {
/// Node whose neighbourhood is requested; required.
pub node: String,
/// One of `in`/`inbound`, `out`/`outbound` or `both`; the handler defaults to `both`.
#[serde(default)]
pub direction: Option<String>,
/// Comma-separated edge-kind tokens, each resolved with `parse_kind_token`.
#[serde(default)]
pub edge_kinds: Option<String>,
/// Traversal depth; the handler defaults it to 2 and clamps it to 1..=4.
#[serde(default)]
pub max_hops: Option<usize>,
}
/// One element of the `neighbors` array returned by `graph_neighbors_handler`.
///
/// Built from the `(symbol, chunk_id, node_kind, edge)` tuples yielded by
/// `graph_neighbors`.
#[derive(Debug, Serialize)]
pub(super) struct NeighborEntry {
/// Symbol identifier of the neighbour.
pub symbol: String,
/// Node kind; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub node_kind: Option<String>,
/// Chunk id; omitted from the JSON when empty.
#[serde(skip_serializing_if = "String::is_empty")]
pub chunk_id: String,
/// Edge label exactly as produced by `graph_neighbors`.
pub edge: String,
}
/// Returns the neighbourhood of `node` in index `id`'s symbol graph as JSON.
///
/// Query parameters (see [`NeighborsParams`]):
/// - `direction`: `in`/`inbound`, `out`/`outbound` or `both` (default `both`);
/// - `edge_kinds`: optional comma-separated edge-kind tokens. Blank tokens are
///   ignored, and a value with no non-blank tokens is treated like an absent one;
/// - `max_hops`: defaults to 2 and is clamped to `1..=4`.
///
/// # Errors
/// Returns `(status, {"error": msg})`:
/// - `404 Not Found` if no index is registered under `id`;
/// - `400 Bad Request` for an unrecognised `direction` or edge-kind token.
pub(super) async fn graph_neighbors_handler(
    State(state): State<Arc<SearchAppState>>,
    Path(id): Path<String>,
    Query(params): Query<NeighborsParams>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
    let err = |code: StatusCode, msg: String| (code, Json(serde_json::json!({ "error": msg })));
    let index_id = IndexId::new(id);
    let handle = state
        .registry
        .get(&index_id)
        .ok_or_else(|| err(StatusCode::NOT_FOUND, format!("unknown index '{index_id}'")))?;

    // Resolved once: used both for traversal and echoed back in the response.
    let direction = params.direction.as_deref().unwrap_or("both");
    let dirs: &[Direction] = match direction {
        "in" | "inbound" => &[Direction::Incoming],
        "out" | "outbound" => &[Direction::Outgoing],
        "both" => &[Direction::Outgoing, Direction::Incoming],
        other => {
            return Err(err(
                StatusCode::BAD_REQUEST,
                format!("direction must be in|out|both, got '{other}'"),
            ))
        }
    };

    let kinds: Option<Vec<EdgeKind>> = match params.edge_kinds.as_deref() {
        None => None,
        Some(csv) => {
            let parsed = csv
                .split(',')
                .map(str::trim)
                .filter(|t| !t.is_empty())
                .map(|token| {
                    parse_kind_token(token).ok_or_else(|| {
                        err(StatusCode::BAD_REQUEST, format!("unknown edge kind '{token}'"))
                    })
                })
                .collect::<Result<Vec<EdgeKind>, _>>()?;
            // "" was already "no filter"; treat "," or " " the same way instead of
            // forwarding an empty kind list to the traversal.
            (!parsed.is_empty()).then_some(parsed)
        }
    };

    let max_hops = params.max_hops.unwrap_or(2).clamp(1, 4);
    let graph = {
        let indexer = handle.indexer.read().await;
        indexer.snapshot_symbol_graph().await
    };
    let neighbors: Vec<NeighborEntry> = graph
        .graph_neighbors(&params.node, dirs, kinds.as_deref(), max_hops)
        .into_iter()
        .map(|(symbol, chunk_id, node_kind, edge)| NeighborEntry {
            symbol,
            node_kind,
            chunk_id,
            edge,
        })
        .collect();
    Ok(Json(serde_json::json!({
        "index_id": index_id.to_string(),
        "node": params.node,
        "node_kind": graph.node_kind(&params.node),
        "direction": direction,
        "max_hops": max_hops,
        "count": neighbors.len(),
        "neighbors": neighbors,
    })))
}