mod defer_embed;
mod hash_cache;
mod prune;
pub mod quarantine;
mod staging;
mod validate;
use defer_embed::spawn_deferred_embed_pass;
use prune::{prune_deleted_files_from_staging, to_corpus_relative_path};
pub use quarantine::ReindexQuarantine;
use crate::core::indexer::{CommitTimings, ParsedBatch};
use crate::core::memguard::{current_rss_mb, current_rss_mb_for_pid, index_memory_limit_mb};
use crate::core::registry::{IndexHandle, IndexId, IndexStages, StageState, StageStatus};
use crate::service::walker::{should_skip_content, walk_source_files_with_options, WalkOptions};
use anyhow::Context;
use crossbeam_utils::atomic::AtomicCell;
use dashmap::DashMap;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
/// Returns the process-wide semaphore handed out for priority reindexes.
///
/// The semaphore is created lazily on first call with
/// `MAX_PARALLEL_REINDEXES` permits and is never replaced afterwards.
fn reindex_semaphore() -> &'static Semaphore {
    static PRIORITY_PERMITS: OnceLock<Semaphore> = OnceLock::new();
    let build = || Semaphore::new(MAX_PARALLEL_REINDEXES);
    PRIORITY_PERMITS.get_or_init(build)
}
/// Returns the process-wide semaphore handed out for background
/// (non-priority) reindexes.
///
/// Created lazily on first call with `MAX_PARALLEL_BACKGROUND_REINDEXES`
/// permits.
pub(super) fn background_reindex_semaphore() -> &'static Semaphore {
    static BACKGROUND_PERMITS: OnceLock<Semaphore> = OnceLock::new();
    let build = || Semaphore::new(MAX_PARALLEL_BACKGROUND_REINDEXES);
    BACKGROUND_PERMITS.get_or_init(build)
}
/// Permit count of the priority reindex semaphore (`reindex_semaphore`).
const MAX_PARALLEL_REINDEXES: usize = 2;
/// Permit count of the background reindex semaphore
/// (`background_reindex_semaphore`).
const MAX_PARALLEL_BACKGROUND_REINDEXES: usize = 1;
/// Current value of the background-queue counter.
///
/// The counter is bumped when a non-priority reindex is spawned and dropped
/// again once that task has obtained its semaphore permit, so this reads as
/// "background reindexes still waiting for a permit". The load is `Relaxed`,
/// so the value is a best-effort snapshot.
pub fn background_reindex_queue_depth() -> usize {
    let counter = &BACKGROUND_QUEUE_DEPTH;
    counter.load(AtomicOrdering::Relaxed)
}
/// Counter backing `background_reindex_queue_depth`; starts at zero and is
/// adjusted by the reindex spawner for non-priority requests.
static BACKGROUND_QUEUE_DEPTH: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
/// Selects the semaphore a reindex must acquire before it starts:
/// the priority semaphore when `priority` is true, the background one
/// otherwise.
pub(crate) fn reindex_semaphore_for(priority: bool) -> &'static Semaphore {
    if !priority {
        return background_reindex_semaphore();
    }
    reindex_semaphore()
}
// NOTE(review): not referenced in the visible part of this file; presumably
// the number of files grouped into one reindex batch — confirm at the use site.
const REINDEX_BATCH_SIZE: usize = 128;
fn file_hashes() -> &'static DashMap<IndexId, Arc<DashMap<PathBuf, String>>> {
static FILE_HASHES: OnceLock<DashMap<IndexId, Arc<DashMap<PathBuf, String>>>> = OnceLock::new();
FILE_HASHES.get_or_init(DashMap::new)
}
/// Returns the hash cache for `id`, inserting an empty one if the index has
/// no cache yet. The returned `Arc` shares the same map with every other
/// caller asking for the same id.
fn hashes_for(id: &IndexId) -> Arc<DashMap<PathBuf, String>> {
    let registry = file_hashes();
    let slot = registry
        .entry(id.clone())
        .or_insert_with(|| Arc::new(DashMap::new()));
    Arc::clone(&*slot)
}
/// Upper bound on entries kept in one index's in-memory hash cache; once
/// exceeded, `shrink_hashes_if_needed` trims the cache to 90% of this value.
const MAX_FILE_HASHES_PER_INDEX: usize = 200_000;
/// Bounds the size of a per-index hash cache.
///
/// Does nothing while the map holds at most `MAX_FILE_HASHES_PER_INDEX`
/// entries. Above that, it removes enough entries (taken in the map's
/// iteration order) to bring it down to 90% of the cap and logs the number
/// it set out to remove.
fn shrink_hashes_if_needed(map: &DashMap<PathBuf, String>) {
    let current = map.len();
    if current <= MAX_FILE_HASHES_PER_INDEX {
        return;
    }
    let keep = MAX_FILE_HASHES_PER_INDEX * 9 / 10;
    let to_remove = current.saturating_sub(keep);
    // Snapshot the victim keys first so no iterator guard is alive while
    // entries are being removed.
    let mut victims: Vec<PathBuf> = Vec::new();
    for entry in map.iter().take(to_remove) {
        victims.push(entry.key().clone());
    }
    for key in &victims {
        map.remove(key);
    }
    tracing::info!(
        "file-hash cache exceeded {} entries — dropped {} to bound memory",
        MAX_FILE_HASHES_PER_INDEX,
        to_remove
    );
}
/// Maximum number of serialized events retained in a `ReindexProgress`
/// replay buffer; the oldest event is evicted when the buffer is full.
const MAX_REPLAY_EVENTS: usize = 500;
/// Seconds a finished reindex's progress entry stays in the cleanup map
/// before `schedule_progress_cleanup` removes it.
const REINDEX_PROGRESS_TTL_SECS: u64 = 60;
/// Returns the SHA-256 digest of `content`'s UTF-8 bytes as a lowercase
/// hexadecimal string.
fn hash_content(content: &str) -> String {
    let digest = {
        let mut state = Sha256::new();
        state.update(content.as_bytes());
        state.finalize()
    };
    format!("{:x}", digest)
}
/// Capacity passed to `broadcast::channel` when a `ReindexProgress` is created.
const BROADCAST_CAPACITY: usize = 256;
/// Overall state of a reindex run, stored in `ReindexProgress::status`.
///
/// NOTE(review): with `rename_all = "lowercase"` the `AbortedMemory` variant
/// should serialize as `"abortedmemory"`, whereas the `complete` event built
/// in `emit_complete_event` reports the string `"aborted_memory"` — confirm
/// which spelling API consumers expect.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReindexStatus {
/// Initial state set by `ReindexProgress::new`.
Running,
/// The run finished.
Complete,
/// The run was cut short because of the memory limit.
AbortedMemory,
/// The run failed, or its task exited without reaching a terminal state
/// (set by `ReindexTerminationGuard` on drop).
Failed,
}
/// Shared, lock-light progress state of one reindex run: atomic counters, a
/// bounded replay buffer of serialized events, and a broadcast channel for
/// live subscribers.
pub struct ReindexProgress {
/// Current lifecycle state; starts as `Running`.
pub status: AtomicCell<ReindexStatus>,
/// Number of files produced by the walk for this run.
pub total_files: std::sync::atomic::AtomicUsize,
/// Files accounted for so far; advanced both by skips and by committed batches.
pub indexed: std::sync::atomic::AtomicUsize,
/// Chunks committed so far, summed over successful batch commits.
pub total_chunks: std::sync::atomic::AtomicUsize,
/// Count of files that failed to read or belonged to a failed batch.
pub errors: std::sync::atomic::AtomicUsize,
/// Files skipped (unchanged hash or filtered content); also counted in `indexed`.
pub skipped: std::sync::atomic::AtomicUsize,
/// NOTE(review): not written anywhere in the visible part of this file;
/// presumably the running total of chunks dropped by a size cap — confirm.
pub chunks_dropped_by_cap: std::sync::atomic::AtomicUsize,
/// Replay buffer of JSON-serialized events, capped at `MAX_REPLAY_EVENTS`.
pub events: Arc<Mutex<Vec<String>>>,
/// Broadcast side of the live event stream; receives every pushed event.
pub sender: broadcast::Sender<String>,
}
impl ReindexProgress {
    /// Creates a progress tracker in the `Running` state with all counters at
    /// zero, an empty replay buffer, and a fresh broadcast channel of
    /// `BROADCAST_CAPACITY` slots (the initial receiver is discarded).
    pub fn new() -> Self {
        let (sender, _) = broadcast::channel(BROADCAST_CAPACITY);
        let events = Arc::new(Mutex::new(Vec::new()));
        Self {
            status: AtomicCell::new(ReindexStatus::Running),
            total_files: Default::default(),
            indexed: Default::default(),
            total_chunks: Default::default(),
            errors: Default::default(),
            skipped: Default::default(),
            chunks_dropped_by_cap: Default::default(),
            events,
            sender,
        }
    }

