use std::sync::Arc;
use crate::core::corpus::contrib::{ContribEdge, ContribGraph};
use crate::core::corpus::CorpusStore;
use crate::core::entity::EdgeKind;
use super::graph::{SymbolGraph, SymbolNode};
/// Counters describing the outcome of [`SymbolGraph::merge_contrib`].
///
/// Every contributed node is tallied in exactly one of the `nodes_*`
/// counters, and every contributed edge in exactly one of the `edges_*`
/// counters.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ContribMergeStats {
    /// Contributed nodes whose id was not yet in the graph and were inserted.
    pub nodes_added: usize,
    /// Contributed nodes whose id already existed; the existing node is kept.
    pub nodes_existing: usize,
    /// Contributed edges inserted into the graph.
    pub edges_added: usize,
    /// Edges skipped because an edge of the same kind already links the endpoints.
    pub edges_duplicate: usize,
    /// Edges skipped because the `from` or `to` id is not present in the graph.
    pub edges_dangling: usize,
    /// Edges skipped because neither `kind` nor `tag` resolved to an `EdgeKind`.
    pub edges_unknown_kind: usize,
}
/// Resolves the [`EdgeKind`] of a contributed edge.
///
/// The edge's `kind` token is tried first via [`parse_kind_token`]. When it is
/// absent or does not parse, the edge's `tag` is passed to `EdgeKind::from_tag`
/// instead. Returns `None` when neither field resolves.
pub(crate) fn resolve_edge_kind(edge: &ContribEdge) -> Option<EdgeKind> {
    let from_kind_token = edge.kind.as_deref().and_then(parse_kind_token);
    // Only consult `tag` when the kind token did not resolve.
    from_kind_token.or_else(|| edge.tag.as_deref().and_then(EdgeKind::from_tag))
}
/// Maps a contributed edge's `kind` token to an [`EdgeKind`].
///
/// A fixed set of snake_case tokens is recognized directly; `calls_proc` is
/// accepted as an alias for `calls_function`. Any other token is handed to
/// `EdgeKind::from_tag`, whose result is returned as-is.
pub(crate) fn parse_kind_token(token: &str) -> Option<EdgeKind> {
    let kind = match token {
        "reads" => EdgeKind::Reads,
        "writes" => EdgeKind::Writes,
        "references" => EdgeKind::References,
        "calls_function" | "calls_proc" => EdgeKind::CallsFunction,
        "accesses_resource" => EdgeKind::AccessesResource,
        unrecognized => return EdgeKind::from_tag(unrecognized),
    };
    Some(kind)
}
impl SymbolGraph {
    /// Merges externally contributed graphs into this symbol graph and
    /// reports the outcome as a [`ContribMergeStats`].
    ///
    /// The merge runs in two passes so the result does not depend on the
    /// order of `graphs`:
    /// 1. every node of every contributed graph is registered;
    /// 2. every edge of every contributed graph is resolved.
    ///
    /// An edge may therefore reference a node that only a different (even a
    /// later) contributed graph declares.
    ///
    /// Node handling:
    /// - An id already present in `by_symbol` (from indexing or an earlier
    ///   contribution) is left untouched and counted in `nodes_existing`.
    /// - A new node is inserted with an empty `chunk_id`/`file` and the
    ///   contributed `kind`.
    ///
    /// Edges are dropped (and counted) when:
    /// - their kind does not resolve via [`resolve_edge_kind`]. This is logged
    ///   and also counted in `self.unknown_edge_tags_dropped`.
    /// - either endpoint id is not in the graph after pass 1 (`edges_dangling`).
    /// - an edge of the same kind already connects the endpoints, as reported
    ///   by `edges_connecting` (`edges_duplicate`). This includes edges added
    ///   earlier in this same call.
    pub fn merge_contrib(&mut self, graphs: &[ContribGraph]) -> ContribMergeStats {
        let mut stats = ContribMergeStats::default();

        // Pass 1: register nodes from *all* contributions before touching any
        // edge. Interleaving nodes and edges per graph would mark an edge as
        // dangling whenever its endpoint is declared by a later contribution.
        for cg in graphs {
            for node in &cg.nodes {
                if self.by_symbol.contains_key(&node.id) {
                    stats.nodes_existing += 1;
                    continue;
                }
                let idx = self.graph.add_node(SymbolNode {
                    symbol: node.id.clone(),
                    chunk_id: String::new(),
                    file: String::new(),
                    kind: Some(node.kind.clone()),
                });
                self.by_symbol.insert(node.id.clone(), idx);
                stats.nodes_added += 1;
            }
        }

        // Pass 2: resolve and insert edges against the complete node set.
        for cg in graphs {
            for edge in &cg.edges {
                let Some(kind) = resolve_edge_kind(edge) else {
                    stats.edges_unknown_kind += 1;
                    self.unknown_edge_tags_dropped += 1;
                    tracing::warn!(
                        producer = %cg.producer,
                        kind = ?edge.kind,
                        tag = ?edge.tag,
                        action = "skipped",
                        "kg: contributed edge with unresolvable kind dropped (#816 semantics)"
                    );
                    continue;
                };
                let (Some(&src), Some(&tgt)) =
                    (self.by_symbol.get(&edge.from), self.by_symbol.get(&edge.to))
                else {
                    stats.edges_dangling += 1;
                    continue;
                };
                // Dedup on (endpoints, kind); edges of a different kind
                // between the same endpoints are kept.
                let duplicate = self
                    .graph
                    .edges_connecting(src, tgt)
                    .any(|e| e.weight() == &kind);
                if duplicate {
                    stats.edges_duplicate += 1;
                    continue;
                }
                self.graph.add_edge(src, tgt, kind);
                stats.edges_added += 1;
            }
        }

        stats
    }

