use crate::core::indexer::{CommitTimings, ParsedBatch};
use crate::core::memguard::{current_rss_mb, current_rss_mb_for_pid, index_memory_limit_mb};
use crate::core::registry::{IndexHandle, IndexId, IndexStages, StageState, StageStatus};
use crate::service::walker::{should_skip_content, walk_source_files_with_options, WalkOptions};
use anyhow::Context;
use crossbeam_utils::atomic::AtomicCell;
use dashmap::DashMap;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
/// Returns the process-wide semaphore handed out for priority reindex runs.
///
/// The semaphore is created lazily on first call with
/// `MAX_PARALLEL_REINDEXES` permits and lives for the rest of the process.
fn reindex_semaphore() -> &'static Semaphore {
    static PRIORITY_PERMITS: OnceLock<Semaphore> = OnceLock::new();
    let build = || Semaphore::new(MAX_PARALLEL_REINDEXES);
    PRIORITY_PERMITS.get_or_init(build)
}
/// Returns the process-wide semaphore handed out for non-priority
/// (background) reindex runs.
///
/// Created lazily on first call with `MAX_PARALLEL_BACKGROUND_REINDEXES`
/// permits.
fn background_reindex_semaphore() -> &'static Semaphore {
    static BACKGROUND_PERMITS: OnceLock<Semaphore> = OnceLock::new();
    let build = || Semaphore::new(MAX_PARALLEL_BACKGROUND_REINDEXES);
    BACKGROUND_PERMITS.get_or_init(build)
}
/// Number of permits in the priority reindex semaphore, i.e. how many
/// priority reindex tasks may hold a permit at the same time.
const MAX_PARALLEL_REINDEXES: usize = 2;
/// Number of permits in the background reindex semaphore, i.e. how many
/// non-priority reindex tasks may hold a permit at the same time.
const MAX_PARALLEL_BACKGROUND_REINDEXES: usize = 1;
/// Counter of background reindex tasks that have been spawned but have not
/// yet acquired their semaphore permit.
static BACKGROUND_QUEUE_DEPTH: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);

/// Returns the current value of the background reindex queue counter.
///
/// The read uses relaxed ordering, so the value is a point-in-time snapshot
/// suitable for diagnostics rather than synchronization.
pub fn background_reindex_queue_depth() -> usize {
    use std::sync::atomic::Ordering;
    let depth = BACKGROUND_QUEUE_DEPTH.load(Ordering::Relaxed);
    depth
}
/// Selects the semaphore a reindex task must acquire before running.
///
/// `priority == true` yields the priority semaphore; otherwise the background
/// semaphore is returned.
pub(crate) fn reindex_semaphore_for(priority: bool) -> &'static Semaphore {
    match priority {
        true => reindex_semaphore(),
        false => background_reindex_semaphore(),
    }
}
/// Maximum number of files grouped into one reindex batch (the walked file
/// list is split into chunks of this size).
const REINDEX_BATCH_SIZE: usize = 128;
/// Returns the process-wide registry of per-index content-hash caches.
///
/// The outer map is keyed by index id; each value is a shared map from file
/// path to the content hash recorded for that file. The registry is created
/// empty on first access.
fn file_hashes() -> &'static DashMap<IndexId, Arc<DashMap<PathBuf, String>>> {
    static REGISTRY: OnceLock<DashMap<IndexId, Arc<DashMap<PathBuf, String>>>> = OnceLock::new();
    REGISTRY.get_or_init(|| DashMap::new())
}
/// Returns the shared content-hash cache for `id`, inserting an empty cache
/// into the registry if the index has none yet.
///
/// The returned `Arc` points at the same map stored in the registry, so
/// updates made through it are visible to later callers.
fn hashes_for(id: &IndexId) -> Arc<DashMap<PathBuf, String>> {
    let slot = file_hashes()
        .entry(id.clone())
        .or_insert_with(|| Arc::new(DashMap::new()));
    Arc::clone(slot.value())
}
/// Upper bound on the number of entries kept in one index's file-hash cache
/// before `shrink_hashes_if_needed` starts dropping entries.
const MAX_FILE_HASHES_PER_INDEX: usize = 200_000;
/// Bounds the size of a file-hash cache.
///
/// When `map` holds more than `MAX_FILE_HASHES_PER_INDEX` entries, enough
/// entries are removed to bring it down to 90% of that cap. Which entries go
/// is whatever the map's iteration order yields first. Does nothing when the
/// map is at or below the cap.
fn shrink_hashes_if_needed(map: &DashMap<PathBuf, String>) {
    let current = map.len();
    if current > MAX_FILE_HASHES_PER_INDEX {
        let target = MAX_FILE_HASHES_PER_INDEX * 9 / 10;
        let to_remove = current.saturating_sub(target);
        // Collect the victim keys first: the iterator must be finished before
        // any `remove` call touches the map.
        let mut victims: Vec<PathBuf> = Vec::new();
        for entry in map.iter().take(to_remove) {
            victims.push(entry.key().clone());
        }
        victims.iter().for_each(|key| {
            map.remove(key);
        });
        tracing::info!(
            "file-hash cache exceeded {} entries — dropped {} to bound memory",
            MAX_FILE_HASHES_PER_INDEX,
            to_remove
        );
    }
}
/// Maximum number of serialized events retained in a `ReindexProgress`
/// replay buffer; the oldest event is discarded when the buffer is full.
const MAX_REPLAY_EVENTS: usize = 500;
/// Delay, in seconds, before `schedule_progress_cleanup` removes a progress
/// entry from the cleanup map.
const REINDEX_PROGRESS_TTL_SECS: u64 = 60;
/// Computes the SHA-256 digest of `content` and returns it as a lowercase
/// hexadecimal string.
fn hash_content(content: &str) -> String {
    let digest = Sha256::digest(content.as_bytes());
    format!("{:x}", digest)
}
/// Capacity passed to `broadcast::channel` when a `ReindexProgress` is
/// created.
const BROADCAST_CAPACITY: usize = 256;
/// Lifecycle status of a reindex run, stored in `ReindexProgress::status`.
///
/// Serialized with serde's `rename_all = "lowercase"`.
// NOTE(review): with `lowercase`, `AbortedMemory` serializes as
// "abortedmemory", while the `complete` event emitted by
// `emit_complete_event` uses the string "aborted_memory" — confirm consumers
// expect both spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReindexStatus {
/// Initial status assigned by `ReindexProgress::new`.
Running,
/// Set when the run finishes without tripping the memory limit.
Complete,
/// Set when the run finishes after the memory limit was hit.
AbortedMemory,
/// Set by `ReindexTerminationGuard` when it is dropped while still armed.
Failed,
}
/// Shared, lock-light progress state for one reindex run.
///
/// Counters are atomics so they can be updated from the reindex task while
/// other tasks read them; events are both buffered for replay and broadcast
/// to live subscribers.
pub struct ReindexProgress {
/// Current lifecycle status of the run.
pub status: AtomicCell<ReindexStatus>,
/// Number of files produced by the walk for this run.
pub total_files: std::sync::atomic::AtomicUsize,
/// Files processed so far; incremented for committed files and for skipped
/// files alike.
pub indexed: std::sync::atomic::AtomicUsize,
/// Sum of the chunk counts reported by successful batch commits.
pub total_chunks: std::sync::atomic::AtomicUsize,
/// Number of files that failed to read or belonged to a failed batch.
pub errors: std::sync::atomic::AtomicUsize,
/// Number of files skipped (unchanged hash or skippable content).
pub skipped: std::sync::atomic::AtomicUsize,
/// Sum of `chunks_dropped_by_cap` reported by batch commits.
pub chunks_dropped_by_cap: std::sync::atomic::AtomicUsize,
/// Replay buffer of serialized events, bounded by `MAX_REPLAY_EVENTS`.
pub events: Arc<Mutex<Vec<String>>>,
/// Broadcast channel on which each serialized event is also sent.
pub sender: broadcast::Sender<String>,
}
impl ReindexProgress {
    /// Creates a fresh progress tracker: status `Running`, every counter at
    /// zero, an empty replay buffer, and a new broadcast channel with
    /// `BROADCAST_CAPACITY` slots.
    pub fn new() -> Self {
        use std::sync::atomic::AtomicUsize;
        // The initial receiver is dropped right away; subscribers obtain
        // their own receivers from `sender`.
        let (sender, _) = broadcast::channel(BROADCAST_CAPACITY);
        let replay_buffer = Arc::new(Mutex::new(Vec::new()));
        Self {
            status: AtomicCell::new(ReindexStatus::Running),
            total_files: AtomicUsize::new(0),
            indexed: AtomicUsize::new(0),
            total_chunks: AtomicUsize::new(0),
            errors: AtomicUsize::new(0),
            skipped: AtomicUsize::new(0),
            chunks_dropped_by_cap: AtomicUsize::new(0),
            events: replay_buffer,
            sender,
        }
    }