    /// Serializes `event`, appends it to the replay buffer (evicting the
    /// oldest entry once `MAX_REPLAY_EVENTS` is reached) and broadcasts it.
    ///
    /// A broadcast failure (for example, no active receivers) is ignored.
    pub async fn push(&self, event: serde_json::Value) {
        let line = event.to_string();
        let mut replay = self.events.lock().await;
        if replay.len() >= MAX_REPLAY_EVENTS {
            replay.remove(0);
        }
        replay.push(line.clone());
        // Release the buffer lock before notifying subscribers.
        drop(replay);
        let _ = self.sender.send(line);
    }
}
impl Default for ReindexProgress {
fn default() -> Self {
Self::new()
}
}
/// Convenience entry point: spawns a priority reindex with no cleanup map,
/// no aborted map, no embedder pid slot and no quarantine.
pub fn spawn_reindex(handle: Arc<IndexHandle>, progress: Arc<ReindexProgress>, force: bool) {
    let priority = true;
    spawn_reindex_with_cleanup(handle, progress, force, None, None, None, priority, None);
}
/// Walks every configured subtree of the index and returns the filtered,
/// sorted and de-duplicated list of files to consider for (re)indexing.
///
/// When the handle has no `include_paths`, the index root is walked instead.
/// Filters are applied in this order, each only if configured: exclude
/// globs, extension allow-list (ASCII case-insensitive; files whose
/// extension is missing or not valid UTF-8 are dropped), then `path_filter`
/// evaluated against the canonicalized root (the raw root is used if
/// canonicalization fails). `skipped_dirs` is the saturating sum over all
/// walked subtrees.
fn collect_files_to_index(handle: &IndexHandle) -> crate::service::walker::WalkResult {
    let subtrees: &[PathBuf] = if handle.include_paths.is_empty() {
        std::slice::from_ref(&handle.root_path)
    } else {
        &handle.include_paths
    };
    let options = WalkOptions {
        include_docs: handle.include_docs,
        respect_gitignore: handle.respect_gitignore,
    };

    let mut files: Vec<PathBuf> = Vec::new();
    let mut skipped_dirs: usize = 0;
    for subtree in subtrees {
        let walked = walk_source_files_with_options(subtree, options);
        files.extend(walked.files);
        skipped_dirs = skipped_dirs.saturating_add(walked.skipped_dirs);
    }

    if !handle.exclude_globs.is_empty() {
        files.retain(|candidate| {
            !crate::core::repo_config::path_matches_any_glob(candidate, &handle.exclude_globs)
        });
    }

    if !handle.extensions.is_empty() {
        files.retain(|candidate| {
            candidate
                .extension()
                .and_then(|ext| ext.to_str())
                .is_some_and(|ext| {
                    handle
                        .extensions
                        .iter()
                        .any(|allowed| allowed.eq_ignore_ascii_case(ext))
                })
        });
    }

    if !handle.path_filter.is_empty() {
        let root = match std::fs::canonicalize(&handle.root_path) {
            Ok(canonical) => canonical,
            Err(_) => handle.root_path.clone(),
        };
        files.retain(|candidate| {
            crate::core::registry::path_matches_filter(candidate, &root, &handle.path_filter)
        });
    }

    // Overlapping include paths can yield the same file twice.
    files.sort();
    files.dedup();

    crate::service::walker::WalkResult {
        files,
        skipped_dirs,
    }
}
/// Spawns a task that samples this process's RSS once per second.
///
/// Every sample raises `peak_rss` to the largest value seen. When a limit is
/// configured and a sample reaches it while `mem_abort` is still clear, a
/// warning is logged and `mem_abort` is set. Returns the task handle and the
/// stop flag; the loop exits on the first poll after the flag is set.
fn spawn_memory_poller(
    mem_limit: Option<u64>,
    mem_abort: Arc<AtomicBool>,
    peak_rss: Arc<AtomicU64>,
    index_id: String,
) -> (tokio::task::JoinHandle<()>, Arc<AtomicBool>) {
    const MEM_POLL_INTERVAL: Duration = Duration::from_secs(1);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);
    let task = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(MEM_POLL_INTERVAL);
        loop {
            ticker.tick().await;
            if stop_flag.load(AtomicOrdering::Acquire) {
                break;
            }
            let Some(rss) = current_rss_mb() else {
                continue;
            };
            // Monotonic high-water mark.
            peak_rss.fetch_max(rss, AtomicOrdering::AcqRel);
            let Some(limit) = mem_limit.filter(|&limit| rss >= limit) else {
                continue;
            };
            if mem_abort.load(AtomicOrdering::Acquire) {
                // Already tripped; avoid logging the same warning every tick.
                continue;
            }
            tracing::warn!(
                "reindex memory poller: rss={}MB >= limit={}MB \
                 — tripping abort flag for index {}",
                rss,
                limit,
                index_id,
            );
            mem_abort.store(true, AtomicOrdering::Release);
        }
    });
    (task, stop)
}
/// Spawns a task that samples the RSS of the process whose pid is stored in
/// `embedderd_pid_slot` every 500 ms and raises `peak_embedderd_rss` to the
/// largest value seen.
///
/// The pid is re-read on every poll. Returns the task handle and the stop
/// flag; the loop exits on the first poll after the flag is set.
fn spawn_embedderd_rss_poller(
    embedderd_pid_slot: Arc<AtomicU32>,
    peak_embedderd_rss: Arc<AtomicU64>,
) -> (tokio::task::JoinHandle<()>, Arc<AtomicBool>) {
    const EMBEDDERD_POLL_INTERVAL: Duration = Duration::from_millis(500);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);
    let task = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(EMBEDDERD_POLL_INTERVAL);
        loop {
            ticker.tick().await;
            if stop_flag.load(AtomicOrdering::Acquire) {
                break;
            }
            let pid = embedderd_pid_slot.load(AtomicOrdering::Acquire);
            let Some(rss) = current_rss_mb_for_pid(pid) else {
                continue;
            };
            // Monotonic high-water mark.
            peak_embedderd_rss.fetch_max(rss, AtomicOrdering::AcqRel);
        }
    });
    (task, stop)
}
/// Current UTC time formatted as an RFC 3339 string.
pub(super) fn now_rfc3339() -> String {
    let now = chrono::Utc::now();
    now.to_rfc3339()
}
/// Resets the handle's stage tracking at the start of a reindex.
///
/// The lexical stage always becomes `InProgress` with a fresh start
/// timestamp. For lexical-only indexes the semantic and graph stages are
/// marked skipped; otherwise semantic becomes pending and graph becomes
/// pending unless `skip_kg` is set, in which case it is skipped.
async fn reset_stages_for_reindex(handle: &Arc<IndexHandle>) {
    let mut stages = handle.stages.write().await;
    let lexical = StageState {
        status: StageStatus::InProgress,
        started_at: Some(now_rfc3339()),
        ..Default::default()
    };
    let (semantic, graph) = if handle.lexical_only {
        (StageState::skipped(), StageState::skipped())
    } else if handle.skip_kg {
        (StageState::pending(), StageState::skipped())
    } else {
        (StageState::pending(), StageState::pending())
    };
    *stages = IndexStages {
        lexical,
        semantic,
        graph,
    };
}
/// Marks the lexical stage as ready and, if applicable, starts the semantic
/// stage.
///
/// Records the completion time, file count and corpus chunk count on the
/// lexical stage. The semantic stage moves to `InProgress` (with start time
/// and `total_chunks_for_embed` as its total) only when the index is not
/// lexical-only and the stage is still `Pending`.
async fn mark_lexical_ready_semantic_in_progress(
    handle: &Arc<IndexHandle>,
    files: usize,
    corpus_total_chunks: usize,
    total_chunks_for_embed: usize,
) {
    let mut guard = handle.stages.write().await;

    let lexical = &mut guard.lexical;
    lexical.status = StageStatus::Ready;
    lexical.completed_at = Some(now_rfc3339());
    lexical.files = Some(files);
    lexical.chunks = Some(corpus_total_chunks);

    if handle.lexical_only || guard.semantic.status != StageStatus::Pending {
        return;
    }
    let semantic = &mut guard.semantic;
    semantic.status = StageStatus::InProgress;
    semantic.started_at = Some(now_rfc3339());
    semantic.total = Some(total_chunks_for_embed);
}
/// Marks the semantic stage as ready and, if applicable, starts the graph
/// stage.
///
/// No-op for lexical-only indexes. Otherwise records completion time and the
/// `embedded` / `total` counts on the semantic stage, then moves the graph
/// stage to `InProgress` when knowledge-graph building is enabled and the
/// stage is still `Pending`.
async fn mark_semantic_ready_graph_in_progress(
    handle: &Arc<IndexHandle>,
    embedded: usize,
    total: usize,
) {
    // Decide before locking: a lexical-only index has nothing to update, so
    // there is no reason to take the exclusive stages lock and stall readers.
    if handle.lexical_only {
        return;
    }
    let mut stages = handle.stages.write().await;
    stages.semantic.status = StageStatus::Ready;
    stages.semantic.completed_at = Some(now_rfc3339());
    stages.semantic.embedded = Some(embedded);
    stages.semantic.total = Some(total);
    if !handle.skip_kg && stages.graph.status == StageStatus::Pending {
        stages.graph.status = StageStatus::InProgress;
        stages.graph.started_at = Some(now_rfc3339());
    }
}
async fn mark_reindex_failed(handle: &Arc<IndexHandle>, reason: &str) {
let mut stages = handle.stages.write().await;
stages.semantic = StageState::failed(reason);
stages.graph = StageState::failed(reason);
}
/// Marks the graph stage as ready with a completion timestamp.
///
/// No-op for lexical-only indexes and for indexes with `skip_kg` set.
async fn mark_graph_ready(handle: &Arc<IndexHandle>) {
    // Decide before locking so the no-op path never takes the exclusive
    // stages lock.
    if handle.lexical_only || handle.skip_kg {
        return;
    }
    let mut stages = handle.stages.write().await;
    stages.graph.status = StageStatus::Ready;
    stages.graph.completed_at = Some(now_rfc3339());
}
fn schedule_progress_cleanup(
cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
cleanup_id: IndexId,
) {
let Some(map) = cleanup_map else {
return;
};
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(REINDEX_PROGRESS_TTL_SECS)).await;
map.remove(&cleanup_id);
});
}
/// Recomputes the index's context summary and its embedding from metadata
/// files under the index root.
///
/// * No metadata summary found: both the embedding and the summary are
///   cleared.
/// * Embedding succeeds: both are stored.
/// * No embedder available, or embedding fails: the display summary is
///   stored and the embedding is cleared.
async fn refresh_context_embedding(handle: &Arc<IndexHandle>) {
    use crate::service::context_inference::{make_display_summary, scrape_metadata_summary};

    let summary = match scrape_metadata_summary(&handle.root_path) {
        Some(text) => text,
        None => {
            tracing::debug!(
                "context_inference: no recognised metadata files under {} for index {}",
                handle.root_path.display(),
                handle.id.0
            );
            *handle.context_embedding.write().await = None;
            *handle.context_summary.write().await = None;
            return;
        }
    };
    let display = make_display_summary(&summary);

    // Hold the indexer read lock only for the duration of the embed call.
    let indexer = handle.indexer.read().await;
    let embed_result = indexer.embed_text(&summary).await;
    drop(indexer);

    match embed_result {
        Ok(Some(vector)) => {
            *handle.context_embedding.write().await = Some(vector);
            *handle.context_summary.write().await = Some(display);
            tracing::info!(
                "context_inference: refreshed context embedding for index {}",
                handle.id.0
            );
            return;
        }
        Ok(None) => tracing::debug!(
            "context_inference: no embedder wired on index {} — skipping context embedding",
            handle.id.0
        ),
        Err(e) => tracing::warn!(
            "context_inference: embed failed for index {}: {e}",
            handle.id.0
        ),
    }

    // Shared tail for the "no embedding" outcomes: keep the readable summary
    // but drop any stale vector.
    *handle.context_embedding.write().await = None;
    *handle.context_summary.write().await = Some(display);
}
/// Everything a batch-processing step needs, bundled so it can be cloned and
/// passed by reference between the helpers in this file.
#[derive(Clone)]
struct BatchCtx {
/// Index being rebuilt; provides the indexer lock.
handle: Arc<IndexHandle>,
/// Counters and event sink for this run.
progress: Arc<ReindexProgress>,
/// Root used to turn absolute paths into corpus-relative ones.
root: PathBuf,
/// Id of the index, used in events and log lines.
index_id: IndexId,
/// Path → content-hash cache used to skip unchanged files.
hashes: Arc<DashMap<PathBuf, String>>,
/// Optional RSS ceiling, compared against `current_rss_mb()` readings.
mem_limit: Option<u64>,
/// Set once the memory limit has been reached.
mem_abort: Arc<AtomicBool>,
/// Highest RSS reading observed so far.
peak_rss_atomic: Arc<AtomicU64>,
/// Start of the run; basis for `elapsed_ms` in events.
started: Instant,
/// Total number of files in the run, reported as `total_files`.
total: usize,
/// When true, batches are parsed without embedding.
lexical_only: bool,
/// When true, batches are parsed without embedding (embedding deferred).
defer_embed: bool,
/// Optional slot holding the embedder process id; a value of 0 is treated
/// as "embedder not yet initialised" when deciding to emit `embedder_init`.
embedder_pid_slot: Option<Arc<AtomicU32>>,
}
/// Timings and counters produced by one processed batch. The `Default`
/// value (all zero / false) is returned for batches that were empty or
/// failed.
#[derive(Default)]
struct BatchOutcome {
/// Parse time reported by the parsed batch, in milliseconds.
parse_ms: u64,
/// Embedding time reported by the parsed batch, in milliseconds.
embed_ms: u64,
/// BM25 time reported by the commit, in milliseconds.
bm25_ms: u64,
/// Vector upsert time reported by the commit, in milliseconds.
vector_upsert_ms: u64,
/// Vector count reported by the parsed batch.
vector_count: usize,
/// True when the post-commit memory check found RSS at or above the limit.
mem_limit_hit: bool,
/// Chunks dropped by a cap, as reported by the commit.
chunks_dropped_by_cap: usize,
}
/// Runs one batch end to end: read + parse, then commit.
///
/// Returns a default (all-zero) outcome when the batch yields nothing to
/// commit.
#[allow(dead_code)]
async fn process_one_batch(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchOutcome {
    match prepare_and_parse_batch(ctx, batch).await {
        Some(ready) => commit_parsed_and_finalize(ctx, ready).await,
        None => BatchOutcome::default(),
    }
}
/// A batch that has been read and parsed and is ready to be committed.
struct ParsedReadyBatch {
/// Parser output for the batch.
parsed: ParsedBatch,
/// `(absolute path, content hash)` for each file sent to the parser;
/// written to the hash cache only after a successful commit.
new_hashes: Vec<(PathBuf, String)>,
/// Number of files that were sent to the parser.
batch_files: usize,
}
/// Reads the files of `batch`, filters out the ones that need no work, and
/// parses (and, unless lexical-only / deferred, embeds) the rest.
///
/// Returns `None` when no file in the batch needs indexing, or when parsing
/// fails (in which case a batch `error` event has already been emitted).
/// Emits `embedder_init` / `embedder_ready` around the parse when the
/// embedder is judged to be uninitialised, and `chunk_progress` events for
/// embedding throughput.
async fn prepare_and_parse_batch(ctx: &BatchCtx, batch: &[PathBuf]) -> Option<ParsedReadyBatch> {
let payload = prepare_batch_payload(ctx, batch).await;
if payload.to_index.is_empty() {
return None;
}
let batch_files = payload.to_index.len();
// Partial move: `payload.to_index_paths` and `payload.new_hashes` stay usable below.
let to_index = payload.to_index;
let first_batch_ever = ctx.progress.indexed.load(AtomicOrdering::Acquire) == 0;
// Embedder initialisation is only relevant when this batch will embed.
// With a pid slot, a stored pid of 0 means "not started yet"; without one,
// fall back to "nothing has been indexed so far".
let needs_embedder_init = !ctx.lexical_only
&& !ctx.defer_embed
&& if let Some(slot) = ctx.embedder_pid_slot.as_ref() {
slot.load(AtomicOrdering::Acquire) == 0
} else {
first_batch_ever
};
if needs_embedder_init {
ctx.progress
.push(serde_json::json!({
"event": "embedder_init",
"index_id": ctx.index_id.0,
}))
.await;
}
// The indexer read lock is held for the whole parse and released at the
// end of this block (or explicitly before reporting an error).
let parsed = {
let indexer = ctx.handle.indexer.read().await;
let result = if ctx.lexical_only || ctx.defer_embed {
indexer.parse_files_only(to_index).await
} else {
use crate::core::indexer::PROGRESS_CHUNK_INTERVAL;
use std::sync::atomic::Ordering;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, u64)>();
let parse_result = indexer.parse_and_embed_files_tracked(to_index, tx).await;
// NOTE(review): the channel is drained only after the parse call has
// returned, so these per-wave `chunk_progress` events are emitted after
// the fact rather than while embedding is running.
while let Ok((wave_chunks, wave_ms)) = rx.try_recv() {
// Waves smaller than the progress interval are not reported.
if wave_chunks >= PROGRESS_CHUNK_INTERVAL {
// `max(1)` keeps the divisor non-zero for sub-millisecond waves.
let cps = (wave_chunks as u64 * 1000)
.checked_div(wave_ms.max(1))
.unwrap_or(0);
ctx.progress
.push(serde_json::json!({
"event": "chunk_progress",
"chunks_done": wave_chunks as u64,
"chunks_per_sec": cps,
"embed_ms": wave_ms,
"indexed": ctx.progress.indexed.load(Ordering::Acquire),
"total_files": ctx.total,
}))
.await;
}
}
parse_result
};
match result {
Ok(p) => p,
Err(e) => {
// Release the indexer lock before the (awaiting) error report.
drop(indexer);
emit_batch_error(ctx, &payload.to_index_paths, e).await;
return None;
}
}
};
if needs_embedder_init {
ctx.progress
.push(serde_json::json!({
"event": "embedder_ready",
"index_id": ctx.index_id.0,
}))
.await;
}
// Whole-batch throughput event, only when vectors were actually produced.
if !ctx.lexical_only && !ctx.defer_embed && parsed.vector_count > 0 {
use std::sync::atomic::Ordering;
let batch_chunks = parsed.chunks.len() as u64;
let chunks_per_sec = (batch_chunks * 1000)
.checked_div(parsed.embed_ms.max(1))
.unwrap_or(0);
ctx.progress
.push(serde_json::json!({
"event": "chunk_progress",
"chunks_done": batch_chunks,
"chunks_per_sec": chunks_per_sec,
"embed_ms": parsed.embed_ms,
"indexed": ctx.progress.indexed.load(Ordering::Acquire),
"total_files": ctx.total,
}))
.await;
}
Some(ParsedReadyBatch {
parsed,
new_hashes: payload.new_hashes,
batch_files,
})
}
/// Commits a parsed batch under the indexer write lock and records the
/// result.
///
/// On commit failure a batch `error` event is emitted for the batch's files
/// and a default (all-zero) outcome is returned. On success the progress
/// counters and hash cache are updated and the post-commit memory check is
/// run; its result is reported in `mem_limit_hit`.
async fn commit_parsed_and_finalize(ctx: &BatchCtx, ready: ParsedReadyBatch) -> BatchOutcome {
    let ParsedReadyBatch {
        parsed,
        new_hashes,
        batch_files,
    } = ready;

    // Capture the timings before `parsed` is moved into the commit call.
    let (parse_ms, embed_ms, vector_count) =
        (parsed.parse_ms, parsed.embed_ms, parsed.vector_count);

    // The write lock is scoped to the commit call only.
    let commit_result = {
        let indexer = ctx.handle.indexer.write().await;
        indexer.commit_parsed_batch(parsed, true).await
    };
    let commit = match commit_result {
        Ok(timings) => timings,
        Err(e) => {
            let mut failed_paths: Vec<PathBuf> = Vec::with_capacity(new_hashes.len());
            failed_paths.extend(new_hashes.iter().map(|(path, _)| path.clone()));
            emit_batch_error(ctx, &failed_paths, e).await;
            return BatchOutcome::default();
        }
    };

    apply_successful_commit(ctx, new_hashes, batch_files, &commit).await;
    let mem_limit_hit = check_post_commit_memory(ctx);

    BatchOutcome {
        parse_ms,
        embed_ms,
        bm25_ms: commit.bm25_ms,
        vector_upsert_ms: commit.vector_upsert_ms,
        vector_count,
        mem_limit_hit,
        chunks_dropped_by_cap: commit.chunks_dropped_by_cap,
    }
}
/// Result of reading and filtering one batch of files. The three vectors are
/// filled in lock-step: index `i` of each refers to the same file.
struct BatchPayload {
/// `(corpus-relative path, file content)` pairs to hand to the parser.
to_index: Vec<(String, String)>,
/// Original paths of the files in `to_index`, used for error reporting.
to_index_paths: Vec<PathBuf>,
/// `(original path, content hash)` pairs for the files in `to_index`.
new_hashes: Vec<(PathBuf, String)>,
}
/// Reads every file of `batch` concurrently and decides, per file, whether
/// it needs indexing.
///
/// * Read failures bump the error counter and emit an `error` event.
/// * Content rejected by `should_skip_content` is skipped with reason
///   `"minified"`.
/// * Files whose content hash matches the cached hash are skipped without a
///   reason.
/// * Everything else is returned for parsing together with its new hash.
async fn prepare_batch_payload(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchPayload {
    use std::sync::atomic::Ordering;

    let reads = batch.iter().cloned().map(|path| async move {
        let content = tokio::fs::read_to_string(&path).await;
        (path, content)
    });
    let read_results = futures::future::join_all(reads).await;

    let capacity = batch.len();
    let mut to_index: Vec<(String, String)> = Vec::with_capacity(capacity);
    let mut to_index_paths: Vec<PathBuf> = Vec::with_capacity(capacity);
    let mut new_hashes: Vec<(PathBuf, String)> = Vec::with_capacity(capacity);

    for (path, outcome) in read_results {
        let rel = to_corpus_relative_path(&ctx.root, &path);

        let content = match outcome {
            Ok(text) => text,
            Err(e) => {
                ctx.progress.errors.fetch_add(1, Ordering::Release);
                let event = serde_json::json!({
                    "event": "error",
                    "file": rel,
                    "message": format!("read: {e}"),
                    "indexed": ctx.progress.indexed.load(Ordering::Acquire),
                    "total_files": ctx.total,
                });
                ctx.progress.push(event).await;
                continue;
            }
        };

        if should_skip_content(&path, &content) {
            tracing::debug!("reindex: skipping minified content in {}", path.display());
            emit_skip(ctx, &rel, Some("minified")).await;
            continue;
        }

        let digest = hash_content(&content);
        let unchanged = ctx
            .hashes
            .get(&path)
            .is_some_and(|cached| *cached == digest);
        if unchanged {
            emit_skip(ctx, &rel, None).await;
            continue;
        }

        to_index.push((rel, content));
        to_index_paths.push(path.clone());
        new_hashes.push((path, digest));
    }

    BatchPayload {
        to_index,
        to_index_paths,
        new_hashes,
    }
}
/// Records one skipped file: bumps both the `skipped` and `indexed`
/// counters and pushes a `skip` event, with a `reason` field when one is
/// given.
async fn emit_skip(ctx: &BatchCtx, rel: &str, reason: Option<&str>) {
    use std::sync::atomic::Ordering;

    let counters = &ctx.progress;
    counters.skipped.fetch_add(1, Ordering::Release);
    let before = counters.indexed.fetch_add(1, Ordering::Release);

    let mut event = serde_json::json!({
        "event": "skip",
        "file": rel,
        "indexed": before + 1,
        "total_files": ctx.total,
    });
    if let Some(why) = reason {
        event["reason"] = serde_json::Value::String(why.to_owned());
    }
    counters.push(event).await;
}
/// Reports a whole-batch failure: adds the number of affected files to the
/// error counter and pushes one `error` event listing their
/// corpus-relative paths together with the error message.
async fn emit_batch_error(ctx: &BatchCtx, to_index_paths: &[PathBuf], err: anyhow::Error) {
    use std::sync::atomic::Ordering;

    let mut files_in_batch: Vec<String> = Vec::with_capacity(to_index_paths.len());
    for path in to_index_paths {
        files_in_batch.push(to_corpus_relative_path(&ctx.root, path));
    }

    let failed = to_index_paths.len();
    ctx.progress.errors.fetch_add(failed, Ordering::Release);

    let event = serde_json::json!({
        "event": "error",
        "files": files_in_batch,
        "message": format!("batch index: {err}"),
        "indexed": ctx.progress.indexed.load(Ordering::Acquire),
        "total_files": ctx.total,
    });
    ctx.progress.push(event).await;
}
/// Book-keeping after a batch has been committed successfully.
///
/// Adds the committed chunks and files to the progress counters, stores the
/// new content hashes in the in-memory cache (trimming it if it grew past
/// its cap), persists the hashes via `hash_cache::persist_batch`, and pushes
/// a `batch` event with cumulative throughput.
async fn apply_successful_commit(
    ctx: &BatchCtx,
    new_hashes: Vec<(PathBuf, String)>,
    batch_files: usize,
    commit: &CommitTimings,
) {
    use std::sync::atomic::Ordering;

    let progress = &ctx.progress;
    let new_chunks = commit.chunks;
    progress.total_chunks.fetch_add(new_chunks, Ordering::Release);
    let indexed_before = progress.indexed.fetch_add(batch_files, Ordering::Release);
    let indexed = indexed_before + batch_files;

    let elapsed_ms = ctx.started.elapsed().as_millis() as u64;
    let chunks_so_far = progress.total_chunks.load(Ordering::Acquire) as u64;
    // `checked_div` yields `None` for a zero elapsed time; report 0 then.
    let chunks_per_sec = (chunks_so_far * 1000)
        .checked_div(elapsed_ms)
        .unwrap_or(0);

    new_hashes.iter().for_each(|(path, digest)| {
        ctx.hashes.insert(path.clone(), digest.clone());
    });
    shrink_hashes_if_needed(&ctx.hashes);

    let cache_len = ctx.hashes.len();
    hash_cache::persist_batch(
        &ctx.handle,
        &new_hashes,
        MAX_FILE_HASHES_PER_INDEX,
        cache_len,
    )
    .await;

    let event = serde_json::json!({
        "event": "batch",
        "batch_files": batch_files,
        "batch_chunks": new_chunks,
        "indexed": indexed,
        "total_files": ctx.total,
        "elapsed_ms": elapsed_ms,
        "chunks_per_sec": chunks_per_sec,
    });
    progress.push(event).await;
}
/// Samples this process's RSS right after a commit and enforces the memory
/// limit.
///
/// Returns `false` without doing anything when no limit is configured or the
/// RSS cannot be read. Otherwise raises the shared peak-RSS value to the new
/// reading if it is higher and, when the reading is at or above the limit,
/// logs a warning, sets the abort flag and returns `true`.
fn check_post_commit_memory(ctx: &BatchCtx) -> bool {
    let Some(limit) = ctx.mem_limit else {
        return false;
    };
    let Some(rss) = current_rss_mb() else {
        return false;
    };
    // The peak lives behind an `Arc` and may be updated by other tasks at the
    // same time. A separate load followed by a store could overwrite a
    // higher value written in between; `fetch_max` performs the comparison
    // and the update as one atomic step.
    ctx.peak_rss_atomic.fetch_max(rss, AtomicOrdering::AcqRel);
    if rss < limit {
        return false;
    }
    tracing::warn!(
        "reindex: memory limit hit after commit \
         (rss={}MB >= limit={}MB) — skipping \
         remaining batches for index {}",
        rss,
        limit,
        ctx.index_id.0
    );
    ctx.mem_abort.store(true, AtomicOrdering::Release);
    true
}
/// Result of the symbol-graph rebuild step, reported in the `complete` event.
struct KgRebuildOutcome {
/// Node count of the rebuilt symbol graph.
symbol_count: usize,
/// Edge count of the rebuilt symbol graph.
edge_count: usize,
/// Wall-clock time of the rebuild, in milliseconds.
kg_ms: u64,
/// Reported as `kg_skipped` in the `complete` event.
kg_skipped: bool,
}
/// Rebuilds the symbol graph under the indexer read lock and reports its
/// node/edge counts together with the elapsed time. `kg_skipped` is always
/// `false` in the returned outcome.
async fn rebuild_symbol_graph_for_reindex(handle: &IndexHandle) -> KgRebuildOutcome {
    let timer = Instant::now();
    let indexer = handle.indexer.read().await;
    indexer.rebuild_symbol_graph_now().await;
    let graph = indexer.symbol_graph().await;

    let symbol_count = graph.node_count();
    let edge_count = graph.edge_count();
    // Measured last so the timing includes fetching the graph and its counts.
    let kg_ms = timer.elapsed().as_millis() as u64;

    KgRebuildOutcome {
        symbol_count,
        edge_count,
        kg_ms,
        kg_skipped: false,
    }
}
/// Run-wide aggregates passed to `emit_complete_event`.
struct RunTotals {
/// Time spent walking the file tree, in milliseconds.
walk_ms: u64,
/// Total parse time, in milliseconds.
parse_ms: u64,
/// Total embedding time, in milliseconds.
embed_ms: u64,
/// Total BM25 time, in milliseconds.
bm25_ms: u64,
/// Total vector upsert time, in milliseconds.
vector_upsert_ms: u64,
/// Total number of vectors produced.
vector_count: usize,
/// Whether the run stopped because of the memory limit; selects the
/// `"aborted_memory"` status in the `complete` event.
mem_limit_hit: bool,
/// Total chunks dropped by a cap; a non-zero value also sets
/// `walk_truncated_by_budget` in the `complete` event.
chunks_dropped_by_cap: usize,
}
/// Builds and pushes the final `complete` event of a run.
///
/// The event carries the final counters, overall throughput, peak RSS, the
/// memory-limit / cap flags and a `timings` object. `status` is
/// `"aborted_memory"` when the memory limit was hit and `"complete"`
/// otherwise. `indexed_new` is `indexed` minus `skipped` (saturating).
/// `embedderd_peak_rss_mb` is added only when a value is supplied.
async fn emit_complete_event(
    progress: &ReindexProgress,
    started: Instant,
    peak_rss_mb: u64,
    embedderd_peak_rss_mb: Option<u64>,
    totals: &RunTotals,
    kg: &KgRebuildOutcome,
) {
    use std::sync::atomic::Ordering;

    let total_chunks = progress.total_chunks.load(Ordering::Acquire);
    let elapsed_ms = started.elapsed().as_millis() as u64;
    // A zero elapsed time yields `None` from `checked_div`; report 0 then.
    let chunks_per_sec = match (total_chunks as u64 * 1000).checked_div(elapsed_ms) {
        Some(rate) => rate,
        None => 0,
    };
    let indexed_final = progress.indexed.load(Ordering::Acquire);
    let skipped_final = progress.skipped.load(Ordering::Acquire);
    let indexed_new = indexed_final.saturating_sub(skipped_final);
    let status_str = match totals.mem_limit_hit {
        true => "aborted_memory",
        false => "complete",
    };
    let dropped_any = totals.chunks_dropped_by_cap > 0;

    let mut event = serde_json::json!({
        "event": "complete",
        "status": status_str,
        "indexed": indexed_final,
        "indexed_new": indexed_new,
        "total_chunks": total_chunks,
        "skipped": skipped_final,
        "errors": progress.errors.load(Ordering::Acquire),
        "elapsed_ms": elapsed_ms,
        "chunks_per_sec": chunks_per_sec,
        "peak_rss_mb": peak_rss_mb,
        "memory_limit_hit": totals.mem_limit_hit,
        "walk_truncated_by_budget": dropped_any,
        "chunks_dropped_by_cap": totals.chunks_dropped_by_cap,
        "kg_skipped": kg.kg_skipped,
        "timings": {
            "walk_ms": totals.walk_ms,
            "parse_ms": totals.parse_ms,
            "embed_ms": totals.embed_ms,
            "bm25_ms": totals.bm25_ms,
            "vector_upsert_ms": totals.vector_upsert_ms,
            "kg_ms": kg.kg_ms,
            "vector_count": totals.vector_count,
            "symbol_count": kg.symbol_count,
            "edge_count": kg.edge_count,
        },
    });
    if let Some(peak) = embedderd_peak_rss_mb {
        event["embedderd_peak_rss_mb"] = serde_json::Value::from(peak);
    }
    progress.push(event).await;
}
/// Redirects the indexer's corpus writes to a fresh staging store so the
/// live corpus file is not modified while the reindex runs.
///
/// For a non-force (incremental) run the staging store is first seeded with
/// a full copy of the live corpus; for a force run it starts empty.
///
/// Returns:
/// * `Ok(Some(tmp_path))` — staging store installed on the indexer; the path
///   is the staging file to later commit or abort.
/// * `Ok(None)` — no staging in effect: the indexer has no corpus store, the
///   staging path could not be resolved, the staging store could not be
///   opened on a run without carryover, or the blocking task panicked.
/// * `Err(_)` — incremental run whose staging open / seed copy failed; the
///   staging file is removed (best effort) and the live corpus is untouched.
///
/// NOTE(review): several log lines say "force reindex" although this path is
/// also taken for incremental runs.
async fn begin_force_corpus_swap(
handle: &IndexHandle,
index_id: &IndexId,
force: bool,
) -> Result<Option<PathBuf>, anyhow::Error> {
// Grab the live store (incremental runs only) under a short read lock.
let live_corpus = {
let indexer = handle.indexer.read().await;
if !indexer.has_corpus_store() {
return Ok(None);
}
if !force {
indexer.corpus_store()
} else {
None
}
};
let is_incremental_carryover = live_corpus.is_some();
// Staging file location depends on whether the index uses colocated storage.
let tmp_path = if crate::service::colocated_storage::has_colocated_storage(&handle.root_path) {
match crate::service::colocated_storage::colocated_redb_tmp_path(&handle.root_path) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve colocated staging corpus path for '{}' ({e}) — \
reindex will write directly to the live corpus",
index_id.0
);
return Ok(None);
}
}
} else {
match crate::service::persistence::corpus_redb_tmp_path(&index_id.0) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve staging corpus path for '{}' ({e}) — \
reindex will write directly to the live corpus",
index_id.0
);
return Ok(None);
}
}
};
let tmp_for_open = tmp_path.clone();
let index_id_str = index_id.0.clone();
// Opening and seeding the store is blocking work, so it runs off the
// async executor.
let staged_result = tokio::task::spawn_blocking(move || {
let store = crate::core::corpus::CorpusStore::open_fresh(&tmp_for_open)?;
if let Some(live) = live_corpus {
store.copy_all_from(&live).with_context(|| {
format!(
"reindex[{index_id_str}]: failed to seed staging corpus from live corpus — \
aborting incremental reindex to preserve live corpus integrity"
)
})?;
}
Ok::<_, anyhow::Error>(store)
})
.await;
let staged = match staged_result {
Ok(Ok(store)) => store,
Ok(Err(e)) => {
// With carryover, continuing without staging is not attempted: the
// error is propagated so the caller aborts the run.
if is_incremental_carryover {
tracing::error!(
"reindex[{}]: ABORTING — could not copy live corpus into staging store ({e}); \
live corpus remains intact",
index_id.0
);
let _ = std::fs::remove_file(&tmp_path);
return Err(e);
}
tracing::warn!(
"force reindex: could not open staging corpus for '{}' ({e}) — \
reindex will write directly to the live corpus",
index_id.0
);
return Ok(None);
}
// Join error: the blocking task panicked or was cancelled.
Err(e) => {
tracing::warn!(
"force reindex: staging corpus open task panicked for '{}': {e}",
index_id.0
);
return Ok(None);
}
};
// Install the staging store; the previously attached store handle is
// dropped here.
let mut indexer = handle.indexer.write().await;
let _prev = indexer.swap_corpus_store(Arc::new(staged));
drop(indexer);
tracing::info!(
"force reindex: staging rebuilt corpus for '{}' in {}",
index_id.0,
tmp_path.display()
);
Ok(Some(tmp_path))
}
/// Promotes the staging corpus at `tmp_path` to the live corpus location.
///
/// Steps: resolve the live path, detach the corpus store from the indexer,
/// rename the staging file over the live file on a blocking thread, re-open
/// the store from the live path and attach it to the indexer again.
/// Failures are logged, not returned.
///
/// NOTE(review): when the rename or the re-open fails, the store detached
/// via `take_corpus_store` is not re-attached in this function, so the
/// indexer appears to be left without a corpus store — confirm whether a
/// caller restores it.
async fn commit_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId, tmp_path: &Path) {
let live_path = if crate::service::colocated_storage::has_colocated_storage(&handle.root_path) {
match crate::service::colocated_storage::colocated_redb_path(&handle.root_path) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve colocated live corpus path for '{}' ({e}) — \
staged corpus left at {}",
index_id.0,
tmp_path.display()
);
return;
}
}
} else {
match crate::service::persistence::corpus_redb_path(&index_id.0) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve live corpus path for '{}' ({e}) — \
staged corpus left at {}",
index_id.0,
tmp_path.display()
);
return;
}
}
};
// Detach (and drop) the indexer's handle on the staging store before the
// file is renamed.
{
let mut indexer = handle.indexer.write().await;
let _ = indexer.take_corpus_store();
}
let tmp = tmp_path.to_path_buf();
let live = live_path.clone();
let index_id_inner = index_id.0.clone();
// Rename + re-open are blocking filesystem operations.
let reopened = tokio::task::spawn_blocking(
move || -> anyhow::Result<crate::core::corpus::CorpusStore> {
std::fs::rename(&tmp, &live).with_context(|| {
format!(
"atomic-swap rename {} -> {} for '{index_id_inner}'",
tmp.display(),
live.display()
)
})?;
crate::core::corpus::CorpusStore::open(&live)
.with_context(|| format!("re-open swapped corpus for '{index_id_inner}'"))
},
)
.await;
match reopened {
Ok(Ok(store)) => {
handle
.indexer
.write()
.await
.set_corpus_store(Arc::new(store));
tracing::info!(
"force reindex: atomically swapped rebuilt corpus into {} for '{}'",
live_path.display(),
index_id.0
);
}
Ok(Err(e)) => tracing::warn!(
"force reindex: atomic corpus swap failed for '{}' ({e}) — \
previous corpus preserved; in-memory state is the rebuilt one",
index_id.0
),
// Join error: the blocking task panicked or was cancelled.
Err(e) => tracing::warn!(
"force reindex: atomic corpus swap task panicked for '{}': {e}",
index_id.0
),
}
}
/// Discards the staging corpus at `tmp_path` and re-attaches the original
/// live corpus to the indexer.
///
/// Steps: detach the current (staging) store from the indexer, delete the
/// staging file (a missing file is fine; other errors are logged), then
/// open the live corpus and attach it. If the live path cannot be resolved
/// or the store cannot be opened, the problem is logged and the indexer is
/// left without a corpus store.
async fn abort_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId, tmp_path: &Path) {
// Detach (and drop) the staging store before its file is deleted.
{
let mut indexer = handle.indexer.write().await;
let _ = indexer.take_corpus_store();
}
// Kept as a `Result` here; it is inspected inside the blocking closure.
let live_path = if crate::service::colocated_storage::has_colocated_storage(&handle.root_path) {
crate::service::colocated_storage::colocated_redb_path(&handle.root_path)
} else {
crate::service::persistence::corpus_redb_path(&index_id.0)
};
let tmp = tmp_path.to_path_buf();
let index_id_inner = index_id.0.clone();
let restored = tokio::task::spawn_blocking(
move || -> anyhow::Result<Option<crate::core::corpus::CorpusStore>> {
// Best-effort delete: failure to remove the staging file does not
// prevent restoring the live corpus.
match std::fs::remove_file(&tmp) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => tracing::warn!(
"force reindex: could not delete staging corpus {} for '{index_id_inner}': {e}",
tmp.display()
),
}
match live_path {
Ok(live) => Ok(Some(crate::core::corpus::CorpusStore::open(&live)?)),
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve live corpus path for '{index_id_inner}' \
({e}) — index left without a durable corpus until next restart"
);
Ok(None)
}
}
},
)
.await;
match restored {
Ok(Ok(Some(store))) => {
handle
.indexer
.write()
.await
.set_corpus_store(Arc::new(store));
tracing::warn!(
"force reindex: aborted — discarded staging corpus and restored the \
original durable corpus for '{}'",
index_id.0
);
}
// Live path unresolved; already logged inside the closure.
Ok(Ok(None)) => {}
Ok(Err(e)) => tracing::warn!(
"force reindex: could not restore the original corpus for '{}' after abort ({e})",
index_id.0
),
// Join error: the blocking task panicked or was cancelled.
Err(e) => tracing::warn!(
"force reindex: corpus-restore task panicked for '{}': {e}",
index_id.0
),
}
}
/// Drop guard for a reindex task: unless disarmed, dropping it marks the
/// run as `Failed` and broadcasts an error event.
struct ReindexTerminationGuard {
/// Progress object to update if the guard fires.
progress: Arc<ReindexProgress>,
/// True until `disarm` is called; the guard only fires while armed.
armed: bool,
}
impl ReindexTerminationGuard {
    /// Creates an armed guard for `progress`.
    fn new(progress: Arc<ReindexProgress>) -> Self {
        let armed = true;
        Self { progress, armed }
    }