    /// Returns the kind string recorded on `symbol`'s node.
    ///
    /// Returns `None` when the symbol is not in the graph or its node carries
    /// no kind.
    pub fn node_kind(&self, symbol: &str) -> Option<&str> {
        let idx = self.by_symbol.get(symbol)?;
        self.graph[*idx].kind.as_deref()
    }
}
/// Persists `graph` to the corpus and folds in any contributed graphs stored
/// there, returning the graph that should be installed.
///
/// The synchronous work (persisting, loading contributions, merging) runs via
/// `tokio::task::spawn_blocking`. Failures are logged and degrade as follows:
/// - `corpus` is `None`: `graph` is returned untouched and nothing runs.
/// - Persisting fails: a warning is logged and the merge still proceeds.
/// - Loading contributions fails, or there are none: `graph` is returned
///   without merging.
/// - The blocking task panics: an error is logged and a fresh
///   `SymbolGraph::new()` is returned, because the original graph was moved
///   into the task.
///
/// Merging takes the graph out of its `Arc` when this is the only strong
/// reference; otherwise it merges into a clone, so other holders of the `Arc`
/// keep the pre-merge graph.
pub async fn save_then_merge_contrib(
    graph: Arc<SymbolGraph>,
    corpus: Option<Arc<CorpusStore>>,
    index_id: String,
) -> Arc<SymbolGraph> {
    // Synchronous body executed on the blocking pool.
    fn persist_and_merge(
        graph: Arc<SymbolGraph>,
        corpus: &Arc<CorpusStore>,
        index_id: &str,
    ) -> Arc<SymbolGraph> {
        if let Err(e) = graph.save_to_corpus(corpus) {
            tracing::warn!("index '{index_id}': kg persist failed ({e}) — graph stays in memory");
        }

        let contribs = match corpus.load_contrib_graphs() {
            Err(e) => {
                tracing::warn!("index '{index_id}': contrib load failed ({e}) — merge skipped");
                return graph;
            }
            Ok(found) if found.is_empty() => return graph,
            Ok(found) => found,
        };

        // Avoid a deep copy when nobody else holds the Arc.
        let mut owned = match Arc::try_unwrap(graph) {
            Ok(unique) => unique,
            Err(shared) => SymbolGraph::clone(&shared),
        };
        let stats = owned.merge_contrib(&contribs);
        tracing::info!(
            "index '{index_id}': merged {} contributed graph(s): +{} nodes, +{} edges \
             ({} duplicate, {} dangling, {} unknown-kind)",
            contribs.len(),
            stats.nodes_added,
            stats.edges_added,
            stats.edges_duplicate,
            stats.edges_dangling,
            stats.edges_unknown_kind,
        );
        Arc::new(owned)
    }

    let corpus = match corpus {
        Some(store) => store,
        None => return graph,
    };

    let outcome =
        tokio::task::spawn_blocking(move || persist_and_merge(graph, &corpus, &index_id)).await;

    match outcome {
        Ok(installed) => installed,
        Err(e) => {
            tracing::error!(
                "kg save/merge task panicked ({e}) — installing empty graph; reindex to repair"
            );
            Arc::new(SymbolGraph::new())
        }
    }
}