    /// Records `event`: serializes it to a string, appends it to the replay
    /// buffer (evicting the oldest entry when the buffer already holds
    /// `MAX_REPLAY_EVENTS` items), then broadcasts it.
    ///
    /// A failed broadcast send is deliberately ignored.
    pub async fn push(&self, event: serde_json::Value) {
        let line = event.to_string();
        let mut replay = self.events.lock().await;
        if replay.len() >= MAX_REPLAY_EVENTS {
            replay.remove(0);
        }
        replay.push(line.clone());
        // Release the buffer lock before broadcasting.
        drop(replay);
        let _ = self.sender.send(line);
    }
}
impl Default for ReindexProgress {
fn default() -> Self {
Self::new()
}
}
/// Starts a priority reindex of `handle`, reporting into `progress`.
///
/// Convenience wrapper over `spawn_reindex_with_cleanup` with no cleanup map,
/// no aborted-index map, and no embedder pid slot.
pub fn spawn_reindex(handle: Arc<IndexHandle>, progress: Arc<ReindexProgress>, force: bool) {
    let cleanup_map = None;
    let aborted_map = None;
    let embedderd_pid_slot = None;
    let priority = true;
    spawn_reindex_with_cleanup(
        handle,
        progress,
        force,
        cleanup_map,
        aborted_map,
        embedderd_pid_slot,
        priority,
    );
}
/// Walks the index's source tree(s) and returns the filtered, sorted,
/// de-duplicated list of files to index, together with the total number of
/// directories the walker skipped.
///
/// Subtrees come from `handle.include_paths`, falling back to
/// `handle.root_path` when that list is empty. The walked files are then
/// narrowed, in order, by `exclude_globs`, `extensions` (ASCII
/// case-insensitive) and `path_filter`; each filter applies only when its
/// list is non-empty.
fn collect_files_to_index(handle: &IndexHandle) -> crate::service::walker::WalkResult {
    let subtrees: &[PathBuf] = if handle.include_paths.is_empty() {
        std::slice::from_ref(&handle.root_path)
    } else {
        handle.include_paths.as_slice()
    };
    let walk_opts = WalkOptions {
        include_docs: handle.include_docs,
        respect_gitignore: handle.respect_gitignore,
    };

    let mut files: Vec<PathBuf> = Vec::new();
    let mut skipped_dirs: usize = 0;
    for subtree in subtrees {
        let walked = walk_source_files_with_options(subtree, walk_opts);
        files.extend(walked.files);
        skipped_dirs = skipped_dirs.saturating_add(walked.skipped_dirs);
    }

    if !handle.exclude_globs.is_empty() {
        let excludes = &handle.exclude_globs;
        files.retain(|p| !crate::core::repo_config::path_matches_any_glob(p, excludes));
    }

    if !handle.extensions.is_empty() {
        let allowed = &handle.extensions;
        // Files without an extension (or with a non-UTF-8 one) are dropped.
        files.retain(|p| match p.extension().and_then(|e| e.to_str()) {
            Some(ext) => allowed.iter().any(|x| x.eq_ignore_ascii_case(ext)),
            None => false,
        });
    }

    if !handle.path_filter.is_empty() {
        let patterns = &handle.path_filter;
        // Fall back to the raw root path when canonicalization fails.
        let root =
            std::fs::canonicalize(&handle.root_path).unwrap_or_else(|_| handle.root_path.clone());
        files.retain(|p| crate::core::registry::path_matches_filter(p, &root, patterns));
    }

    // Overlapping include paths can yield the same file more than once.
    files.sort();
    files.dedup();

    crate::service::walker::WalkResult {
        files,
        skipped_dirs,
    }
}
/// Spawns a background task that samples this process's RSS once per second.
///
/// Each sample raises `peak_rss` to the maximum seen so far. When `mem_limit`
/// is set and a sample reaches it, `mem_abort` is set (with one warning per
/// transition from unset to set).
///
/// Returns the task's join handle and a stop flag; storing `true` in the flag
/// makes the task exit at its next wake-up.
fn spawn_memory_poller(
    mem_limit: Option<u64>,
    mem_abort: Arc<AtomicBool>,
    peak_rss: Arc<AtomicU64>,
    index_id: String,
) -> (tokio::task::JoinHandle<()>, Arc<AtomicBool>) {
    const MEM_POLL_INTERVAL: Duration = Duration::from_secs(1);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);
    let task = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(MEM_POLL_INTERVAL);
        // The first tick of a tokio interval completes immediately.
        ticker.tick().await;
        while !stop_flag.load(AtomicOrdering::Acquire) {
            if let Some(rss) = current_rss_mb() {
                // Monotonic max: only ever raises the recorded peak.
                peak_rss.fetch_max(rss, AtomicOrdering::AcqRel);
                if let Some(limit) = mem_limit.filter(|&limit| rss >= limit) {
                    if !mem_abort.load(AtomicOrdering::Acquire) {
                        tracing::warn!(
                            "reindex memory poller: rss={}MB >= limit={}MB \
                             — tripping abort flag for index {}",
                            rss,
                            limit,
                            index_id,
                        );
                        mem_abort.store(true, AtomicOrdering::Release);
                    }
                }
            }
            ticker.tick().await;
        }
    });
    (task, stop)
}
/// Spawns a background task that samples the RSS of the process whose pid is
/// stored in `embedderd_pid_slot`, every 500 ms.
///
/// The pid is re-read on every sample. Each successful sample raises
/// `peak_embedderd_rss` to the maximum seen so far.
///
/// Returns the task's join handle and a stop flag; storing `true` in the flag
/// makes the task exit at its next wake-up.
fn spawn_embedderd_rss_poller(
    embedderd_pid_slot: Arc<AtomicU32>,
    peak_embedderd_rss: Arc<AtomicU64>,
) -> (tokio::task::JoinHandle<()>, Arc<AtomicBool>) {
    const EMBEDDERD_POLL_INTERVAL: Duration = Duration::from_millis(500);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);
    let task = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(EMBEDDERD_POLL_INTERVAL);
        // The first tick of a tokio interval completes immediately.
        ticker.tick().await;
        while !stop_flag.load(AtomicOrdering::Acquire) {
            let pid = embedderd_pid_slot.load(AtomicOrdering::Acquire);
            if let Some(rss) = current_rss_mb_for_pid(pid) {
                // Monotonic max: only ever raises the recorded peak.
                peak_embedderd_rss.fetch_max(rss, AtomicOrdering::AcqRel);
            }
            ticker.tick().await;
        }
    });
    (task, stop)
}
/// Returns the current UTC time formatted as an RFC 3339 string.
fn now_rfc3339() -> String {
    let now = chrono::Utc::now();
    now.to_rfc3339()
}
/// Resets the handle's stage tracker for a new reindex run.
///
/// The lexical stage always becomes `InProgress` with a fresh start
/// timestamp. The semantic and graph stages are `skipped` for lexical-only
/// indexes; otherwise semantic is `pending` and graph is `skipped` when
/// `skip_kg` is set, `pending` if not.
async fn reset_stages_for_reindex(handle: &Arc<IndexHandle>) {
    let mut stages = handle.stages.write().await;
    let lexical = StageState {
        status: StageStatus::InProgress,
        started_at: Some(now_rfc3339()),
        ..Default::default()
    };
    let (semantic, graph) = if handle.lexical_only {
        (StageState::skipped(), StageState::skipped())
    } else if handle.skip_kg {
        (StageState::pending(), StageState::skipped())
    } else {
        (StageState::pending(), StageState::pending())
    };
    *stages = IndexStages {
        lexical,
        semantic,
        graph,
    };
}
/// Marks the lexical stage `Ready` and, when applicable, starts the semantic
/// stage.
///
/// `files` and `chunks` are recorded on the lexical stage. The semantic stage
/// moves to `InProgress` (with `total_chunks` as its total) only when the
/// index is not lexical-only and the stage is currently `Pending`.
async fn mark_lexical_ready_semantic_in_progress(
    handle: &Arc<IndexHandle>,
    files: usize,
    chunks: usize,
    total_chunks: usize,
) {
    let mut stages = handle.stages.write().await;

    let lexical = &mut stages.lexical;
    lexical.status = StageStatus::Ready;
    lexical.completed_at = Some(now_rfc3339());
    lexical.files = Some(files);
    lexical.chunks = Some(chunks);

    if handle.lexical_only || stages.semantic.status != StageStatus::Pending {
        return;
    }
    let semantic = &mut stages.semantic;
    semantic.status = StageStatus::InProgress;
    semantic.started_at = Some(now_rfc3339());
    semantic.total = Some(total_chunks);
}
/// Marks the semantic stage `Ready` and, when applicable, starts the graph
/// stage.
///
/// No-op for lexical-only indexes. `embedded` and `total` are recorded on the
/// semantic stage. The graph stage moves to `InProgress` only when `skip_kg`
/// is unset and the stage is currently `Pending`.
async fn mark_semantic_ready_graph_in_progress(
    handle: &Arc<IndexHandle>,
    embedded: usize,
    total: usize,
) {
    let mut stages = handle.stages.write().await;
    if !handle.lexical_only {
        let semantic = &mut stages.semantic;
        semantic.status = StageStatus::Ready;
        semantic.completed_at = Some(now_rfc3339());
        semantic.embedded = Some(embedded);
        semantic.total = Some(total);

        let graph_should_start = !handle.skip_kg && stages.graph.status == StageStatus::Pending;
        if graph_should_start {
            let graph = &mut stages.graph;
            graph.status = StageStatus::InProgress;
            graph.started_at = Some(now_rfc3339());
        }
    }
}
/// Marks the graph stage `Ready` with a completion timestamp.
///
/// No-op when the index is lexical-only or has `skip_kg` set.
async fn mark_graph_ready(handle: &Arc<IndexHandle>) {
    let mut stages = handle.stages.write().await;
    let graph_tracked = !(handle.lexical_only || handle.skip_kg);
    if graph_tracked {
        let graph = &mut stages.graph;
        graph.status = StageStatus::Ready;
        graph.completed_at = Some(now_rfc3339());
    }
}
/// Schedules removal of `cleanup_id` from `cleanup_map` after
/// `REINDEX_PROGRESS_TTL_SECS` seconds.
///
/// Does nothing when no map is supplied. The removal runs on a detached
/// tokio task.
fn schedule_progress_cleanup(
    cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
    cleanup_id: IndexId,
) {
    if let Some(map) = cleanup_map {
        let ttl = std::time::Duration::from_secs(REINDEX_PROGRESS_TTL_SECS);
        tokio::spawn(async move {
            tokio::time::sleep(ttl).await;
            map.remove(&cleanup_id);
        });
    }
}
/// Recomputes the index's context summary and context embedding from the
/// metadata found under its root path.
///
/// Outcomes:
/// * no metadata summary available — both the embedding and the summary are
///   cleared;
/// * embedding produced — both are stored;
/// * no embedder (`Ok(None)`) or embedding failed — the embedding is cleared
///   and only the display summary is stored.
async fn refresh_context_embedding(handle: &Arc<IndexHandle>) {
    use crate::service::context_inference::{make_display_summary, scrape_metadata_summary};

    let summary = match scrape_metadata_summary(&handle.root_path) {
        Some(text) => text,
        None => {
            tracing::debug!(
                "context_inference: no recognised metadata files under {} for index {}",
                handle.root_path.display(),
                handle.id.0
            );
            *handle.context_embedding.write().await = None;
            *handle.context_summary.write().await = None;
            return;
        }
    };
    let display = make_display_summary(&summary);

    // Hold the indexer read lock only for the duration of the embed call.
    let indexer = handle.indexer.read().await;
    let embed_result = indexer.embed_text(&summary).await;
    drop(indexer);

    match embed_result {
        Ok(Some(vector)) => {
            *handle.context_embedding.write().await = Some(vector);
            *handle.context_summary.write().await = Some(display);
            tracing::info!(
                "context_inference: refreshed context embedding for index {}",
                handle.id.0
            );
            return;
        }
        Ok(None) => tracing::debug!(
            "context_inference: no embedder wired on index {} — skipping context embedding",
            handle.id.0
        ),
        Err(e) => tracing::warn!(
            "context_inference: embed failed for index {}: {e}",
            handle.id.0
        ),
    }

    // Shared tail for the "no embedding" outcomes: keep the human-readable
    // summary but drop any stale embedding.
    *handle.context_embedding.write().await = None;
    *handle.context_summary.write().await = Some(display);
}
/// Per-run context shared by the batch-processing helpers.
///
/// Cheap to clone: the heavy members are `Arc`s.
#[derive(Clone)]
struct BatchCtx {
/// Index being rebuilt.
handle: Arc<IndexHandle>,
/// Progress tracker that receives counters and events.
progress: Arc<ReindexProgress>,
/// Root path stripped from file paths to form relative names.
root: PathBuf,
/// Identifier of the index, used in log messages.
index_id: IndexId,
/// Content-hash cache for this index (path → last indexed hash).
hashes: Arc<DashMap<PathBuf, String>>,
/// Memory limit in MB, when one is configured.
mem_limit: Option<u64>,
/// Flag set when the memory limit has been reached.
mem_abort: Arc<AtomicBool>,
/// Highest RSS (MB) observed during the run.
peak_rss_atomic: Arc<AtomicU64>,
/// Instant the run started; used for elapsed-time reporting.
started: Instant,
/// Total number of files in the run, reported in events as `total_files`.
total: usize,
/// When true, batches are parsed without embedding.
lexical_only: bool,
}
/// Timings and counters produced by committing one batch.
///
/// The `Default` value (all zeros / `false`) is returned for batches that
/// produced nothing to commit or whose commit failed.
#[derive(Default)]
struct BatchOutcome {
/// Parse time reported by the parsed batch, in milliseconds.
parse_ms: u64,
/// Embedding time reported by the parsed batch, in milliseconds.
embed_ms: u64,
/// BM25 time reported by the commit, in milliseconds.
bm25_ms: u64,
/// Vector-upsert time reported by the commit, in milliseconds.
vector_upsert_ms: u64,
/// Vector count reported by the parsed batch.
vector_count: usize,
/// True when the post-commit memory check found RSS at or above the limit.
mem_limit_hit: bool,
/// Chunks-dropped-by-cap count reported by the commit.
chunks_dropped_by_cap: usize,
}
/// Runs the full pipeline for one batch sequentially: prepare + parse, then
/// commit + finalize.
///
/// Returns a default (all-zero) outcome when the batch yields nothing to
/// commit.
#[allow(dead_code)]
async fn process_one_batch(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchOutcome {
    match prepare_and_parse_batch(ctx, batch).await {
        Some(ready) => commit_parsed_and_finalize(ctx, ready).await,
        None => BatchOutcome::default(),
    }
}
/// A batch that has been parsed and is waiting to be committed.
struct ParsedReadyBatch {
/// Output of the indexer's parse step.
parsed: ParsedBatch,
/// `(path, content hash)` pairs to record in the hash cache once the commit
/// succeeds.
new_hashes: Vec<(PathBuf, String)>,
/// Number of files that were submitted for parsing in this batch.
batch_files: usize,
}
async fn prepare_and_parse_batch(ctx: &BatchCtx, batch: &[PathBuf]) -> Option<ParsedReadyBatch> {
let payload = prepare_batch_payload(ctx, batch).await;
if payload.to_index.is_empty() {
return None;
}
let batch_files = payload.to_index.len();
let to_index = payload.to_index;
let parsed = {
let indexer = ctx.handle.indexer.read().await;
let result = if ctx.lexical_only {
indexer.parse_files_only(to_index).await
} else {
indexer.parse_and_embed_files(to_index).await
};
match result {
Ok(p) => p,
Err(e) => {
drop(indexer);
emit_batch_error(ctx, &payload.to_index_paths, e).await;
return None;
}
}
};
Some(ParsedReadyBatch {
parsed,
new_hashes: payload.new_hashes,
batch_files,
})
}
/// Commits a parsed batch to the index and records the result.
///
/// On success, progress counters and the hash cache are updated, a `batch`
/// event is emitted, and the post-commit memory check runs. On failure a
/// batch error event is emitted for the batch's files and a default
/// (all-zero) outcome is returned.
async fn commit_parsed_and_finalize(ctx: &BatchCtx, ready: ParsedReadyBatch) -> BatchOutcome {
    let ParsedReadyBatch {
        parsed,
        new_hashes,
        batch_files,
    } = ready;

    // Capture the parse-side numbers before `parsed` is moved into the commit.
    let (parse_ms, embed_ms, vector_count) =
        (parsed.parse_ms, parsed.embed_ms, parsed.vector_count);

    // The commit runs under the indexer write lock; release it before any
    // event is emitted.
    let indexer = ctx.handle.indexer.write().await;
    let commit_result = indexer.commit_parsed_batch(parsed, true).await;
    drop(indexer);

    let commit = match commit_result {
        Ok(timings) => timings,
        Err(e) => {
            let placeholder_paths: Vec<PathBuf> =
                new_hashes.iter().map(|(p, _)| p.clone()).collect();
            emit_batch_error(ctx, &placeholder_paths, e).await;
            return BatchOutcome::default();
        }
    };

    apply_successful_commit(ctx, new_hashes, batch_files, &commit).await;
    let mem_limit_hit = check_post_commit_memory(ctx);

    BatchOutcome {
        parse_ms,
        embed_ms,
        bm25_ms: commit.bm25_ms,
        vector_upsert_ms: commit.vector_upsert_ms,
        vector_count,
        mem_limit_hit,
        chunks_dropped_by_cap: commit.chunks_dropped_by_cap,
    }
}
/// Result of reading and filtering one batch of files.
///
/// The three vectors are parallel: entry `i` of each refers to the same file.
struct BatchPayload {
/// `(root-relative path string, file content)` pairs to hand to the indexer.
to_index: Vec<(String, String)>,
/// Original paths of the files in `to_index`, used for error reporting.
to_index_paths: Vec<PathBuf>,
/// `(path, content hash)` pairs to store once the batch commits.
new_hashes: Vec<(PathBuf, String)>,
}
/// Reads every file in `batch` concurrently and selects the ones that need
/// (re)indexing.
///
/// For each file, in batch order:
/// * a read failure bumps the error counter and emits an `error` event;
/// * content rejected by `should_skip_content` is skipped with reason
///   `"minified"`;
/// * content whose hash equals the cached hash is skipped with no reason;
/// * anything else is queued for indexing under its root-relative path.
async fn prepare_batch_payload(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchPayload {
    use std::sync::atomic::Ordering;

    let reads = batch.iter().cloned().map(|path| async move {
        let content = tokio::fs::read_to_string(&path).await;
        (path, content)
    });
    let read_results = futures::future::join_all(reads).await;

    let capacity = batch.len();
    let mut payload = BatchPayload {
        to_index: Vec::with_capacity(capacity),
        to_index_paths: Vec::with_capacity(capacity),
        new_hashes: Vec::with_capacity(capacity),
    };

    for (path, content_res) in read_results {
        // Root-relative display name; falls back to the full path when the
        // file is not under the root.
        let rel = path
            .strip_prefix(&ctx.root)
            .unwrap_or(&path)
            .display()
            .to_string();

        let content = match content_res {
            Ok(text) => text,
            Err(e) => {
                ctx.progress.errors.fetch_add(1, Ordering::Release);
                let event = serde_json::json!({
                    "event": "error",
                    "file": rel,
                    "message": format!("read: {e}"),
                    "indexed": ctx.progress.indexed.load(Ordering::Acquire),
                    "total_files": ctx.total,
                });
                ctx.progress.push(event).await;
                continue;
            }
        };

        if should_skip_content(&path, &content) {
            tracing::debug!("reindex: skipping minified content in {}", path.display());
            emit_skip(ctx, &rel, Some("minified")).await;
            continue;
        }

        let digest = hash_content(&content);
        let unchanged = ctx.hashes.get(&path).is_some_and(|prev| *prev == digest);
        if unchanged {
            emit_skip(ctx, &rel, None).await;
            continue;
        }

        payload.to_index.push((rel, content));
        payload.to_index_paths.push(path.clone());
        payload.new_hashes.push((path, digest));
    }

    payload
}
/// Records one skipped file: bumps both the `skipped` and `indexed` counters
/// and emits a `skip` event, including a `reason` field when one is given.
async fn emit_skip(ctx: &BatchCtx, rel: &str, reason: Option<&str>) {
    use std::sync::atomic::Ordering;

    let progress = &ctx.progress;
    progress.skipped.fetch_add(1, Ordering::Release);
    // `fetch_add` returns the previous value; report the post-increment count.
    let indexed = 1 + progress.indexed.fetch_add(1, Ordering::Release);

    let mut event = serde_json::json!({
        "event": "skip",
        "file": rel,
        "indexed": indexed,
        "total_files": ctx.total,
    });
    if let Some(why) = reason {
        event["reason"] = serde_json::Value::from(why);
    }
    progress.push(event).await;
}
/// Reports a failure that affected a whole batch.
///
/// The error counter is increased by the number of files in the batch and a
/// single `error` event listing their root-relative paths is emitted.
async fn emit_batch_error(ctx: &BatchCtx, to_index_paths: &[PathBuf], err: anyhow::Error) {
    use std::sync::atomic::Ordering;

    let mut files_in_batch: Vec<String> = Vec::with_capacity(to_index_paths.len());
    for p in to_index_paths {
        let rel = p.strip_prefix(&ctx.root).unwrap_or(p);
        files_in_batch.push(rel.display().to_string());
    }

    let progress = &ctx.progress;
    progress
        .errors
        .fetch_add(to_index_paths.len(), Ordering::Release);

    let event = serde_json::json!({
        "event": "error",
        "files": files_in_batch,
        "message": format!("batch index: {err}"),
        "indexed": progress.indexed.load(Ordering::Acquire),
        "total_files": ctx.total,
    });
    progress.push(event).await;
}
/// Applies the bookkeeping for a batch that committed successfully.
///
/// Adds the batch's chunks and files to the progress counters, stores the new
/// content hashes (then trims the cache if it grew past its cap), and emits a
/// `batch` event with cumulative throughput.
async fn apply_successful_commit(
    ctx: &BatchCtx,
    new_hashes: Vec<(PathBuf, String)>,
    batch_files: usize,
    commit: &CommitTimings,
) {
    use std::sync::atomic::Ordering;

    let progress = &ctx.progress;
    let batch_chunks = commit.chunks;
    progress
        .total_chunks
        .fetch_add(batch_chunks, Ordering::Release);
    // `fetch_add` returns the previous value; report the post-add count.
    let indexed = batch_files + progress.indexed.fetch_add(batch_files, Ordering::Release);

    let elapsed_ms = ctx.started.elapsed().as_millis() as u64;
    let chunks_so_far = progress.total_chunks.load(Ordering::Acquire) as u64;
    // `checked_div` guards against a zero elapsed time.
    let chunks_per_sec = (chunks_so_far * 1000)
        .checked_div(elapsed_ms)
        .unwrap_or(0);

    new_hashes.into_iter().for_each(|(path, digest)| {
        ctx.hashes.insert(path, digest);
    });
    shrink_hashes_if_needed(&ctx.hashes);

    let event = serde_json::json!({
        "event": "batch",
        "batch_files": batch_files,
        "batch_chunks": batch_chunks,
        "indexed": indexed,
        "total_files": ctx.total,
        "elapsed_ms": elapsed_ms,
        "chunks_per_sec": chunks_per_sec,
    });
    progress.push(event).await;
}
/// Samples process RSS after a batch commit and enforces the memory limit.
///
/// Returns `false` without sampling when no limit is configured, and `false`
/// when RSS cannot be read. Otherwise the run's peak RSS is raised to the
/// sample if it is higher, and when the sample is at or above the limit the
/// shared abort flag is set and `true` is returned.
fn check_post_commit_memory(ctx: &BatchCtx) -> bool {
    let Some(limit) = ctx.mem_limit else {
        return false;
    };
    let Some(rss) = current_rss_mb() else {
        return false;
    };
    // Atomic max rather than load-then-store: the memory poller task updates
    // the same peak concurrently, and a separate load/store pair could
    // overwrite a higher value it published in between.
    ctx.peak_rss_atomic.fetch_max(rss, AtomicOrdering::AcqRel);
    if rss >= limit {
        tracing::warn!(
            "reindex: memory limit hit after commit \
             (rss={}MB >= limit={}MB) — skipping \
             remaining batches for index {}",
            rss,
            limit,
            ctx.index_id.0
        );
        ctx.mem_abort.store(true, AtomicOrdering::Release);
        return true;
    }
    false
}
/// Summary of the symbol-graph (KG) step of a reindex run.
struct KgRebuildOutcome {
/// Node count of the rebuilt symbol graph.
symbol_count: usize,
/// Edge count of the rebuilt symbol graph.
edge_count: usize,
/// Time spent on the rebuild, in milliseconds.
kg_ms: u64,
/// True when the rebuild was not performed.
kg_skipped: bool,
}
/// Rebuilds the index's symbol graph and reports its size and how long the
/// step took.
///
/// The elapsed time covers acquiring the indexer read lock, the rebuild, and
/// reading the node and edge counts.
async fn rebuild_symbol_graph_for_reindex(handle: &IndexHandle) -> KgRebuildOutcome {
    let kg_start = Instant::now();
    let indexer = handle.indexer.read().await;
    indexer.rebuild_symbol_graph_now().await;
    let graph = indexer.symbol_graph().await;

    let symbol_count = graph.node_count();
    let edge_count = graph.edge_count();
    let kg_ms = kg_start.elapsed().as_millis() as u64;

    KgRebuildOutcome {
        symbol_count,
        edge_count,
        kg_ms,
        kg_skipped: false,
    }
}
/// Run-wide totals reported in the final `complete` event.
struct RunTotals {
/// Reported as `timings.parse_ms`.
parse_ms: u64,
/// Reported as `timings.embed_ms`.
embed_ms: u64,
/// Reported as `timings.bm25_ms`.
bm25_ms: u64,
/// Reported as `timings.vector_upsert_ms`.
vector_upsert_ms: u64,
/// Reported as `timings.vector_count`.
vector_count: usize,
/// Whether the run hit the memory limit; selects the event's `status`.
mem_limit_hit: bool,
/// Reported as `chunks_dropped_by_cap`; a non-zero value also sets
/// `walk_truncated_by_budget`.
chunks_dropped_by_cap: usize,
}
/// Emits the final `complete` event for a reindex run.
///
/// The event carries the progress counters, overall throughput, peak RSS,
/// per-phase timings and symbol-graph statistics. `status` is
/// `"aborted_memory"` when the run hit the memory limit and `"complete"`
/// otherwise. `embedderd_peak_rss_mb` is included only when a value is
/// supplied.
async fn emit_complete_event(
    progress: &ReindexProgress,
    started: Instant,
    peak_rss_mb: u64,
    embedderd_peak_rss_mb: Option<u64>,
    totals: &RunTotals,
    kg: &KgRebuildOutcome,
) {
    use std::sync::atomic::Ordering;

    let chunk_total = progress.total_chunks.load(Ordering::Acquire);
    let elapsed_ms = started.elapsed().as_millis() as u64;
    // `checked_div` guards against a zero elapsed time.
    let throughput = (chunk_total as u64 * 1000)
        .checked_div(elapsed_ms)
        .unwrap_or(0);

    let processed = progress.indexed.load(Ordering::Acquire);
    let skipped = progress.skipped.load(Ordering::Acquire);
    // `indexed` counts skipped files too; subtract them to get new work only.
    let newly_indexed = processed.saturating_sub(skipped);

    let status_str = match totals.mem_limit_hit {
        true => "aborted_memory",
        false => "complete",
    };
    let truncated = totals.chunks_dropped_by_cap > 0;

    let mut event = serde_json::json!({
        "event": "complete",
        "status": status_str,
        "indexed": processed,
        "indexed_new": newly_indexed,
        "total_chunks": chunk_total,
        "skipped": skipped,
        "errors": progress.errors.load(Ordering::Acquire),
        "elapsed_ms": elapsed_ms,
        "chunks_per_sec": throughput,
        "peak_rss_mb": peak_rss_mb,
        "memory_limit_hit": totals.mem_limit_hit,
        "walk_truncated_by_budget": truncated,
        "chunks_dropped_by_cap": totals.chunks_dropped_by_cap,
        "kg_skipped": kg.kg_skipped,
        "timings": {
            "parse_ms": totals.parse_ms,
            "embed_ms": totals.embed_ms,
            "bm25_ms": totals.bm25_ms,
            "vector_upsert_ms": totals.vector_upsert_ms,
            "kg_ms": kg.kg_ms,
            "vector_count": totals.vector_count,
            "symbol_count": kg.symbol_count,
            "edge_count": kg.edge_count,
        },
    });
    if let Some(peak) = embedderd_peak_rss_mb {
        event["embedderd_peak_rss_mb"] = serde_json::Value::from(peak);
    }
    progress.push(event).await;
}
/// Prepares a forced reindex to write into a fresh staging corpus instead of
/// the live one.
///
/// Returns `Some(staging_path)` when the indexer's corpus store was replaced
/// by a freshly opened store at the staging path. Returns `None` — leaving
/// the indexer untouched — when the indexer has no corpus store, the staging
/// path cannot be resolved, or the staging store cannot be opened.
async fn begin_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId) -> Option<PathBuf> {
{
// Scoped so the read guard is released before any further work.
if !handle.indexer.read().await.has_corpus_store() {
return None;
}
}
// Pick the staging path from colocated storage when the root has it,
// otherwise from the persistence module keyed by index id.
let tmp_path = if crate::service::colocated_storage::has_colocated_storage(&handle.root_path) {
match crate::service::colocated_storage::colocated_redb_tmp_path(&handle.root_path) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve colocated staging corpus path for '{}' ({e}) — \
reindex will write directly to the live corpus",
index_id.0
);
return None;
}
}
} else {
match crate::service::persistence::corpus_redb_tmp_path(&index_id.0) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve staging corpus path for '{}' ({e}) — \
reindex will write directly to the live corpus",
index_id.0
);
return None;
}
}
};
// Opening the store is done on the blocking pool rather than on the async
// task.
let tmp_for_open = tmp_path.clone();
let staged = tokio::task::spawn_blocking(move || {
crate::core::corpus::CorpusStore::open_fresh(&tmp_for_open)
})
.await;
let staged = match staged {
Ok(Ok(store)) => store,
Ok(Err(e)) => {
tracing::warn!(
"force reindex: could not open staging corpus for '{}' ({e}) — \
reindex will write directly to the live corpus",
index_id.0
);
return None;
}
// The outer `Err` is the join error of the blocking task.
Err(e) => {
tracing::warn!(
"force reindex: staging corpus open task panicked for '{}': {e}",
index_id.0
);
return None;
}
};
// Install the staging store. The value returned by the swap is kept in
// `_prev` until this function returns.
let mut indexer = handle.indexer.write().await;
let _prev = indexer.swap_corpus_store(Arc::new(staged));
drop(indexer);
tracing::info!(
"force reindex: staging rebuilt corpus for '{}' in {}",
index_id.0,
tmp_path.display()
);
Some(tmp_path)
}
/// Promotes the staging corpus at `tmp_path` to the live corpus location.
///
/// Steps: resolve the live path, take the corpus store out of the indexer,
/// rename the staging file over the live path on the blocking pool, re-open
/// the store at the live path, and install it in the indexer. Failures are
/// logged; nothing is returned to the caller.
// NOTE(review): once `take_corpus_store` has run, the failure branches below
// only log — no store is put back into the indexer by this function. Confirm
// that leaving the indexer without a corpus store in that case is intended.
async fn commit_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId, tmp_path: &Path) {
let live_path = if crate::service::colocated_storage::has_colocated_storage(&handle.root_path) {
match crate::service::colocated_storage::colocated_redb_path(&handle.root_path) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve colocated live corpus path for '{}' ({e}) — \
staged corpus left at {}",
index_id.0,
tmp_path.display()
);
return;
}
}
} else {
match crate::service::persistence::corpus_redb_path(&index_id.0) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve live corpus path for '{}' ({e}) — \
staged corpus left at {}",
index_id.0,
tmp_path.display()
);
return;
}
}
};
{
// Remove the staging store from the indexer before the file is renamed;
// the returned value is discarded here.
let mut indexer = handle.indexer.write().await;
let _ = indexer.take_corpus_store();
}
let tmp = tmp_path.to_path_buf();
let live = live_path.clone();
let index_id_inner = index_id.0.clone();
// Rename and re-open run on the blocking pool.
let reopened = tokio::task::spawn_blocking(
move || -> anyhow::Result<crate::core::corpus::CorpusStore> {
std::fs::rename(&tmp, &live).with_context(|| {
format!(
"atomic-swap rename {} -> {} for '{index_id_inner}'",
tmp.display(),
live.display()
)
})?;
crate::core::corpus::CorpusStore::open(&live)
.with_context(|| format!("re-open swapped corpus for '{index_id_inner}'"))
},
)
.await;
match reopened {
Ok(Ok(store)) => {
handle
.indexer
.write()
.await
.set_corpus_store(Arc::new(store));
tracing::info!(
"force reindex: atomically swapped rebuilt corpus into {} for '{}'",
live_path.display(),
index_id.0
);
}
// Rename or re-open failed.
Ok(Err(e)) => tracing::warn!(
"force reindex: atomic corpus swap failed for '{}' ({e}) — \
previous corpus preserved; in-memory state is the rebuilt one",
index_id.0
),
// The outer `Err` is the join error of the blocking task.
Err(e) => tracing::warn!(
"force reindex: atomic corpus swap task panicked for '{}': {e}",
index_id.0
),
}
}
/// Discards the staging corpus at `tmp_path` and re-attaches the live corpus.
///
/// Steps: take the (staging) corpus store out of the indexer, delete the
/// staging file, re-open the store at the live path, and install it in the
/// indexer. Every failure is logged; nothing is returned to the caller.
async fn abort_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId, tmp_path: &Path) {
{
// Remove the staging store from the indexer before its file is deleted;
// the returned value is discarded here.
let mut indexer = handle.indexer.write().await;
let _ = indexer.take_corpus_store();
}
// Kept as a `Result` so the staging file is still deleted below even when
// the live path cannot be resolved.
let live_path = if crate::service::colocated_storage::has_colocated_storage(&handle.root_path) {
crate::service::colocated_storage::colocated_redb_path(&handle.root_path)
} else {
crate::service::persistence::corpus_redb_path(&index_id.0)
};
let tmp = tmp_path.to_path_buf();
let index_id_inner = index_id.0.clone();
// File removal and re-open run on the blocking pool.
let restored = tokio::task::spawn_blocking(
move || -> anyhow::Result<Option<crate::core::corpus::CorpusStore>> {
match std::fs::remove_file(&tmp) {
Ok(()) => {}
// A staging file that is already gone is not an error.
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
// Any other deletion failure is logged; the restore still proceeds.
Err(e) => tracing::warn!(
"force reindex: could not delete staging corpus {} for '{index_id_inner}': {e}",
tmp.display()
),
}
match live_path {
Ok(live) => Ok(Some(crate::core::corpus::CorpusStore::open(&live)?)),
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve live corpus path for '{index_id_inner}' \
({e}) — index left without a durable corpus until next restart"
);
Ok(None)
}
}
},
)
.await;
match restored {
Ok(Ok(Some(store))) => {
handle
.indexer
.write()
.await
.set_corpus_store(Arc::new(store));
tracing::warn!(
"force reindex: aborted — discarded staging corpus and restored the \
original durable corpus for '{}'",
index_id.0
);
}
// Live path could not be resolved; already logged inside the task.
Ok(Ok(None)) => {}
// Re-opening the live corpus failed.
Ok(Err(e)) => tracing::warn!(
"force reindex: could not restore the original corpus for '{}' after abort ({e})",
index_id.0
),
// The outer `Err` is the join error of the blocking task.
Err(e) => tracing::warn!(
"force reindex: corpus-restore task panicked for '{}': {e}",
index_id.0
),
}
}
/// Drop guard that flags a reindex run as failed if the guard is dropped
/// while still armed.
struct ReindexTerminationGuard {
/// Progress tracker to update when the guard fires.
progress: Arc<ReindexProgress>,
/// While true, dropping the guard marks the run as `Failed`.
armed: bool,
}
impl ReindexTerminationGuard {
    /// Creates an armed guard bound to `progress`.
    fn new(progress: Arc<ReindexProgress>) -> Self {
        let armed = true;
        Self { progress, armed }
    }