    /// Disarms the guard so that dropping it has no effect. Call this once
    /// the task has reported its own terminal state.
    fn disarm(&mut self) {
        self.armed = false;
    }
}
impl Drop for ReindexTerminationGuard {
    /// If still armed, flags the run as `Failed` and broadcasts a generic
    /// error event. The event is sent straight to the broadcast channel and
    /// is not added to the replay buffer; a send failure is ignored.
    fn drop(&mut self) {
        if self.armed {
            self.progress.status.store(ReindexStatus::Failed);
            let payload = serde_json::json!({
                "event": "error",
                "message": "reindex task exited unexpectedly — check daemon logs for details"
            });
            let _ = self.progress.sender.send(payload.to_string());
        }
    }
}
#[allow(clippy::too_many_arguments)] pub fn spawn_reindex_with_cleanup(
handle: Arc<IndexHandle>,
progress: Arc<ReindexProgress>,
force: bool,
cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
aborted_map: Option<Arc<DashMap<IndexId, Instant>>>,
embedderd_pid_slot: Option<Arc<AtomicU32>>,
priority: bool,
quarantine: Option<quarantine::ReindexQuarantine>,
) {
use std::sync::atomic::Ordering as AtomicOrd;
if !priority {
BACKGROUND_QUEUE_DEPTH.fetch_add(1, AtomicOrd::Relaxed);
}
let cleanup_id = handle.id.clone();
tokio::spawn(async move {
use std::sync::atomic::Ordering;
let _permit = reindex_semaphore_for(priority)
.acquire()
.await
.expect("reindex semaphore is never closed");
if !priority {
BACKGROUND_QUEUE_DEPTH.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
let mut term_guard = ReindexTerminationGuard::new(Arc::clone(&progress));
let started = Instant::now();
let root = handle.root_path.clone();
let canonical_root = validate::canonical_walk_root(&root);
let index_id: IndexId = handle.id.clone();
reset_stages_for_reindex(&handle).await;
{
let mut diag = handle.walk_diagnostics.write().await;
diag.last_walk_started_at = Some(now_rfc3339());
diag.last_walk_files_seen = 0;
diag.last_walk_files_skipped = 0;
diag.last_walk_error = None;
}
let walk = collect_files_to_index(&handle);
let walk_ms = started.elapsed().as_millis() as u64;
let total = walk.files.len();
{
let mut diag = handle.walk_diagnostics.write().await;
diag.last_walk_files_seen = total as u64;
diag.last_walk_files_skipped = walk.skipped_dirs as u64;
if total == 0 {
let reason = if !handle.root_path.exists() {
format!("root path does not exist: {}", handle.root_path.display())
} else {
format!(
"walk produced zero files under {}; check gitignore rules, \
path_filter, and extension allow-list",
handle.root_path.display()
)
};
diag.last_walk_error = Some(reason);
}
}
progress.total_files.store(total, Ordering::Release);
let hashes = hashes_for(&index_id);
let prior_indexed_root = handle.read_indexed_root().await.unwrap_or(None);
let root_moved =
validate::needs_path_relativization(prior_indexed_root.as_deref(), &canonical_root);
let hashes_loaded: usize = if force {
hashes.clear();
hash_cache::clear_persisted(&handle).await; 0
} else if root_moved {
tracing::warn!(
"reindex[{}]: index root moved from {:?} to {} — clearing hash \
cache to re-relativize all chunk paths against the new root",
index_id.0,
prior_indexed_root,
canonical_root.display(),
);
hashes.clear();
hash_cache::clear_persisted(&handle).await; 0
} else {
hash_cache::load_into_cache(&handle, &hashes).await
};
progress
.push(serde_json::json!({
"event": "walk_complete",
"total_files": total,
"index_id": index_id.0,
}))
.await;
let effective_defer_embed = handle.defer_embed && !handle.lexical_only;
progress
.push(serde_json::json!({
"event": "start",
"total_files": total,
"index_id": index_id.0,
"root_path": root,
"force": force,
"lexical_only": handle.lexical_only,
"hashes_loaded": hashes_loaded,
"defer_embed": effective_defer_embed,
}))
.await;
if !handle.lexical_only {
let warm_indexer = Arc::clone(&handle.indexer);
let warm_index_id = index_id.0.clone();
let warm_ms = started;
tokio::spawn(async move {
tracing::debug!("reindex[{warm_index_id}]: starting concurrent embedder warm-up");
let t0 = std::time::Instant::now();
warm_indexer.read().await.warm_embedder().await;
tracing::info!(
"reindex[{warm_index_id}]: embedder warm-up complete in {}ms \
(started {}ms after reindex began)",
t0.elapsed().as_millis(),
warm_ms.elapsed().as_millis(),
);
});
}
let corpus_swap_tmp: Option<PathBuf> =
if staging::should_stage(handle.indexer.read().await.has_corpus_store()) {
match begin_force_corpus_swap(&handle, &index_id, force).await {
Ok(path) => path,
Err(e) => {
tracing::error!(
"reindex[{}]: ABORTING incremental reindex — carryover copy \
from live corpus failed ({e}); live corpus is intact",
index_id.0
);
mark_reindex_failed(&handle, "carryover copy failed — live corpus intact")
.await;
progress.status.store(ReindexStatus::Failed);
progress
.push(serde_json::json!({
"event": "error",
"index_id": index_id.0,
"message": format!(
"incremental reindex aborted: failed to copy live corpus \
into staging store ({e}) — live corpus is intact"
),
"fatal": true,
}))
.await;
term_guard.disarm();
schedule_progress_cleanup(cleanup_map, cleanup_id);
if let Some(ref q) = quarantine {
q.record_failure(&index_id);
}
return;
}
}
} else {
None
};
let mut total_parse_ms: u64 = 0;
let mut total_embed_ms: u64 = 0;
let mut total_bm25_ms: u64 = 0;
let mut total_vector_upsert_ms: u64 = 0;
let mut total_vector_count: usize = 0;
let mut total_chunks_dropped_by_cap: usize = 0;
let mem_limit = index_memory_limit_mb();
let mem_abort = Arc::new(AtomicBool::new(false));
let peak_rss_atomic = Arc::new(AtomicU64::new(current_rss_mb().unwrap_or(0)));
let mut mem_limit_hit: bool = false;
let (poller_handle, poller_stop) = spawn_memory_poller(
mem_limit,
mem_abort.clone(),
peak_rss_atomic.clone(),
index_id.0.clone(),
);
let peak_embedderd_rss_atomic = Arc::new(AtomicU64::new(0));
let (embedderd_poller_handle, embedderd_poller_stop) =
if let Some(pid_slot) = embedderd_pid_slot.as_ref() {
let initial_pid = pid_slot.load(AtomicOrdering::Acquire);
if let Some(rss) = current_rss_mb_for_pid(initial_pid) {
peak_embedderd_rss_atomic.store(rss, AtomicOrdering::Release);
}
let (h, s) = spawn_embedderd_rss_poller(
Arc::clone(pid_slot),
Arc::clone(&peak_embedderd_rss_atomic),
);
(Some(h), Some(s))
} else {
(None, None)
};
let ctx = BatchCtx {
handle: handle.clone(),
progress: progress.clone(),
root: canonical_root.clone(),
index_id: index_id.clone(),
hashes: hashes.clone(),
mem_limit,
mem_abort: mem_abort.clone(),
peak_rss_atomic: peak_rss_atomic.clone(),
started,
total,
lexical_only: handle.lexical_only,
defer_embed: handle.defer_embed && !handle.lexical_only,
embedder_pid_slot: embedderd_pid_slot.clone(),
};
let batches: Vec<Vec<PathBuf>> = walk
.files
.chunks(REINDEX_BATCH_SIZE)
.map(|b| b.to_vec())
.collect();
let (tx, mut rx) = mpsc::channel::<ParsedReadyBatch>(1);
let producer_ctx = ctx.clone();
let producer_mem_abort = mem_abort.clone();
let producer_index_id = index_id.0.clone();
let producer = tokio::spawn(async move {
for batch in batches {
if producer_mem_abort.load(AtomicOrdering::Acquire) {
let rss = current_rss_mb().unwrap_or(0);
tracing::warn!(
"reindex: memory limit hit before batch (rss={}MB, \
limit={:?}MB) — producer halting for index {}",
rss,
producer_ctx.mem_limit,
producer_index_id
);
break;
}
let Some(ready) = prepare_and_parse_batch(&producer_ctx, &batch).await else {
continue;
};
if tx.send(ready).await.is_err() {
break;
}
}
});
while let Some(ready) = rx.recv().await {
let outcome = commit_parsed_and_finalize(&ctx, ready).await;
total_parse_ms = total_parse_ms.saturating_add(outcome.parse_ms);
total_embed_ms = total_embed_ms.saturating_add(outcome.embed_ms);
total_bm25_ms = total_bm25_ms.saturating_add(outcome.bm25_ms);
total_vector_upsert_ms =
total_vector_upsert_ms.saturating_add(outcome.vector_upsert_ms);
total_vector_count = total_vector_count.saturating_add(outcome.vector_count);
total_chunks_dropped_by_cap =
total_chunks_dropped_by_cap.saturating_add(outcome.chunks_dropped_by_cap);
if outcome.chunks_dropped_by_cap > 0 {
progress.chunks_dropped_by_cap.fetch_add(
outcome.chunks_dropped_by_cap,
std::sync::atomic::Ordering::Release,
);
}
if outcome.mem_limit_hit {
mem_limit_hit = true;
rx.close();
while rx.recv().await.is_some() {}
break;
}
}
let _ = producer.await;
let memory_aborted = mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire);
if corpus_swap_tmp.is_some() && !force && !memory_aborted {
prune_deleted_files_from_staging(
&handle,
&walk.files,
&canonical_root,
&hashes,
&index_id,
)
.await;
}
let embedder_present = handle.indexer.read().await.has_embedder();
let reindex_outcome = validate::reindex_outcome(
handle.lexical_only,
ctx.defer_embed,
embedder_present,
total,
progress.skipped.load(AtomicOrdering::Acquire),
total_vector_count,
);
let staging_resolution = staging::resolve_staging(memory_aborted, &reindex_outcome);
{
let files_done = progress.indexed.load(AtomicOrdering::Acquire);
let corpus_total_chunks = {
let indexer = handle.indexer.read().await;
indexer
.corpus_arc()
.and_then(|c| c.chunk_count().ok())
.unwrap_or_else(|| {
let in_mem = indexer.chunk_count();
if in_mem > 0 {
in_mem
} else {
progress.total_chunks.load(AtomicOrdering::Acquire)
}
})
};
mark_lexical_ready_semantic_in_progress(
&handle,
files_done,
corpus_total_chunks,
total_vector_count,
)
.await;
}
{
let indexer = handle.indexer.read().await;
indexer.force_incremental_persist();
}
if let Some(tmp_path) = &corpus_swap_tmp {
if staging_resolution.is_commit() {
commit_force_corpus_swap(&handle, &index_id, tmp_path).await;
if let Err(e) = handle.write_indexed_root(&canonical_root).await {
tracing::warn!(
"reindex[{}]: failed to persist indexed_root {} ({e}) — \
a future root-move may not re-relativize paths",
index_id.0,
canonical_root.display(),
);
}
} else {
if let staging::StagingResolution::Rollback { reason } = &staging_resolution {
tracing::warn!(
"reindex[{}]: rolling back staged corpus — {reason}",
index_id.0,
);
}
abort_force_corpus_swap(&handle, &index_id, tmp_path).await;
}
} else if reindex_outcome.is_ready() && !memory_aborted {
if let Err(e) = handle.write_indexed_root(&canonical_root).await {
tracing::debug!(
"reindex[{}]: indexed_root not persisted (no durable corpus): {e}",
index_id.0,
);
}
}
if let Some(reason) = reindex_outcome.failure_reason() {
let embed_failure_count = progress.errors.load(AtomicOrdering::Acquire);
tracing::error!(
"reindex[{}]: FAILED — {reason} (walked_files={}, vectors=0, \
embed_failure_count={})",
index_id.0,
total,
embed_failure_count,
);
mark_reindex_failed(&handle, reason).await;
progress.status.store(ReindexStatus::Failed);
progress
.push(serde_json::json!({
"event": "error",
"index_id": index_id.0,
"message": reason,
"embed_failure_count": embed_failure_count,
"walked_files": total,
"vector_count": 0,
"fatal": true,
}))
.await;
term_guard.disarm();
poller_stop.store(true, AtomicOrdering::Release);
let _ = poller_handle.await;
if let Some(stop) = embedderd_poller_stop {
stop.store(true, AtomicOrdering::Release);
}
if let Some(h) = embedderd_poller_handle {
let _ = h.await;
}
if let Some(ref q) = quarantine {
q.record_failure(&index_id);
}
schedule_progress_cleanup(cleanup_map, cleanup_id);
return;
}
mark_semantic_ready_graph_in_progress(
&handle,
total_vector_count,
progress.total_chunks.load(AtomicOrdering::Acquire),
)
.await;
let kg = if handle.skip_kg {
tracing::info!(
"reindex[{}]: KG construction skipped (skip_kg=true)",
index_id.0,
);
KgRebuildOutcome {
symbol_count: 0,
edge_count: 0,
kg_ms: 0,
kg_skipped: true,
}
} else {
progress
.push(serde_json::json!({
"event": "kg_start",
"index_id": index_id.0,
}))
.await;
let outcome = rebuild_symbol_graph_for_reindex(&handle).await;
progress
.push(serde_json::json!({
"event": "kg_complete",
"index_id": index_id.0,
"kg_ms": outcome.kg_ms,
"symbol_count": outcome.symbol_count,
"edge_count": outcome.edge_count,
}))
.await;
mark_graph_ready(&handle).await;
if mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire) {
tracing::warn!(
"reindex: memory limit was breached during batch processing for \
index {} (peak_rss={}MB, limit={:?}MB) — KG was still rebuilt \
(symbols={}, edges={}) because graph construction is bounded by \
TRUSTY_MAX_KG_NODES and independent of the embedding spike",
index_id.0,
peak_rss_atomic.load(AtomicOrdering::Acquire),
mem_limit,
outcome.symbol_count,
outcome.edge_count,
);
}
outcome
};
poller_stop.store(true, AtomicOrdering::Release);
let _ = poller_handle.await;
if let Some(stop) = embedderd_poller_stop {
stop.store(true, AtomicOrdering::Release);
}
if let Some(h) = embedderd_poller_handle {
let _ = h.await;
}
if let Some(pid_slot) = embedderd_pid_slot.as_ref() {
let pid = pid_slot.load(AtomicOrdering::Acquire);
if let Some(rss) = current_rss_mb_for_pid(pid) {
let prev = peak_embedderd_rss_atomic.load(AtomicOrdering::Acquire);
if rss > prev {
peak_embedderd_rss_atomic.store(rss, AtomicOrdering::Release);
}
}
}
let embedderd_peak_rss_mb: Option<u64> = if embedderd_pid_slot.is_some() {
let v = peak_embedderd_rss_atomic.load(AtomicOrdering::Acquire);
if v > 0 {
Some(v)
} else {
None
}
} else {
None
};
let aborted_memory = mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire);
if aborted_memory {
progress.status.store(ReindexStatus::AbortedMemory);
if let Some(map) = aborted_map.as_ref() {
map.insert(index_id.clone(), Instant::now());
}
} else {
progress.status.store(ReindexStatus::Complete);
let new_sha = crate::core::git::head_sha(&handle.root_path);
*handle.indexed_head_sha.write().await = new_sha;
*handle.last_indexed_at.write().await = Some(now_rfc3339());
}
if let Some(rss) = current_rss_mb() {
let prev = peak_rss_atomic.load(AtomicOrdering::Acquire);
if rss > prev {
peak_rss_atomic.store(rss, AtomicOrdering::Release);
}
}
let peak_rss_mb = peak_rss_atomic.load(AtomicOrdering::Acquire);
let indexed_final = progress.indexed.load(Ordering::Acquire);
let total_chunks = progress.total_chunks.load(Ordering::Acquire);
let skipped_final = progress.skipped.load(Ordering::Acquire);
let elapsed_ms = started.elapsed().as_millis() as u64;
let indexed_new = indexed_final.saturating_sub(skipped_final);
tracing::info!(
"reindex complete: index={} files={} indexed_new={} skipped={} chunks={} \
elapsed_ms={} peak_rss_mb={} memory_limit_hit={}",
index_id.0,
indexed_final,
indexed_new,
skipped_final,
total_chunks,
elapsed_ms,
peak_rss_mb,
mem_limit_hit,
);
let model_load_approx_ms = elapsed_ms
.saturating_sub(walk_ms)
.saturating_sub(total_parse_ms)
.saturating_sub(total_embed_ms)
.saturating_sub(total_bm25_ms)
.saturating_sub(total_vector_upsert_ms)
.saturating_sub(kg.kg_ms);
tracing::info!(
"reindex phase timings: index={} walk={}ms parse={}ms \
model_load_approx={}ms embed={}ms bm25={}ms vector_upsert={}ms \
kg={}ms total={}ms",
index_id.0,
walk_ms,
total_parse_ms,
model_load_approx_ms,
total_embed_ms,
total_bm25_ms,
total_vector_upsert_ms,
kg.kg_ms,
elapsed_ms,
);
let totals = RunTotals {
walk_ms,
parse_ms: total_parse_ms,
embed_ms: total_embed_ms,
bm25_ms: total_bm25_ms,
vector_upsert_ms: total_vector_upsert_ms,
vector_count: total_vector_count,
mem_limit_hit,
chunks_dropped_by_cap: total_chunks_dropped_by_cap,
};
emit_complete_event(
&progress,
started,
peak_rss_mb,
embedderd_peak_rss_mb,
&totals,
&kg,
)
.await;
term_guard.disarm();
if let Some(ref q) = quarantine {
q.record_success(&index_id);
}
refresh_context_embedding(&handle).await;
let has_embedder = handle.indexer.read().await.has_embedder();
if ctx.defer_embed && !aborted_memory && has_embedder {
spawn_deferred_embed_pass(handle, progress.clone());
}
schedule_progress_cleanup(cleanup_map, cleanup_id);
});
}
// Unit tests for this module. The `tests` module is declared out-of-line (its
// body lives in a separate source file) and is gated on `cfg(test)`, so it is
// only compiled for test builds.
#[cfg(test)]
mod tests;