use crate::core::registry::IndexHandle;
use std::time::Instant;
use super::progress::ReindexProgress;
/// Outcome of the symbol-graph rebuild step of a reindex run.
///
/// Produced by `rebuild_symbol_graph_for_reindex` and reported by
/// `emit_complete_event` in the final `complete` event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct KgRebuildOutcome {
    /// Node count of the symbol graph after the rebuild.
    pub symbol_count: usize,
    /// Edge count of the symbol graph after the rebuild.
    pub edge_count: usize,
    /// Wall-clock duration of the rebuild step, in milliseconds.
    pub kg_ms: u64,
    /// Whether the rebuild was skipped; emitted as `kg_skipped` in the
    /// `complete` event.
    pub kg_skipped: bool,
}
/// Run-wide timings and counters that `emit_complete_event` copies into the
/// final `complete` event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct RunTotals {
    /// Emitted as `timings.walk_ms` (presumably time spent walking the tree,
    /// in milliseconds — confirm against the producer).
    pub walk_ms: u64,
    /// Emitted as `timings.parse_ms`.
    pub parse_ms: u64,
    /// Emitted as `timings.embed_ms`.
    pub embed_ms: u64,
    /// Emitted as `timings.bm25_ms`.
    pub bm25_ms: u64,
    /// Emitted as `timings.vector_upsert_ms`.
    pub vector_upsert_ms: u64,
    /// Emitted as `timings.vector_count`.
    pub vector_count: usize,
    /// When `true`, the event's `status` is `"aborted_memory"` instead of
    /// `"complete"`; also emitted verbatim as `memory_limit_hit`.
    pub mem_limit_hit: bool,
    /// Emitted as `chunks_dropped_by_cap`; any non-zero value additionally
    /// sets `walk_truncated_by_budget` to `true`.
    pub chunks_dropped_by_cap: usize,
}
/// Rebuilds the symbol graph through the handle's indexer and reports its size.
///
/// Takes a read guard on `handle.indexer`, triggers
/// `rebuild_symbol_graph_now`, then reads the node and edge counts from the
/// resulting graph. The returned `kg_ms` spans everything from before the
/// guard is acquired until after both counts have been read, and
/// `kg_skipped` is always `false` on this path.
pub(super) async fn rebuild_symbol_graph_for_reindex(handle: &IndexHandle) -> KgRebuildOutcome {
    let timer = Instant::now();

    let guard = handle.indexer.read().await;
    guard.rebuild_symbol_graph_now().await;
    let graph = guard.symbol_graph().await;

    // Read the counts first so the elapsed time below also covers them.
    let symbol_count = graph.node_count();
    let edge_count = graph.edge_count();
    let kg_ms = timer.elapsed().as_millis() as u64;

    KgRebuildOutcome {
        symbol_count,
        edge_count,
        kg_ms,
        kg_skipped: false,
    }
}
/// Assembles the final `complete` event of a reindex run and pushes it onto
/// `progress`.
///
/// * `progress` - source of the `total_chunks`, `indexed`, `skipped` and
///   `errors` counters, and the sink the event is pushed to.
/// * `started` - instant the run began; `elapsed_ms` is measured from it.
/// * `peak_rss_mb` - emitted verbatim as `peak_rss_mb`.
/// * `embedderd_peak_rss_mb` - when `Some`, added to the event under the
///   same name; when `None`, the key is omitted.
/// * `totals` - run-wide timings and counters; `mem_limit_hit` switches the
///   `status` field from `"complete"` to `"aborted_memory"`.
/// * `kg` - symbol-graph rebuild outcome, reported alongside the timings.
pub(super) async fn emit_complete_event(
    progress: &ReindexProgress,
    started: Instant,
    peak_rss_mb: u64,
    embedderd_peak_rss_mb: Option<u64>,
    totals: &RunTotals,
    kg: &KgRebuildOutcome,
) {
    use std::sync::atomic::Ordering;

    let chunk_total = progress.total_chunks.load(Ordering::Acquire);
    let wall_ms = started.elapsed().as_millis() as u64;

    // Integer chunks-per-second; a zero elapsed time reports 0 instead of
    // dividing by zero.
    let scaled_chunks = chunk_total as u64 * 1000;
    let throughput = if wall_ms == 0 {
        0
    } else {
        scaled_chunks / wall_ms
    };

    let indexed_total = progress.indexed.load(Ordering::Acquire);
    let skipped_total = progress.skipped.load(Ordering::Acquire);
    let error_total = progress.errors.load(Ordering::Acquire);

    // Skipped files are counted in `indexed`; subtract them (never below
    // zero) to get the number that was actually (re)indexed.
    let newly_indexed = indexed_total.saturating_sub(skipped_total);

    let aborted = totals.mem_limit_hit;
    let status = if aborted { "aborted_memory" } else { "complete" };

    let timings = serde_json::json!({
        "walk_ms": totals.walk_ms,
        "parse_ms": totals.parse_ms,
        "embed_ms": totals.embed_ms,
        "bm25_ms": totals.bm25_ms,
        "vector_upsert_ms": totals.vector_upsert_ms,
        "kg_ms": kg.kg_ms,
        "vector_count": totals.vector_count,
        "symbol_count": kg.symbol_count,
        "edge_count": kg.edge_count,
    });

    let mut payload = serde_json::json!({
        "event": "complete",
        "status": status,
        "indexed": indexed_total,
        "indexed_new": newly_indexed,
        "total_chunks": chunk_total,
        "skipped": skipped_total,
        "errors": error_total,
        "elapsed_ms": wall_ms,
        "chunks_per_sec": throughput,
        "peak_rss_mb": peak_rss_mb,
        "memory_limit_hit": aborted,
        "walk_truncated_by_budget": totals.chunks_dropped_by_cap > 0,
        "chunks_dropped_by_cap": totals.chunks_dropped_by_cap,
        "kg_skipped": kg.kg_skipped,
        "timings": timings,
    });

    // The embedder daemon's peak RSS is optional and only reported when known.
    if let Some(rss) = embedderd_peak_rss_mb {
        payload["embedderd_peak_rss_mb"] = serde_json::Value::from(rss);
    }

    progress.push(payload).await;
}