    /// Disarms the guard so that dropping it has no effect.
    fn disarm(&mut self) {
        self.armed = false;
    }
}
impl Drop for ReindexTerminationGuard {
    /// When still armed, sets the progress status to `Failed` and broadcasts
    /// a generic `error` event.
    ///
    /// The event goes straight to the broadcast sender, not through
    /// `ReindexProgress::push`, so it is not added to the replay buffer. A
    /// failed send is ignored.
    fn drop(&mut self) {
        if self.armed {
            self.progress.status.store(ReindexStatus::Failed);
            let payload = serde_json::json!({
                "event": "error",
                "message": "reindex task exited unexpectedly — check daemon logs for details"
            });
            let _ = self.progress.sender.send(payload.to_string());
        }
    }
}
pub fn spawn_reindex_with_cleanup(
handle: Arc<IndexHandle>,
progress: Arc<ReindexProgress>,
force: bool,
cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
aborted_map: Option<Arc<DashMap<IndexId, Instant>>>,
embedderd_pid_slot: Option<Arc<AtomicU32>>,
priority: bool,
) {
use std::sync::atomic::Ordering as AtomicOrd;
if !priority {
BACKGROUND_QUEUE_DEPTH.fetch_add(1, AtomicOrd::Relaxed);
}
let cleanup_id = handle.id.clone();
tokio::spawn(async move {
use std::sync::atomic::Ordering;
let _permit = reindex_semaphore_for(priority)
.acquire()
.await
.expect("reindex semaphore is never closed");
if !priority {
BACKGROUND_QUEUE_DEPTH.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
let mut term_guard = ReindexTerminationGuard::new(Arc::clone(&progress));
let started = Instant::now();
let root = handle.root_path.clone();
let index_id: IndexId = handle.id.clone();
reset_stages_for_reindex(&handle).await;
{
let mut diag = handle.walk_diagnostics.write().await;
diag.last_walk_started_at = Some(now_rfc3339());
diag.last_walk_files_seen = 0;
diag.last_walk_files_skipped = 0;
diag.last_walk_error = None;
}
let walk = collect_files_to_index(&handle);
let total = walk.files.len();
{
let mut diag = handle.walk_diagnostics.write().await;
diag.last_walk_files_seen = total as u64;
diag.last_walk_files_skipped = walk.skipped_dirs as u64;
if total == 0 {
let reason = if !handle.root_path.exists() {
format!("root path does not exist: {}", handle.root_path.display())
} else {
format!(
"walk produced zero files under {}; check gitignore rules, \
path_filter, and extension allow-list",
handle.root_path.display()
)
};
diag.last_walk_error = Some(reason);
}
}
progress.total_files.store(total, Ordering::Release);
progress
.push(serde_json::json!({
"event": "walk_complete",
"total_files": total,
"index_id": index_id.0,
}))
.await;
progress
.push(serde_json::json!({
"event": "start",
"total_files": total,
"index_id": index_id.0,
"root_path": root,
"force": force,
"lexical_only": handle.lexical_only,
}))
.await;
let hashes = hashes_for(&index_id);
if force {
hashes.clear();
}
let corpus_swap_tmp: Option<PathBuf> = if force {
begin_force_corpus_swap(&handle, &index_id).await
} else {
None
};
let mut total_parse_ms: u64 = 0;
let mut total_embed_ms: u64 = 0;
let mut total_bm25_ms: u64 = 0;
let mut total_vector_upsert_ms: u64 = 0;
let mut total_vector_count: usize = 0;
let mut total_chunks_dropped_by_cap: usize = 0;
let mem_limit = index_memory_limit_mb();
let mem_abort = Arc::new(AtomicBool::new(false));
let peak_rss_atomic = Arc::new(AtomicU64::new(current_rss_mb().unwrap_or(0)));
let mut mem_limit_hit: bool = false;
let (poller_handle, poller_stop) = spawn_memory_poller(
mem_limit,
mem_abort.clone(),
peak_rss_atomic.clone(),
index_id.0.clone(),
);
let peak_embedderd_rss_atomic = Arc::new(AtomicU64::new(0));
let (embedderd_poller_handle, embedderd_poller_stop) =
if let Some(pid_slot) = embedderd_pid_slot.as_ref() {
let initial_pid = pid_slot.load(AtomicOrdering::Acquire);
if let Some(rss) = current_rss_mb_for_pid(initial_pid) {
peak_embedderd_rss_atomic.store(rss, AtomicOrdering::Release);
}
let (h, s) = spawn_embedderd_rss_poller(
Arc::clone(pid_slot),
Arc::clone(&peak_embedderd_rss_atomic),
);
(Some(h), Some(s))
} else {
(None, None)
};
let ctx = BatchCtx {
handle: handle.clone(),
progress: progress.clone(),
root: root.clone(),
index_id: index_id.clone(),
hashes: hashes.clone(),
mem_limit,
mem_abort: mem_abort.clone(),
peak_rss_atomic: peak_rss_atomic.clone(),
started,
total,
lexical_only: handle.lexical_only,
};
let batches: Vec<Vec<PathBuf>> = walk
.files
.chunks(REINDEX_BATCH_SIZE)
.map(|b| b.to_vec())
.collect();
let (tx, mut rx) = mpsc::channel::<ParsedReadyBatch>(1);
let producer_ctx = ctx.clone();
let producer_mem_abort = mem_abort.clone();
let producer_index_id = index_id.0.clone();
let producer = tokio::spawn(async move {
for batch in batches {
if producer_mem_abort.load(AtomicOrdering::Acquire) {
let rss = current_rss_mb().unwrap_or(0);
tracing::warn!(
"reindex: memory limit hit before batch (rss={}MB, \
limit={:?}MB) — producer halting for index {}",
rss,
producer_ctx.mem_limit,
producer_index_id
);
break;
}
let Some(ready) = prepare_and_parse_batch(&producer_ctx, &batch).await else {
continue;
};
if tx.send(ready).await.is_err() {
break;
}
}
});
while let Some(ready) = rx.recv().await {
let outcome = commit_parsed_and_finalize(&ctx, ready).await;
total_parse_ms = total_parse_ms.saturating_add(outcome.parse_ms);
total_embed_ms = total_embed_ms.saturating_add(outcome.embed_ms);
total_bm25_ms = total_bm25_ms.saturating_add(outcome.bm25_ms);
total_vector_upsert_ms =
total_vector_upsert_ms.saturating_add(outcome.vector_upsert_ms);
total_vector_count = total_vector_count.saturating_add(outcome.vector_count);
total_chunks_dropped_by_cap =
total_chunks_dropped_by_cap.saturating_add(outcome.chunks_dropped_by_cap);
if outcome.chunks_dropped_by_cap > 0 {
progress.chunks_dropped_by_cap.fetch_add(
outcome.chunks_dropped_by_cap,
std::sync::atomic::Ordering::Release,
);
}
if outcome.mem_limit_hit {
mem_limit_hit = true;
rx.close();
while rx.recv().await.is_some() {}
break;
}
}
let _ = producer.await;
{
let files_done = progress.indexed.load(AtomicOrdering::Acquire);
let chunks_done = progress.total_chunks.load(AtomicOrdering::Acquire);
mark_lexical_ready_semantic_in_progress(
&handle,
files_done,
chunks_done,
total_vector_count,
)
.await;
}
{
let indexer = handle.indexer.read().await;
indexer.force_incremental_persist();
}
if let Some(tmp_path) = &corpus_swap_tmp {
let aborted = mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire);
if aborted {
abort_force_corpus_swap(&handle, &index_id, tmp_path).await;
} else {
commit_force_corpus_swap(&handle, &index_id, tmp_path).await;
}
}
mark_semantic_ready_graph_in_progress(
&handle,
total_vector_count,
progress.total_chunks.load(AtomicOrdering::Acquire),
)
.await;
let kg = if handle.skip_kg {
tracing::info!(
"reindex[{}]: KG construction skipped (skip_kg=true)",
index_id.0,
);
KgRebuildOutcome {
symbol_count: 0,
edge_count: 0,
kg_ms: 0,
kg_skipped: true,
}
} else {
progress
.push(serde_json::json!({
"event": "kg_start",
"index_id": index_id.0,
}))
.await;
let outcome = rebuild_symbol_graph_for_reindex(&handle).await;
progress
.push(serde_json::json!({
"event": "kg_complete",
"index_id": index_id.0,
"kg_ms": outcome.kg_ms,
"symbol_count": outcome.symbol_count,
"edge_count": outcome.edge_count,
}))
.await;
mark_graph_ready(&handle).await;
if mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire) {
tracing::warn!(
"reindex: memory limit was breached during batch processing for \
index {} (peak_rss={}MB, limit={:?}MB) — KG was still rebuilt \
(symbols={}, edges={}) because graph construction is bounded by \
TRUSTY_MAX_KG_NODES and independent of the embedding spike",
index_id.0,
peak_rss_atomic.load(AtomicOrdering::Acquire),
mem_limit,
outcome.symbol_count,
outcome.edge_count,
);
}
outcome
};
poller_stop.store(true, AtomicOrdering::Release);
let _ = poller_handle.await;
if let Some(stop) = embedderd_poller_stop {
stop.store(true, AtomicOrdering::Release);
}
if let Some(h) = embedderd_poller_handle {
let _ = h.await;
}
if let Some(pid_slot) = embedderd_pid_slot.as_ref() {
let pid = pid_slot.load(AtomicOrdering::Acquire);
if let Some(rss) = current_rss_mb_for_pid(pid) {
let prev = peak_embedderd_rss_atomic.load(AtomicOrdering::Acquire);
if rss > prev {
peak_embedderd_rss_atomic.store(rss, AtomicOrdering::Release);
}
}
}
let embedderd_peak_rss_mb: Option<u64> = if embedderd_pid_slot.is_some() {
let v = peak_embedderd_rss_atomic.load(AtomicOrdering::Acquire);
if v > 0 {
Some(v)
} else {
None
}
} else {
None
};
let aborted_memory = mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire);
if aborted_memory {
progress.status.store(ReindexStatus::AbortedMemory);
if let Some(map) = aborted_map.as_ref() {
map.insert(index_id.clone(), Instant::now());
}
} else {
progress.status.store(ReindexStatus::Complete);
let new_sha = crate::core::git::head_sha(&handle.root_path);
*handle.indexed_head_sha.write().await = new_sha;
}
if let Some(rss) = current_rss_mb() {
let prev = peak_rss_atomic.load(AtomicOrdering::Acquire);
if rss > prev {
peak_rss_atomic.store(rss, AtomicOrdering::Release);
}
}
let peak_rss_mb = peak_rss_atomic.load(AtomicOrdering::Acquire);
let indexed_final = progress.indexed.load(Ordering::Acquire);
let total_chunks = progress.total_chunks.load(Ordering::Acquire);
let skipped_final = progress.skipped.load(Ordering::Acquire);
let elapsed_ms = started.elapsed().as_millis() as u64;
let indexed_new = indexed_final.saturating_sub(skipped_final);
tracing::info!(
"reindex complete: index={} files={} indexed_new={} skipped={} chunks={} \
elapsed_ms={} peak_rss_mb={} memory_limit_hit={}",
index_id.0,
indexed_final,
indexed_new,
skipped_final,
total_chunks,
elapsed_ms,
peak_rss_mb,
mem_limit_hit,
);
let totals = RunTotals {
parse_ms: total_parse_ms,
embed_ms: total_embed_ms,
bm25_ms: total_bm25_ms,
vector_upsert_ms: total_vector_upsert_ms,
vector_count: total_vector_count,
mem_limit_hit,
chunks_dropped_by_cap: total_chunks_dropped_by_cap,
};
emit_complete_event(
&progress,
started,
peak_rss_mb,
embedderd_peak_rss_mb,
&totals,
&kg,
)
.await;
term_guard.disarm();
refresh_context_embedding(&handle).await;
schedule_progress_cleanup(cleanup_map, cleanup_id);
});
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::indexer::CodeIndexer;
use std::fs;
use std::sync::atomic::Ordering;
/// With `include_paths` pointing at `api/`, a reindex is expected to walk
/// exactly one file and leave the sibling `ui/` tree out of the index.
#[tokio::test]
async fn reindex_honours_include_paths_filter() {
    // Fixture: two sibling directories, each holding one Rust file.
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    for dir in ["api", "ui"] {
        fs::create_dir_all(root.join(dir)).unwrap();
    }
    fs::write(root.join("api/keep.rs"), "fn keep_me() {}\n").unwrap();
    fs::write(root.join("ui/drop.rs"), "fn drop_me() {}\n").unwrap();

    // Only `api/` is listed in `include_paths`.
    let code_indexer = CodeIndexer::new("filter-test", root.clone());
    let handle = Arc::new(IndexHandle {
        id: IndexId::new("filter-test"),
        indexer: Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root_path: root.clone(),
        include_paths: vec![root.join("api")],
        exclude_globs: vec![],
        extensions: vec![],
        domain_terms: vec![],
        include_docs: false,
        respect_gitignore: true,
        path_filter: vec![],
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only: false,
        skip_kg: false,
        stages: Arc::new(tokio::sync::RwLock::new(IndexStages::default())),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    });

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll for completion: at most 100 checks, 100 ms apart.
    let mut polls_left = 100u32;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let walked = progress.total_files.load(Ordering::Acquire);
    assert_eq!(walked, 1, "only api/keep.rs should be walked");

    let idx = handle.indexer.read().await;

    // The included file must be searchable.
    let kept_query = crate::core::indexer::SearchQuery {
        text: "keep_me".into(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let kept_hits = idx.search(&kept_query).await.unwrap();
    assert!(kept_hits.iter().any(|c| c.content.contains("keep_me")));

    // The excluded file must not appear in any hit.
    let dropped_query = crate::core::indexer::SearchQuery {
        text: "drop_me".into(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let dropped_hits = idx.search(&dropped_query).await.unwrap();
    assert!(
        dropped_hits.iter().all(|c| !c.content.contains("drop_me")),
        "ui/drop.rs must not have been indexed"
    );
}
#[tokio::test]
async fn reindex_honours_path_filter() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().to_path_buf();
std::fs::create_dir_all(root.join("common-utils")).unwrap();
std::fs::create_dir_all(root.join("other-repo")).unwrap();
std::fs::write(root.join("common-utils/keep.rs"), "fn keep_common() {}\n").unwrap();
std::fs::write(root.join("other-repo/drop.rs"), "fn drop_other() {}\n").unwrap();
let indexer = CodeIndexer::new("pf-test", root.clone());
let handle = Arc::new(IndexHandle {
id: IndexId::new("pf-test"),
indexer: Arc::new(tokio::sync::RwLock::new(indexer)),
root_path: root.clone(),
include_paths: vec![],
exclude_globs: vec![],
extensions: vec![],
domain_terms: vec![],
include_docs: false,
respect_gitignore: true,
path_filter: vec!["common-*".to_string()],
context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
context_summary: Arc::new(tokio::sync::RwLock::new(None)),
indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
lexical_only: false,
skip_kg: false,
stages: Arc::new(tokio::sync::RwLock::new(IndexStages::default())),
search_pressure: Arc::new(tokio::sync::Notify::new()),
walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
crate::core::registry::WalkDiagnostics::default(),
)),
});
let progress = Arc::new(ReindexProgress::new());
spawn_reindex(handle.clone(), progress.clone(), false);
for _ in 0..100 {
if progress.status.load() == ReindexStatus::Complete {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert_eq!(progress.status.load(), ReindexStatus::Complete);
assert_eq!(
progress.total_files.load(Ordering::Acquire),
1,
"only common-utils/keep.rs should pass the path_filter"
);
let idx = handle.indexer.read().await;
let r = idx
.search(&crate::core::indexer::SearchQuery {
text: "keep_common".into(),
top_k: 5,
expand_graph: false,
compact: false,
..Default::default()
})
.await
.unwrap();
assert!(r.iter().any(|c| c.content.contains("keep_common")));
let r2 = idx
.search(&crate::core::indexer::SearchQuery {
text: "drop_other".into(),
top_k: 5,
expand_graph: false,
compact: false,
..Default::default()
})
.await
.unwrap();
assert!(
!r2.iter().any(|c| c.content.contains("drop_other")),
"other-repo must not have been indexed"
);
}
/// A reindex over a small tree is expected to count and index the two source
/// files outside `target/`, and to record its events in the order
/// `walk_complete`, `start`, ..., `complete`.
///
/// The reindex task stores `ReindexStatus::Complete` *before* it calls
/// `emit_complete_event`, so observing the status alone does not guarantee
/// that the final event has been recorded yet. After the status poll this
/// test therefore waits (bounded) for the terminal event to show up before
/// asserting on the event log, instead of relying on scheduling luck.
#[tokio::test]
async fn reindex_walks_directory_and_emits_events() {
    // Fixture: two indexable files plus one under `target/` that the
    // assertions below expect to be ignored.
    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    fs::write(root.join("a.rs"), "fn a() {}").unwrap();
    fs::write(root.join("b.py"), "def b():\n pass\n").unwrap();
    fs::create_dir(root.join("target")).unwrap();
    fs::write(root.join("target/skip.rs"), "fn skip() {}").unwrap();

    let indexer = CodeIndexer::new("test".to_string(), root.clone());
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("test"),
        Arc::new(tokio::sync::RwLock::new(indexer)),
        root.clone(),
    ));
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle, progress.clone(), false);

    // Wait up to ~10 s for the run to report completion.
    for _ in 0..100 {
        if progress.status.load() == ReindexStatus::Complete {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    assert_eq!(progress.total_files.load(Ordering::Acquire), 2);
    assert_eq!(progress.indexed.load(Ordering::Acquire), 2);

    // Bounded wait (up to ~1 s) for the terminal event, which is emitted
    // after the status flip. The lock guard is a temporary of the `let`
    // statement, so it is released before sleeping.
    for _ in 0..100 {
        let terminal_event_seen = progress
            .events
            .lock()
            .await
            .last()
            .map(|s| s.contains("\"complete\""))
            .unwrap_or(false);
        if terminal_event_seen {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }

    let events = progress.events.lock().await;
    assert!(
        events
            .first()
            .map(|s| s.contains("\"walk_complete\""))
            .unwrap_or(false),
        "first event must be walk_complete (issue #317); got: {:?}",
        events.first()
    );
    assert!(
        events
            .get(1)
            .map(|s| s.contains("\"start\""))
            .unwrap_or(false),
        "second event must be start; got: {:?}",
        events.get(1)
    );
    assert!(
        events
            .last()
            .map(|s| s.contains("\"complete\""))
            .unwrap_or(false),
        "last event must be complete; got: {:?}",
        events.last()
    );
}
/// End-to-end check of two consecutive reindexes over the same tree.
///
/// The first (cold-cache) run is expected to walk one file, persist at least
/// one chunk and skip nothing. The second run over unchanged files is
/// expected to skip that file, emit no new chunks, and leave the corpus
/// populated.
#[tokio::test]
async fn reindex_persists_chunks_end_to_end() {
    // Fixture: one real source file plus a gitignored directory.
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    fs::create_dir_all(root.join("crates/foo/src")).unwrap();
    fs::create_dir_all(root.join("excluded")).unwrap();
    fs::write(root.join(".gitignore"), "excluded/\n").unwrap();
    let lib_rs = root.join("crates/foo/src/lib.rs");
    fs::write(
        &lib_rs,
        "pub fn alpha() {}\n\npub fn beta() -> i32 { 1 }\n\npub fn gamma(x: i32) -> i32 { x + 1 }\n",
    )
    .unwrap();
    fs::write(
        root.join("excluded/should_not_index.rs"),
        "pub fn nope() {}\n",
    )
    .unwrap();

    let id = IndexId::new("e2e-pipeline-test");
    let code_indexer = CodeIndexer::new(id.0.clone(), root.clone());
    let handle = Arc::new(IndexHandle::bare(
        id.clone(),
        Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root.clone(),
    ));

    // ---- first run: cold cache ----
    let first_run = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), first_run.clone(), false);
    let mut polls_left = 100u32;
    while polls_left > 0 && first_run.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(first_run.status.load(), ReindexStatus::Complete);
    assert_eq!(
        first_run.total_files.load(Ordering::Acquire),
        1,
        "walker must yield exactly 1 file (gitignored subtree pruned)"
    );
    let first_chunks = first_run.total_chunks.load(Ordering::Acquire);
    assert!(
        first_chunks > 0,
        "regression: walker yielded 1 file but chunker persisted 0 chunks \
         on the first (cold-cache) reindex"
    );
    assert_eq!(
        first_run.skipped.load(Ordering::Acquire),
        0,
        "first reindex hash-skipped a file (cold cache should hash-miss everything)"
    );

    // Fall back to the un-canonicalised path if canonicalisation fails.
    let canonical_lib_rs = std::fs::canonicalize(&lib_rs).unwrap_or_else(|_| lib_rs.clone());
    {
        let idx = handle.indexer.read().await;
        assert!(
            idx.chunk_count() > 0,
            "regression: indexer corpus is empty after reindex"
        );
        let alpha_query = crate::core::indexer::SearchQuery {
            text: "alpha".into(),
            top_k: 5,
            expand_graph: false,
            compact: false,
            ..Default::default()
        };
        let results = idx.search(&alpha_query).await.unwrap();
        let expected_file = canonical_lib_rs.to_string_lossy();
        assert!(
            results.iter().any(|c| c.file == expected_file),
            "no chunk references the canonical lib.rs path: {:?}",
            results.iter().map(|c| c.file.clone()).collect::<Vec<_>>()
        );
    }

    // ---- second run: nothing changed on disk ----
    let second_run = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), second_run.clone(), false);
    let mut polls_left = 100u32;
    while polls_left > 0 && second_run.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(second_run.status.load(), ReindexStatus::Complete);
    assert_eq!(
        second_run.total_files.load(Ordering::Acquire),
        1,
        "second reindex must still walk 1 file"
    );
    assert_eq!(
        second_run.total_chunks.load(Ordering::Acquire),
        0,
        "second reindex of unchanged files MUST emit 0 new chunks (hash-skip path)"
    );
    assert_eq!(
        second_run.skipped.load(Ordering::Acquire),
        1,
        "second reindex must report the file as hash-skipped"
    );
    {
        let idx = handle.indexer.read().await;
        assert!(
            idx.chunk_count() > 0,
            "regression: corpus emptied by a hash-skip-only second reindex"
        );
    }
}
/// After a reindex of a tree that carries project metadata (a README) with a
/// mock embedder wired in, the handle's `context_embedding` and
/// `context_summary` must both be populated.
///
/// The reindex task stores `ReindexStatus::Complete` before it awaits
/// `refresh_context_embedding`, so a status poll can return while the
/// context fields are still unset. To avoid a timing-dependent failure the
/// test waits (bounded) for both fields after the status poll; if they
/// never appear, the assertions below still fail with the original messages.
#[tokio::test]
async fn context_embedding_populated_after_reindex() {
    use crate::core::embed::{Embedder, MockEmbedder};
    use crate::core::store::{UsearchStore, VectorStore};

    // Fixture: one source file plus a README that serves as metadata.
    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    fs::write(root.join("lib.rs"), "fn hello() {}\n").unwrap();
    fs::write(
        root.join("README.md"),
        "# proj\n\nA test project for #112.\n",
    )
    .unwrap();

    // Small mock embedder and vector store sharing one dimension.
    let dim = 32;
    let embedder: Arc<dyn Embedder> = Arc::new(MockEmbedder::new(dim));
    let store: Arc<dyn VectorStore> = Arc::new(UsearchStore::new(dim).expect("usearch new"));
    let indexer = CodeIndexer::new("ctx-test", root.clone()).with_components(embedder, store);
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("ctx-test"),
        Arc::new(tokio::sync::RwLock::new(indexer)),
        root.clone(),
    ));
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Wait up to ~10 s for the run to report completion.
    for _ in 0..100 {
        if progress.status.load() == ReindexStatus::Complete {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Bounded wait (up to ~1 s) for the post-completion context refresh.
    // Each read guard is a temporary of its `let`, so no lock is held while
    // sleeping and the writer is never blocked by this loop.
    for _ in 0..100 {
        let embedding_ready = handle.context_embedding.read().await.is_some();
        let summary_ready = handle.context_summary.read().await.is_some();
        if embedding_ready && summary_ready {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }

    let ctx = handle.context_embedding.read().await.clone();
    assert!(
        ctx.is_some(),
        "context_embedding must be populated when metadata is present and embedder is wired"
    );
    assert_eq!(ctx.unwrap().len(), dim, "embedding must have embedder dim");
    let summary = handle.context_summary.read().await.clone();
    assert!(summary.is_some(), "context_summary must be populated");
    let s = summary.unwrap();
    assert!(s.contains("proj") || s.contains("README"));
}
/// Without any project metadata in the tree, a completed reindex must leave
/// both `context_embedding` and `context_summary` unset.
// NOTE(review): the reindex task appears to run its context refresh after the
// status flips to `Complete`, so these `None` checks could pass before the
// refresh has run. Confirm whether a stronger completion signal is available.
#[tokio::test]
async fn context_embedding_none_when_no_metadata() {
    // Fixture: a single source file and nothing else.
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    fs::write(root.join("lib.rs"), "fn hello() {}\n").unwrap();

    let code_indexer = CodeIndexer::new("no-meta", root.clone());
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("no-meta"),
        Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root.clone(),
    ));
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll for completion: at most 100 checks, 100 ms apart.
    let mut polls_left = 100u32;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let embedding_unset = handle.context_embedding.read().await.is_none();
    assert!(embedding_unset);
    let summary_unset = handle.context_summary.read().await.is_none();
    assert!(summary_unset);
}
/// Convenience wrapper around `make_handle_with_flags` for tests that only
/// care about `lexical_only`; `skip_kg` is always passed as `false`.
fn make_handle_with_flag(
    id: &str,
    root: std::path::PathBuf,
    lexical_only: bool,
) -> Arc<IndexHandle> {
    let skip_kg = false;
    make_handle_with_flags(id, root, lexical_only, skip_kg)
}
/// Builds a test `IndexHandle` rooted at `root` with the given `lexical_only`
/// and `skip_kg` flags, seeding the stage states to match:
///
/// * `lexical_only` — lexical pending, semantic and graph skipped
///   (takes precedence over `skip_kg`);
/// * `skip_kg` only — lexical and semantic pending, graph skipped;
/// * neither — `IndexStages::default()`.
///
/// All filter fields are left empty and `respect_gitignore` is enabled.
fn make_handle_with_flags(
    id: &str,
    root: std::path::PathBuf,
    lexical_only: bool,
    skip_kg: bool,
) -> Arc<IndexHandle> {
    use crate::core::registry::{IndexStages, StageState};

    let initial_stages = match (lexical_only, skip_kg) {
        (true, _) => IndexStages {
            lexical: StageState::pending(),
            semantic: StageState::skipped(),
            graph: StageState::skipped(),
        },
        (false, true) => IndexStages {
            lexical: StageState::pending(),
            semantic: StageState::pending(),
            graph: StageState::skipped(),
        },
        (false, false) => IndexStages::default(),
    };
    let code_indexer = CodeIndexer::new(id.to_string(), root.clone());

    let handle = IndexHandle {
        id: IndexId::new(id),
        indexer: Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root_path: root,
        include_paths: vec![],
        exclude_globs: vec![],
        extensions: vec![],
        domain_terms: vec![],
        include_docs: false,
        respect_gitignore: true,
        path_filter: vec![],
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only,
        skip_kg,
        stages: Arc::new(tokio::sync::RwLock::new(initial_stages)),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    };
    Arc::new(handle)
}
/// After a reindex with default flags, the lexical stage must be `Ready`,
/// `bm25` must be advertised as a search capability, and a plain search must
/// return the indexed chunk.
#[tokio::test]
async fn stage_1_completes_and_search_works_before_embedding() {
    use crate::core::registry::StageStatus;

    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    fs::write(root.join("hello.rs"), "pub fn unique_alpha() {}\n").unwrap();

    let handle = make_handle_with_flag("stage1-test", root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll for completion: at most 200 checks, 50 ms apart.
    let mut polls_left = 200u32;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Snapshot the stage state so the lock is not held across the search.
    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        StageStatus::Ready,
        "stage 1 must finish on a BM25-only reindex"
    );
    let caps = stages.search_capabilities();
    assert!(
        caps.contains(&"bm25"),
        "search_capabilities must contain bm25 after Stage 1, got: {caps:?}"
    );

    let idx = handle.indexer.read().await;
    let query = crate::core::indexer::SearchQuery {
        text: "unique_alpha".to_string(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let results = idx.search(&query).await.expect("search");
    assert!(
        results.iter().any(|c| c.content.contains("unique_alpha")),
        "BM25 lane must return the chunk after Stage 1: {results:?}"
    );
}
/// A `lexical_only` handle must finish a reindex with the lexical stage
/// `Ready` while semantic and graph remain `Skipped`, must not advertise the
/// `vector` or `kg` capabilities, and must still answer lexical searches.
#[tokio::test]
async fn lexical_only_index_never_runs_stage_2() {
    use crate::core::registry::StageStatus;

    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    fs::write(root.join("a.rs"), "pub fn lex_only_func() {}\n").unwrap();

    let handle = make_handle_with_flag("lexical-only-test", root.clone(), true);

    // Precondition: semantic starts out skipped.
    assert_eq!(
        handle.stages.read().await.semantic.status,
        StageStatus::Skipped
    );

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll for completion: at most 200 checks, 50 ms apart.
    let mut polls_left = 200u32;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        StageStatus::Ready,
        "lexical must be Ready"
    );
    assert_eq!(
        stages.semantic.status,
        StageStatus::Skipped,
        "lexical_only must never flip semantic away from Skipped"
    );
    assert_eq!(
        stages.graph.status,
        StageStatus::Skipped,
        "lexical_only must never flip graph away from Skipped"
    );

    let caps = stages.search_capabilities();
    assert!(
        !caps.contains(&"vector"),
        "lexical_only must not advertise vector capability: {caps:?}"
    );
    assert!(
        !caps.contains(&"kg"),
        "lexical_only must not advertise kg capability: {caps:?}"
    );

    // Search is pinned to the lexical stage explicitly.
    let idx = handle.indexer.read().await;
    let query = crate::core::indexer::SearchQuery {
        text: "lex_only_func".to_string(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        stage: Some(crate::core::indexer::SearchStage::Lexical),
        ..Default::default()
    };
    let results = idx.search(&query).await.expect("search");
    assert!(
        results.iter().any(|c| c.content.contains("lex_only_func")),
        "lexical lane must return the chunk on lexical_only: {results:?}"
    );
    assert_eq!(stages.lifecycle_status(), "ready");
}
/// A `skip_kg` handle must finish a reindex with the graph stage still
/// `Skipped`, must not advertise the `kg` capability, and must end up with an
/// empty symbol graph.
#[tokio::test]
async fn skip_kg_index_never_runs_phase3() {
    use crate::core::registry::StageStatus;

    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    fs::write(root.join("b.rs"), "pub fn skip_kg_func() { let x = 1; }\n").unwrap();

    let handle = make_handle_with_flags("skip-kg-test", root.clone(), false, true);

    // Precondition: graph starts out skipped.
    assert_eq!(
        handle.stages.read().await.graph.status,
        StageStatus::Skipped
    );

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll for completion: at most 200 checks, 50 ms apart.
    let mut polls_left = 200u32;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        StageStatus::Ready,
        "lexical must be Ready"
    );
    assert_eq!(
        stages.graph.status,
        StageStatus::Skipped,
        "skip_kg must never flip graph away from Skipped"
    );

    let caps = stages.search_capabilities();
    assert!(
        !caps.contains(&"kg"),
        "skip_kg must not advertise kg capability: {caps:?}"
    );

    let idx = handle.indexer.read().await;
    let symbol_graph = idx.snapshot_symbol_graph().await;
    assert_eq!(
        symbol_graph.node_count(),
        0,
        "symbol graph must be empty when skip_kg=true"
    );
}
/// Drives the stage-transition helpers by hand (no reindex is spawned) and
/// checks that `search_capabilities` gains `bm25`, then `vector`, then `kg`
/// as each stage is marked ready, ending in lifecycle status `"ready"`.
#[tokio::test]
async fn search_capabilities_grows_as_stages_complete() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    fs::write(root.join("a.rs"), "pub fn stage_grow() {}\n").unwrap();
    let handle = make_handle_with_flag("caps-grow-test", root.clone(), false);

    // Nothing is advertised before or right after a stage reset.
    let before_reset = handle.stages.read().await.search_capabilities();
    assert!(before_reset.is_empty());
    reset_stages_for_reindex(&handle).await;
    let after_reset = handle.stages.read().await.search_capabilities();
    assert!(after_reset.is_empty());

    // Lexical ready: bm25 only.
    mark_lexical_ready_semantic_in_progress(&handle, 1, 1, 1).await;
    let after_lexical = handle.stages.read().await.search_capabilities();
    assert!(after_lexical.contains(&"bm25"));
    assert!(!after_lexical.contains(&"vector"));

    // Semantic ready: vector appears, kg still absent.
    mark_semantic_ready_graph_in_progress(&handle, 1, 1).await;
    let after_semantic = handle.stages.read().await.search_capabilities();
    assert!(after_semantic.contains(&"vector"));
    assert!(!after_semantic.contains(&"kg"));

    // Graph ready: all three capabilities are advertised.
    mark_graph_ready(&handle).await;
    let after_graph = handle.stages.read().await.search_capabilities();
    for capability in ["bm25", "vector", "kg"] {
        assert!(after_graph.contains(&capability));
    }
    assert_eq!(handle.stages.read().await.lifecycle_status(), "ready");
}
/// After a reindex of a non-empty tree, the handle's walk diagnostics must
/// carry a start timestamp, a positive file count, and no error.
#[tokio::test]
async fn walk_diagnostics_populated_after_reindex() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();
    fs::write(root.join("diag_check.rs"), "fn diag_fn() {}\n").unwrap();

    let handle = make_handle_with_flag("diag-test", root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll for completion: at most 100 checks, 100 ms apart.
    let mut polls_left = 100u32;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Work on a snapshot so the read lock is released immediately.
    let diag = handle.walk_diagnostics.read().await.clone();
    let walk_started = diag.last_walk_started_at.is_some();
    assert!(
        walk_started,
        "last_walk_started_at must be set after reindex, got {:?}",
        diag
    );
    let saw_files = diag.last_walk_files_seen > 0;
    assert!(
        saw_files,
        "last_walk_files_seen must be > 0 when files exist, got {:?}",
        diag
    );
    let walk_clean = diag.last_walk_error.is_none();
    assert!(
        walk_clean,
        "last_walk_error must be None on a clean walk, got {:?}",
        diag.last_walk_error
    );
}
/// Reindexing an empty directory must still reach `Complete`, while the walk
/// diagnostics report zero files seen and carry an error.
#[tokio::test]
async fn walk_diagnostics_error_set_when_zero_files() {
    // Fixture: an empty temporary directory.
    let workdir = tempfile::tempdir().expect("tempdir");
    let root = workdir.path().to_path_buf();

    let handle = make_handle_with_flag("diag-zero-test", root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll for completion: at most 100 checks, 100 ms apart.
    let mut polls_left = 100u32;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Work on a snapshot so the read lock is released immediately.
    let diag = handle.walk_diagnostics.read().await.clone();
    assert_eq!(
        diag.last_walk_files_seen, 0,
        "last_walk_files_seen must be 0 for empty directory, got {:?}",
        diag
    );
    let error_recorded = diag.last_walk_error.is_some();
    assert!(
        error_recorded,
        "last_walk_error must be set when zero files are found, got {:?}",
        diag
    );
}
/// `reindex_semaphore_for` must hand out two distinct semaphores, one per
/// priority class, and return the same instance on repeated calls.
#[test]
fn reindex_semaphore_selection_routes_by_priority() {
    let interactive: &'static Semaphore = reindex_semaphore_for(true);
    let background: &'static Semaphore = reindex_semaphore_for(false);

    // Identity is compared by address, not by value.
    assert!(
        !std::ptr::eq(interactive, background),
        "interactive and background must be different semaphore instances"
    );
    assert!(
        std::ptr::eq(interactive, reindex_semaphore_for(true)),
        "interactive semaphore must be a stable singleton"
    );
    assert!(
        std::ptr::eq(background, reindex_semaphore_for(false)),
        "background semaphore must be a stable singleton"
    );
}
/// Models the two reindex lanes with locally constructed semaphores sized by
/// the production constants: saturating the background lane must leave a
/// permit available on the interactive lane.
#[tokio::test]
async fn interactive_not_blocked_when_background_semaphore_full() {
    let background = Semaphore::new(MAX_PARALLEL_BACKGROUND_REINDEXES);
    let interactive = Semaphore::new(MAX_PARALLEL_REINDEXES);

    // Take one background permit and keep it until the end of the test.
    let _held_background = background
        .acquire()
        .await
        .expect("background semaphore unexpectedly closed");

    // The interactive lane must hand out a permit without waiting.
    let held_interactive = interactive
        .try_acquire()
        .expect("interactive semaphore must have a free permit even when background is full");

    let background_free = background.available_permits();
    assert_eq!(
        background_free, 0,
        "background semaphore must be fully saturated"
    );
    let interactive_free = interactive.available_permits();
    assert!(
        interactive_free < MAX_PARALLEL_REINDEXES,
        "interactive semaphore must show one consumed permit"
    );
    drop(held_interactive);
}
/// `background_reindex_queue_depth` must reflect changes made to the
/// `BACKGROUND_QUEUE_DEPTH` counter: +3 after adding three, and back to the
/// baseline after subtracting them again.
// NOTE(review): this mutates a process-wide static. If other tests running in
// parallel also touch the counter (presumably via background reindexes), the
// exact-value assertions could be racy. Confirm and serialise if needed.
#[test]
fn background_reindex_queue_depth_counts_waiting_tasks() {
    let baseline = BACKGROUND_QUEUE_DEPTH.load(Ordering::Relaxed);

    BACKGROUND_QUEUE_DEPTH.fetch_add(3, Ordering::Relaxed);
    let raised = background_reindex_queue_depth();
    assert_eq!(
        raised,
        baseline + 3,
        "queue depth must increase by 3 after 3 increments"
    );

    BACKGROUND_QUEUE_DEPTH.fetch_sub(3, Ordering::Relaxed);
    let restored = background_reindex_queue_depth();
    assert_eq!(
        restored, baseline,
        "queue depth must return to initial after 3 decrements"
    );
}
/// Dropping a `ReindexTerminationGuard` that was never disarmed must mark the
/// progress as `Failed` and broadcast an error event to subscribers.
#[test]
fn reindex_guard_fires_on_early_return() {
    let progress = Arc::new(ReindexProgress::new());
    // Subscribe before the guard drops so the broadcast is observable.
    let mut events_rx = progress.sender.subscribe();

    // Create an armed guard and drop it right away.
    drop(ReindexTerminationGuard::new(Arc::clone(&progress)));

    let status_after_drop = progress.status.load();
    assert_eq!(
        status_after_drop,
        ReindexStatus::Failed,
        "status must be Failed after guard drops while armed"
    );

    let msg = events_rx
        .try_recv()
        .expect("guard must have broadcast an error event");
    assert!(
        msg.contains("\"error\""),
        "broadcast message must contain event:error; got: {msg}"
    );
}
/// A `ReindexTerminationGuard` that was disarmed before being dropped must
/// not broadcast anything: the subscriber's queue stays empty.
#[test]
fn reindex_guard_does_not_fire_after_disarm() {
    let progress = Arc::new(ReindexProgress::new());
    let mut events_rx = progress.sender.subscribe();

    let mut guard = ReindexTerminationGuard::new(Arc::clone(&progress));
    guard.disarm();
    drop(guard);

    // Only the `Empty` error counts as "nothing was sent".
    let queue_is_empty = matches!(
        events_rx.try_recv(),
        Err(tokio::sync::broadcast::error::TryRecvError::Empty)
    );
    assert!(queue_is_empty, "no event should be broadcast after disarm");
}
}