use crate::core::indexer::{CommitTimings, ParsedBatch};
use crate::core::memguard::{current_rss_mb, index_memory_limit_mb};
use crate::core::registry::{IndexHandle, IndexId};
use crate::service::walker::{should_skip_content, walk_source_files};
use anyhow::Context;
use crossbeam_utils::atomic::AtomicCell;
use dashmap::DashMap;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
/// Returns the process-wide semaphore used to gate reindex runs.
///
/// The semaphore is created on first use with a single permit, so only one
/// holder can own it at any time.
fn reindex_semaphore() -> &'static Semaphore {
    static REINDEX_GATE: OnceLock<Semaphore> = OnceLock::new();
    REINDEX_GATE.get_or_init(|| Semaphore::new(1))
}
/// Number of files grouped into one batch when a reindex chunks the walked file list.
const REINDEX_BATCH_SIZE: usize = 128;
fn file_hashes() -> &'static DashMap<IndexId, Arc<DashMap<PathBuf, String>>> {
static FILE_HASHES: OnceLock<DashMap<IndexId, Arc<DashMap<PathBuf, String>>>> = OnceLock::new();
FILE_HASHES.get_or_init(DashMap::new)
}
/// Returns the shared hash cache for `id`, inserting an empty one into the
/// registry if the index has none yet.
fn hashes_for(id: &IndexId) -> Arc<DashMap<PathBuf, String>> {
    let registry = file_hashes();
    let slot = registry
        .entry(id.clone())
        .or_insert_with(|| Arc::new(DashMap::new()));
    Arc::clone(slot.value())
}
/// Entry-count ceiling for one index's file-hash cache; `shrink_hashes_if_needed`
/// trims the cache once this is exceeded.
const MAX_FILE_HASHES_PER_INDEX: usize = 200_000;
/// Trims `map` back to 90% of `MAX_FILE_HASHES_PER_INDEX` once it has grown
/// past that ceiling.
///
/// The entries removed are whichever ones the map's iterator yields first.
/// Keys are collected before any removal so the map is not mutated while it
/// is being iterated.
fn shrink_hashes_if_needed(map: &DashMap<PathBuf, String>) {
    let current = map.len();
    if current <= MAX_FILE_HASHES_PER_INDEX {
        return;
    }
    let keep = MAX_FILE_HASHES_PER_INDEX * 9 / 10;
    let to_remove = current.saturating_sub(keep);
    let mut victims: Vec<PathBuf> = Vec::new();
    for entry in map.iter().take(to_remove) {
        victims.push(entry.key().clone());
    }
    for victim in &victims {
        map.remove(victim);
    }
    tracing::info!(
        "file-hash cache exceeded {} entries — dropped {} to bound memory",
        MAX_FILE_HASHES_PER_INDEX,
        to_remove
    );
}
/// Maximum number of serialised events kept in `ReindexProgress::events`;
/// `ReindexProgress::push` drops the oldest entry once this is reached.
const MAX_REPLAY_EVENTS: usize = 500;
/// Seconds `schedule_progress_cleanup` waits before removing a progress entry
/// from the cleanup map.
const REINDEX_PROGRESS_TTL_SECS: u64 = 60;
/// Returns the SHA-256 digest of `content`'s bytes as a lowercase hex string.
fn hash_content(content: &str) -> String {
    let digest = Sha256::digest(content.as_bytes());
    format!("{:x}", digest)
}
/// Capacity passed to `broadcast::channel` when a `ReindexProgress` is created.
const BROADCAST_CAPACITY: usize = 256;
/// Lifecycle state of a reindex run, as stored in `ReindexProgress::status`.
///
/// Serialised with serde's `rename_all = "lowercase"` rule.
/// NOTE(review): under that rule `AbortedMemory` presumably serialises as
/// `abortedmemory`, whereas the `complete` progress event in this file spells
/// the same state `aborted_memory` — confirm which form clients expect.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReindexStatus {
/// Initial state set by `ReindexProgress::new`.
Running,
/// Stored by the reindex task when no memory abort was observed.
Complete,
/// Stored by the reindex task when the memory limit was hit during the run.
AbortedMemory,
/// Not assigned anywhere in the code visible in this file.
Failed,
}
/// Shared, lock-free-readable progress state for one reindex run, plus the
/// event log and broadcast channel used to report it.
pub struct ReindexProgress {
/// Current lifecycle state; starts as `Running`.
pub status: AtomicCell<ReindexStatus>,
/// Number of files selected by the walk; stored once at the start of a run.
pub total_files: std::sync::atomic::AtomicUsize,
/// Files accounted for so far; incremented for committed files and for skipped files.
pub indexed: std::sync::atomic::AtomicUsize,
/// Sum of the chunk counts reported by successful batch commits.
pub total_chunks: std::sync::atomic::AtomicUsize,
/// Count of per-file read errors plus files in batches that failed to parse or commit.
pub errors: std::sync::atomic::AtomicUsize,
/// Files skipped (unchanged hash, or content rejected by `should_skip_content`).
pub skipped: std::sync::atomic::AtomicUsize,
/// Replay buffer of serialised events, capped at `MAX_REPLAY_EVENTS`.
pub events: Arc<Mutex<Vec<String>>>,
/// Broadcast sender every pushed event is also sent on.
pub sender: broadcast::Sender<String>,
}
impl ReindexProgress {
    /// Creates a progress tracker in the `Running` state with zeroed counters,
    /// an empty replay buffer and a fresh broadcast channel.
    pub fn new() -> Self {
        use std::sync::atomic::AtomicUsize;
        // The initial receiver is dropped; only the sender is kept.
        let (sender, _initial_rx) = broadcast::channel(BROADCAST_CAPACITY);
        Self {
            status: AtomicCell::new(ReindexStatus::Running),
            total_files: AtomicUsize::new(0),
            indexed: AtomicUsize::new(0),
            total_chunks: AtomicUsize::new(0),
            errors: AtomicUsize::new(0),
            skipped: AtomicUsize::new(0),
            events: Arc::new(Mutex::new(Vec::new())),
            sender,
        }
    }

    /// Serialises `event`, appends it to the replay buffer (evicting the oldest
    /// entry once the buffer holds `MAX_REPLAY_EVENTS`), then broadcasts it.
    ///
    /// A failed broadcast send is ignored.
    pub async fn push(&self, event: serde_json::Value) {
        let line = event.to_string();
        let mut replay = self.events.lock().await;
        if replay.len() >= MAX_REPLAY_EVENTS {
            replay.remove(0);
        }
        replay.push(line.clone());
        // Release the buffer lock before touching the broadcast channel.
        drop(replay);
        let _ = self.sender.send(line);
    }
}
impl Default for ReindexProgress {
fn default() -> Self {
Self::new()
}
}
/// Starts a background reindex of `handle`, reporting into `progress`.
///
/// Convenience wrapper around [`spawn_reindex_with_cleanup`] that passes no
/// cleanup map and no aborted map.
pub fn spawn_reindex(handle: Arc<IndexHandle>, progress: Arc<ReindexProgress>, force: bool) {
    let cleanup_map = None;
    let aborted_map = None;
    spawn_reindex_with_cleanup(handle, progress, force, cleanup_map, aborted_map);
}
/// Walks the index's source tree and returns the files a reindex should visit.
///
/// Each entry of `handle.include_paths` is walked (or `handle.root_path` alone
/// when that list is empty). The combined list is then narrowed, in order, by
/// `exclude_globs`, `extensions` (ASCII case-insensitive; files without a
/// UTF-8 extension are dropped) and `path_filter`. Each filter is applied only
/// when its list is non-empty. The result is sorted and de-duplicated;
/// `skipped_dirs` is the saturating sum reported by the individual walks.
fn collect_files_to_index(handle: &IndexHandle) -> crate::service::walker::WalkResult {
    // Borrow the configured subtrees instead of cloning the whole list.
    let subtrees: &[PathBuf] = if handle.include_paths.is_empty() {
        std::slice::from_ref(&handle.root_path)
    } else {
        handle.include_paths.as_slice()
    };

    let mut files: Vec<PathBuf> = Vec::new();
    let mut skipped_dirs: usize = 0;
    for subtree in subtrees {
        let walked = walk_source_files(subtree);
        files.extend(walked.files);
        skipped_dirs = skipped_dirs.saturating_add(walked.skipped_dirs);
    }

    if !handle.exclude_globs.is_empty() {
        files.retain(|p| {
            !crate::core::repo_config::path_matches_any_glob(p, &handle.exclude_globs)
        });
    }
    if !handle.extensions.is_empty() {
        files.retain(|p| {
            p.extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| {
                    handle
                        .extensions
                        .iter()
                        .any(|allowed| allowed.eq_ignore_ascii_case(ext))
                })
                .unwrap_or(false)
        });
    }
    if !handle.path_filter.is_empty() {
        files.retain(|p| {
            crate::core::registry::path_matches_filter(p, &handle.root_path, &handle.path_filter)
        });
    }

    // Overlapping include paths can yield the same file twice. Equal paths are
    // indistinguishable, so an unstable sort produces the same final list.
    files.sort_unstable();
    files.dedup();
    crate::service::walker::WalkResult {
        files,
        skipped_dirs,
    }
}
/// Starts a background task that samples the process RSS once per second.
///
/// Every successful sample raises `peak_rss` to the running maximum. When
/// `mem_limit` is set and a sample reaches it, `mem_abort` is set; the warning
/// is logged only while the flag is not yet set.
///
/// Returns the task's join handle and a stop flag. The flag is only checked
/// between ticks, so after setting it the task may take up to one poll
/// interval to exit on its own.
fn spawn_memory_poller(
    mem_limit: Option<u64>,
    mem_abort: Arc<AtomicBool>,
    peak_rss: Arc<AtomicU64>,
    index_id: String,
) -> (tokio::task::JoinHandle<()>, Arc<AtomicBool>) {
    const MEM_POLL_INTERVAL: Duration = Duration::from_secs(1);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_clone = Arc::clone(&stop);
    let handle = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(MEM_POLL_INTERVAL);
        // A tokio interval's first tick completes immediately.
        ticker.tick().await;
        loop {
            if stop_clone.load(AtomicOrdering::Acquire) {
                break;
            }
            if let Some(rss) = current_rss_mb() {
                // Atomic max: never lowers a peak recorded by another writer.
                peak_rss.fetch_max(rss, AtomicOrdering::AcqRel);
                if let Some(limit) = mem_limit {
                    if rss >= limit && !mem_abort.load(AtomicOrdering::Acquire) {
                        tracing::warn!(
                            "reindex memory poller: rss={}MB >= limit={}MB \
                             — tripping abort flag for index {}",
                            rss,
                            limit,
                            index_id,
                        );
                        mem_abort.store(true, AtomicOrdering::Release);
                    }
                }
            }
            ticker.tick().await;
        }
    });
    (handle, stop)
}
fn schedule_progress_cleanup(
cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
cleanup_id: IndexId,
) {
let Some(map) = cleanup_map else {
return;
};
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(REINDEX_PROGRESS_TTL_SECS)).await;
map.remove(&cleanup_id);
});
}
/// Recomputes the index's context embedding and display summary from the
/// metadata found under its root path.
///
/// * No metadata summary scraped → both the embedding and the summary are cleared.
/// * Embedding produced → both are stored.
/// * No embedder available, or embedding failed → the embedding is cleared but
///   the display summary is still stored.
async fn refresh_context_embedding(handle: &Arc<IndexHandle>) {
    use crate::service::context_inference::{make_display_summary, scrape_metadata_summary};

    let summary = match scrape_metadata_summary(&handle.root_path) {
        Some(text) => text,
        None => {
            tracing::debug!(
                "context_inference: no recognised metadata files under {} for index {}",
                handle.root_path.display(),
                handle.id.0
            );
            *handle.context_embedding.write().await = None;
            *handle.context_summary.write().await = None;
            return;
        }
    };
    let display = make_display_summary(&summary);

    // Hold the indexer read lock only for the embedding call itself.
    let embed_result = {
        let indexer = handle.indexer.read().await;
        indexer.embed_text(&summary).await
    };

    let embedding = match embed_result {
        Ok(Some(vec)) => Some(vec),
        Ok(None) => {
            tracing::debug!(
                "context_inference: no embedder wired on index {} — skipping context embedding",
                handle.id.0
            );
            None
        }
        Err(e) => {
            tracing::warn!(
                "context_inference: embed failed for index {}: {e}",
                handle.id.0
            );
            None
        }
    };

    let refreshed = embedding.is_some();
    *handle.context_embedding.write().await = embedding;
    *handle.context_summary.write().await = Some(display);
    if refreshed {
        tracing::info!(
            "context_inference: refreshed context embedding for index {}",
            handle.id.0
        );
    }
}
/// Per-run state shared by the batch helpers (cheap to clone: mostly `Arc`s).
#[derive(Clone)]
struct BatchCtx {
// Index being rebuilt; its indexer lock is taken for parse and commit.
handle: Arc<IndexHandle>,
// Progress counters and event sink for this run.
progress: Arc<ReindexProgress>,
// Root path stripped from file paths when building event payloads.
root: PathBuf,
// Id of the index, used in log messages.
index_id: IndexId,
// Path → content-hash cache used to skip unchanged files.
hashes: Arc<DashMap<PathBuf, String>>,
// Memory ceiling compared against `current_rss_mb()`; `None` disables the post-commit check.
mem_limit: Option<u64>,
// Set when the memory ceiling has been reached.
mem_abort: Arc<AtomicBool>,
// Highest RSS sample recorded during the run.
peak_rss_atomic: Arc<AtomicU64>,
// Start time of the run, used for elapsed/throughput figures.
started: Instant,
// Total number of files selected for this run (reported as `total_files` in events).
total: usize,
}
/// Timings and counters produced by one batch; the `Default` value (all zero,
/// `mem_limit_hit == false`) is returned for batches that did no work or failed.
#[derive(Default)]
struct BatchOutcome {
// Copied from `ParsedBatch::parse_ms`.
parse_ms: u64,
// Copied from `ParsedBatch::embed_ms`.
embed_ms: u64,
// Copied from `CommitTimings::bm25_ms`.
bm25_ms: u64,
// Copied from `CommitTimings::vector_upsert_ms`.
vector_upsert_ms: u64,
// Copied from `ParsedBatch::vector_count`.
vector_count: usize,
// True when the post-commit memory check found RSS at or above the limit.
mem_limit_hit: bool,
}
/// Runs both halves of the pipeline (prepare/parse, then commit) for a single
/// batch. Returns the default outcome when the first half yields nothing to
/// commit.
#[allow(dead_code)]
async fn process_one_batch(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchOutcome {
    match prepare_and_parse_batch(ctx, batch).await {
        Some(ready) => commit_parsed_and_finalize(ctx, ready).await,
        None => BatchOutcome::default(),
    }
}
/// A batch that has been read, hashed and parsed, and is waiting to be committed.
struct ParsedReadyBatch {
// Output of `parse_and_embed_files` for the batch.
parsed: ParsedBatch,
// (path, content hash) pairs to record in the hash cache once the commit succeeds.
new_hashes: Vec<(PathBuf, String)>,
// Number of files handed to the parser for this batch.
batch_files: usize,
}
async fn prepare_and_parse_batch(ctx: &BatchCtx, batch: &[PathBuf]) -> Option<ParsedReadyBatch> {
let payload = prepare_batch_payload(ctx, batch).await;
if payload.to_index.is_empty() {
return None;
}
let batch_files = payload.to_index.len();
let to_index = payload.to_index;
let parsed = {
let indexer = ctx.handle.indexer.read().await;
match indexer.parse_and_embed_files(to_index).await {
Ok(p) => p,
Err(e) => {
drop(indexer);
emit_batch_error(ctx, &payload.to_index_paths, e).await;
return None;
}
}
};
Some(ParsedReadyBatch {
parsed,
new_hashes: payload.new_hashes,
batch_files,
})
}
/// Commits a parsed batch under the indexer write lock, then records the
/// result in the progress counters and hash cache and runs the post-commit
/// memory check.
///
/// On commit failure a batch error event is emitted (after the lock is
/// released) for the batch's files and the default outcome is returned.
async fn commit_parsed_and_finalize(ctx: &BatchCtx, ready: ParsedReadyBatch) -> BatchOutcome {
    let ParsedReadyBatch {
        parsed,
        new_hashes,
        batch_files,
    } = ready;
    // Capture the timings before `parsed` is moved into the commit call.
    let (parse_ms, embed_ms, vector_count) =
        (parsed.parse_ms, parsed.embed_ms, parsed.vector_count);

    let guard = ctx.handle.indexer.write().await;
    let commit_result = guard.commit_parsed_batch(parsed, true).await;
    drop(guard);

    let commit = match commit_result {
        Ok(timings) => timings,
        Err(e) => {
            let failed_paths: Vec<PathBuf> =
                new_hashes.iter().map(|(path, _)| path.clone()).collect();
            emit_batch_error(ctx, &failed_paths, e).await;
            return BatchOutcome::default();
        }
    };

    apply_successful_commit(ctx, new_hashes, batch_files, &commit).await;
    let mem_limit_hit = check_post_commit_memory(ctx);
    BatchOutcome {
        parse_ms,
        embed_ms,
        bm25_ms: commit.bm25_ms,
        vector_upsert_ms: commit.vector_upsert_ms,
        vector_count,
        mem_limit_hit,
    }
}
/// Files from one batch that survived reading, content filtering and the
/// unchanged-hash check. The three vectors are filled in lockstep, one entry per file.
struct BatchPayload {
// (lossy path string, file content) pairs handed to the parser.
to_index: Vec<(String, String)>,
// The same files as `PathBuf`s, used when reporting a batch error.
to_index_paths: Vec<PathBuf>,
// (path, content hash) pairs to store once the batch is committed.
new_hashes: Vec<(PathBuf, String)>,
}
/// Reads every file in `batch` concurrently and decides which ones need to be
/// (re)indexed.
///
/// Per file, in order:
/// * a read failure bumps the error counter and emits an `error` event;
/// * content rejected by `should_skip_content` is skipped with reason `minified`;
/// * content whose hash matches the cached hash is skipped without a reason;
/// * anything else is added to the payload together with its new hash.
async fn prepare_batch_payload(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchPayload {
    use std::sync::atomic::Ordering;

    let reads = futures::future::join_all(batch.iter().cloned().map(|file| async move {
        let text = tokio::fs::read_to_string(&file).await;
        (file, text)
    }))
    .await;

    let capacity = batch.len();
    let mut payload = BatchPayload {
        to_index: Vec::with_capacity(capacity),
        to_index_paths: Vec::with_capacity(capacity),
        new_hashes: Vec::with_capacity(capacity),
    };

    for (file, text) in reads {
        // Path relative to the index root when possible, for event payloads.
        let rel = file
            .strip_prefix(&ctx.root)
            .unwrap_or(&file)
            .display()
            .to_string();

        let content = match text {
            Ok(body) => body,
            Err(e) => {
                ctx.progress.errors.fetch_add(1, Ordering::Release);
                let event = serde_json::json!({
                    "event": "error",
                    "file": rel,
                    "message": format!("read: {e}"),
                    "indexed": ctx.progress.indexed.load(Ordering::Acquire),
                    "total_files": ctx.total,
                });
                ctx.progress.push(event).await;
                continue;
            }
        };

        if should_skip_content(&file, &content) {
            tracing::debug!("reindex: skipping minified content in {}", file.display());
            emit_skip(ctx, &rel, Some("minified")).await;
            continue;
        }

        let digest = hash_content(&content);
        let unchanged = match ctx.hashes.get(&file) {
            Some(previous) => *previous == digest,
            None => false,
        };
        if unchanged {
            emit_skip(ctx, &rel, None).await;
            continue;
        }

        payload
            .to_index
            .push((file.to_string_lossy().to_string(), content));
        payload.to_index_paths.push(file.clone());
        payload.new_hashes.push((file, digest));
    }
    payload
}
/// Records one skipped file: bumps the `skipped` and `indexed` counters and
/// pushes a `skip` event, with an extra `reason` field when one is given.
async fn emit_skip(ctx: &BatchCtx, rel: &str, reason: Option<&str>) {
    use std::sync::atomic::Ordering;

    ctx.progress.skipped.fetch_add(1, Ordering::Release);
    let previously_indexed = ctx.progress.indexed.fetch_add(1, Ordering::Release);
    let indexed = previously_indexed + 1;

    let mut event = serde_json::json!({
        "event": "skip",
        "file": rel,
        "indexed": indexed,
        "total_files": ctx.total,
    });
    match reason {
        Some(why) => {
            event["reason"] = serde_json::Value::String(why.to_string());
        }
        None => {}
    }
    ctx.progress.push(event).await;
}
/// Reports a whole-batch failure: adds the batch's file count to the error
/// counter and pushes one `error` event listing the affected files (relative
/// to the index root where possible).
async fn emit_batch_error(ctx: &BatchCtx, to_index_paths: &[PathBuf], err: anyhow::Error) {
    use std::sync::atomic::Ordering;

    let mut files_in_batch: Vec<String> = Vec::with_capacity(to_index_paths.len());
    for path in to_index_paths {
        let shown = path.strip_prefix(&ctx.root).unwrap_or(path);
        files_in_batch.push(shown.display().to_string());
    }

    ctx.progress
        .errors
        .fetch_add(to_index_paths.len(), Ordering::Release);

    let event = serde_json::json!({
        "event": "error",
        "files": files_in_batch,
        "message": format!("batch index: {err}"),
        "indexed": ctx.progress.indexed.load(Ordering::Acquire),
        "total_files": ctx.total,
    });
    ctx.progress.push(event).await;
}
/// Book-keeping after a batch commit succeeds: updates the chunk and file
/// counters, stores the new content hashes (trimming the cache if it has
/// grown too large) and pushes a `batch` progress event.
async fn apply_successful_commit(
    ctx: &BatchCtx,
    new_hashes: Vec<(PathBuf, String)>,
    batch_files: usize,
    commit: &CommitTimings,
) {
    use std::sync::atomic::Ordering;

    let batch_chunks = commit.chunks;
    ctx.progress
        .total_chunks
        .fetch_add(batch_chunks, Ordering::Release);
    let indexed_before = ctx
        .progress
        .indexed
        .fetch_add(batch_files, Ordering::Release);
    let indexed = indexed_before + batch_files;

    // Throughput over the whole run so far; zero elapsed time yields 0.
    let elapsed_ms = ctx.started.elapsed().as_millis() as u64;
    let chunks_so_far = ctx.progress.total_chunks.load(Ordering::Acquire) as u64;
    let chunks_per_sec = (chunks_so_far * 1000)
        .checked_div(elapsed_ms)
        .unwrap_or(0);

    new_hashes.into_iter().for_each(|(path, digest)| {
        ctx.hashes.insert(path, digest);
    });
    shrink_hashes_if_needed(&ctx.hashes);

    let event = serde_json::json!({
        "event": "batch",
        "batch_files": batch_files,
        "batch_chunks": batch_chunks,
        "indexed": indexed,
        "total_files": ctx.total,
        "elapsed_ms": elapsed_ms,
        "chunks_per_sec": chunks_per_sec,
    });
    ctx.progress.push(event).await;
}
/// Samples RSS after a commit and reports whether the memory limit was reached.
///
/// Returns `false` without sampling when no limit is configured, and `false`
/// when the RSS reading is unavailable. Otherwise the sample is folded into
/// the shared peak. If it is at or above the limit, a warning is logged, the
/// shared abort flag is set and `true` is returned.
fn check_post_commit_memory(ctx: &BatchCtx) -> bool {
    let Some(limit) = ctx.mem_limit else {
        return false;
    };
    let Some(rss) = current_rss_mb() else {
        return false;
    };
    // Atomic max: a separate load + store could overwrite a higher peak
    // written concurrently by the background memory poller.
    ctx.peak_rss_atomic.fetch_max(rss, AtomicOrdering::AcqRel);
    if rss >= limit {
        tracing::warn!(
            "reindex: memory limit hit after commit \
             (rss={}MB >= limit={}MB) — skipping \
             remaining batches for index {}",
            rss,
            limit,
            ctx.index_id.0
        );
        ctx.mem_abort.store(true, AtomicOrdering::Release);
        return true;
    }
    false
}
/// Result of rebuilding the symbol graph at the end of a reindex.
struct KgRebuildOutcome {
// Node count of the rebuilt symbol graph.
symbol_count: usize,
// Edge count of the rebuilt symbol graph.
edge_count: usize,
// Wall-clock milliseconds spent on the rebuild.
kg_ms: u64,
// Reported as `kg_skipped` in the complete event; the rebuild helper in this file always sets it to false.
kg_skipped: bool,
}
/// Rebuilds the index's symbol graph under the indexer read lock and reports
/// the resulting node/edge counts together with the elapsed time.
async fn rebuild_symbol_graph_for_reindex(handle: &IndexHandle) -> KgRebuildOutcome {
    let timer = Instant::now();
    let indexer = handle.indexer.read().await;
    indexer.rebuild_symbol_graph_now().await;
    let graph = indexer.symbol_graph().await;

    let symbol_count = graph.node_count();
    let edge_count = graph.edge_count();
    let kg_ms = timer.elapsed().as_millis() as u64;
    KgRebuildOutcome {
        symbol_count,
        edge_count,
        kg_ms,
        kg_skipped: false,
    }
}
/// Per-run sums of the batch outcomes, reported in the final `complete` event.
struct RunTotals {
// Sum of per-batch parse times, in milliseconds.
parse_ms: u64,
// Sum of per-batch embed times, in milliseconds.
embed_ms: u64,
// Sum of per-batch BM25 commit times, in milliseconds.
bm25_ms: u64,
// Sum of per-batch vector upsert times, in milliseconds.
vector_upsert_ms: u64,
// Sum of per-batch vector counts.
vector_count: usize,
// Selects the `status` string and `memory_limit_hit` flag of the complete event.
mem_limit_hit: bool,
}
/// Pushes the final `complete` event for a run, summarising counters,
/// throughput, peak RSS, memory-limit state and the per-stage timings.
///
/// `status` is `"aborted_memory"` when `totals.mem_limit_hit` is set and
/// `"complete"` otherwise.
async fn emit_complete_event(
    progress: &ReindexProgress,
    started: Instant,
    peak_rss_mb: u64,
    totals: &RunTotals,
    kg: &KgRebuildOutcome,
) {
    use std::sync::atomic::Ordering;

    let total_chunks = progress.total_chunks.load(Ordering::Acquire);
    let elapsed_ms = started.elapsed().as_millis() as u64;
    // Zero elapsed time yields a throughput of 0 rather than a division error.
    let chunks_per_sec = match (total_chunks as u64 * 1000).checked_div(elapsed_ms) {
        Some(rate) => rate,
        None => 0,
    };
    let indexed_final = progress.indexed.load(Ordering::Acquire);
    let status_str = match totals.mem_limit_hit {
        true => "aborted_memory",
        false => "complete",
    };

    let event = serde_json::json!({
        "event": "complete",
        "status": status_str,
        "indexed": indexed_final,
        "total_chunks": total_chunks,
        "skipped": progress.skipped.load(Ordering::Acquire),
        "errors": progress.errors.load(Ordering::Acquire),
        "elapsed_ms": elapsed_ms,
        "chunks_per_sec": chunks_per_sec,
        "peak_rss_mb": peak_rss_mb,
        "memory_limit_hit": totals.mem_limit_hit,
        "kg_skipped": kg.kg_skipped,
        "timings": {
            "parse_ms": totals.parse_ms,
            "embed_ms": totals.embed_ms,
            "bm25_ms": totals.bm25_ms,
            "vector_upsert_ms": totals.vector_upsert_ms,
            "kg_ms": kg.kg_ms,
            "vector_count": totals.vector_count,
            "symbol_count": kg.symbol_count,
            "edge_count": kg.edge_count,
        },
    });
    progress.push(event).await;
}
/// Prepares a forced reindex to write into a fresh staging corpus instead of
/// the live one.
///
/// Returns the staging file path when the indexer's corpus store was swapped
/// for a freshly opened staging store. Returns `None` — leaving the indexer
/// untouched — when the indexer has no corpus store, the staging path cannot
/// be resolved, or the staging store cannot be opened.
async fn begin_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId) -> Option<PathBuf> {
    let has_store = handle.indexer.read().await.has_corpus_store();
    if !has_store {
        return None;
    }

    let tmp_path = match crate::service::persistence::corpus_redb_tmp_path(&index_id.0) {
        Ok(path) => path,
        Err(e) => {
            tracing::warn!(
                "force reindex: cannot resolve staging corpus path for '{}' ({e}) — \
                 reindex will write directly to the live corpus",
                index_id.0
            );
            return None;
        }
    };

    // Opening the store is blocking work, so it runs off the async executor.
    let open_target = tmp_path.clone();
    let open_result = tokio::task::spawn_blocking(move || {
        crate::core::corpus::CorpusStore::open_fresh(&open_target)
    })
    .await;
    let staged = match open_result {
        Err(e) => {
            tracing::warn!(
                "force reindex: staging corpus open task panicked for '{}': {e}",
                index_id.0
            );
            return None;
        }
        Ok(Err(e)) => {
            tracing::warn!(
                "force reindex: could not open staging corpus for '{}' ({e}) — \
                 reindex will write directly to the live corpus",
                index_id.0
            );
            return None;
        }
        Ok(Ok(store)) => store,
    };

    // The value returned by the swap stays bound until this function returns.
    let _prev = {
        let mut indexer = handle.indexer.write().await;
        indexer.swap_corpus_store(Arc::new(staged))
    };
    tracing::info!(
        "force reindex: staging rebuilt corpus for '{}' in {}",
        index_id.0,
        tmp_path.display()
    );
    Some(tmp_path)
}
/// Promotes the staging corpus at `tmp_path` to the live corpus path after a
/// successful forced reindex.
///
/// Steps: resolve the live path, take the corpus store out of the indexer,
/// rename the staging file over the live path and re-open it on a blocking
/// thread, then install the re-opened store. Failures are logged, not returned.
///
/// NOTE(review): when the rename or re-open fails, the store taken from the
/// indexer is not put back, so the indexer appears to be left without a corpus
/// store — confirm this matches the "in-memory state is the rebuilt one" log text.
async fn commit_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId, tmp_path: &Path) {
let live_path = match crate::service::persistence::corpus_redb_path(&index_id.0) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve live corpus path for '{}' ({e}) — \
staged corpus left at {}",
index_id.0,
tmp_path.display()
);
return;
}
};
// Detach the staging store from the indexer before the file is renamed;
// presumably this releases the open file handle — TODO confirm.
{
let mut indexer = handle.indexer.write().await;
let _ = indexer.take_corpus_store();
}
let tmp = tmp_path.to_path_buf();
let live = live_path.clone();
let index_id_inner = index_id.0.clone();
// Rename and re-open are blocking filesystem work, so they run on the blocking pool.
let reopened = tokio::task::spawn_blocking(
move || -> anyhow::Result<crate::core::corpus::CorpusStore> {
std::fs::rename(&tmp, &live).with_context(|| {
format!(
"atomic-swap rename {} -> {} for '{index_id_inner}'",
tmp.display(),
live.display()
)
})?;
crate::core::corpus::CorpusStore::open(&live)
.with_context(|| format!("re-open swapped corpus for '{index_id_inner}'"))
},
)
.await;
match reopened {
// Rename and re-open both succeeded: install the store backed by the live path.
Ok(Ok(store)) => {
handle
.indexer
.write()
.await
.set_corpus_store(Arc::new(store));
tracing::info!(
"force reindex: atomically swapped rebuilt corpus into {} for '{}'",
live_path.display(),
index_id.0
);
}
// The rename or the re-open returned an error.
Ok(Err(e)) => tracing::warn!(
"force reindex: atomic corpus swap failed for '{}' ({e}) — \
previous corpus preserved; in-memory state is the rebuilt one",
index_id.0
),
// The blocking task itself failed to complete.
Err(e) => tracing::warn!(
"force reindex: atomic corpus swap task panicked for '{}': {e}",
index_id.0
),
}
}
/// Discards the staging corpus at `tmp_path` after an aborted forced reindex
/// and re-attaches the corpus stored at the live path.
///
/// Steps: take the (staging) store out of the indexer, delete the staging file
/// and re-open the live corpus on a blocking thread, then install the re-opened
/// store. Failures are logged, not returned.
async fn abort_force_corpus_swap(handle: &IndexHandle, index_id: &IndexId, tmp_path: &Path) {
// Detach the staging store from the indexer before its file is deleted.
{
let mut indexer = handle.indexer.write().await;
let _ = indexer.take_corpus_store();
}
// Resolved here as a `Result`; the error case is handled inside the blocking closure.
let live_path = crate::service::persistence::corpus_redb_path(&index_id.0);
let tmp = tmp_path.to_path_buf();
let index_id_inner = index_id.0.clone();
let restored = tokio::task::spawn_blocking(
move || -> anyhow::Result<Option<crate::core::corpus::CorpusStore>> {
// A missing staging file is fine; any other deletion error is only logged.
match std::fs::remove_file(&tmp) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => tracing::warn!(
"force reindex: could not delete staging corpus {} for '{index_id_inner}': {e}",
tmp.display()
),
}
match live_path {
Ok(live) => Ok(Some(crate::core::corpus::CorpusStore::open(&live)?)),
Err(e) => {
tracing::warn!(
"force reindex: cannot resolve live corpus path for '{index_id_inner}' \
({e}) — index left without a durable corpus until next restart"
);
Ok(None)
}
}
},
)
.await;
match restored {
// Live corpus re-opened: install it on the indexer.
Ok(Ok(Some(store))) => {
handle
.indexer
.write()
.await
.set_corpus_store(Arc::new(store));
tracing::warn!(
"force reindex: aborted — discarded staging corpus and restored the \
original durable corpus for '{}'",
index_id.0
);
}
// Live path could not be resolved; already logged inside the closure.
Ok(Ok(None)) => {}
// Re-opening the live corpus failed.
Ok(Err(e)) => tracing::warn!(
"force reindex: could not restore the original corpus for '{}' after abort ({e})",
index_id.0
),
// The blocking task itself failed to complete.
Err(e) => tracing::warn!(
"force reindex: corpus-restore task panicked for '{}': {e}",
index_id.0
),
}
}
pub fn spawn_reindex_with_cleanup(
handle: Arc<IndexHandle>,
progress: Arc<ReindexProgress>,
force: bool,
cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
aborted_map: Option<Arc<DashMap<IndexId, Instant>>>,
) {
let cleanup_id = handle.id.clone();
tokio::spawn(async move {
use std::sync::atomic::Ordering;
let _permit = reindex_semaphore()
.acquire()
.await
.expect("reindex semaphore is never closed");
let started = Instant::now();
let root = handle.root_path.clone();
let index_id: IndexId = handle.id.clone();
let walk = collect_files_to_index(&handle);
let total = walk.files.len();
progress.total_files.store(total, Ordering::Release);
progress
.push(serde_json::json!({
"event": "start",
"total_files": total,
"index_id": index_id.0,
"root_path": root,
"force": force,
}))
.await;
let hashes = hashes_for(&index_id);
if force {
hashes.clear();
}
let corpus_swap_tmp: Option<PathBuf> = if force {
begin_force_corpus_swap(&handle, &index_id).await
} else {
None
};
let mut total_parse_ms: u64 = 0;
let mut total_embed_ms: u64 = 0;
let mut total_bm25_ms: u64 = 0;
let mut total_vector_upsert_ms: u64 = 0;
let mut total_vector_count: usize = 0;
let mem_limit = index_memory_limit_mb();
let mem_abort = Arc::new(AtomicBool::new(false));
let peak_rss_atomic = Arc::new(AtomicU64::new(current_rss_mb().unwrap_or(0)));
let mut mem_limit_hit: bool = false;
let (poller_handle, poller_stop) = spawn_memory_poller(
mem_limit,
mem_abort.clone(),
peak_rss_atomic.clone(),
index_id.0.clone(),
);
let ctx = BatchCtx {
handle: handle.clone(),
progress: progress.clone(),
root: root.clone(),
index_id: index_id.clone(),
hashes: hashes.clone(),
mem_limit,
mem_abort: mem_abort.clone(),
peak_rss_atomic: peak_rss_atomic.clone(),
started,
total,
};
let batches: Vec<Vec<PathBuf>> = walk
.files
.chunks(REINDEX_BATCH_SIZE)
.map(|b| b.to_vec())
.collect();
let (tx, mut rx) = mpsc::channel::<ParsedReadyBatch>(1);
let producer_ctx = ctx.clone();
let producer_mem_abort = mem_abort.clone();
let producer_index_id = index_id.0.clone();
let producer = tokio::spawn(async move {
for batch in batches {
if producer_mem_abort.load(AtomicOrdering::Acquire) {
let rss = current_rss_mb().unwrap_or(0);
tracing::warn!(
"reindex: memory limit hit before batch (rss={}MB, \
limit={:?}MB) — producer halting for index {}",
rss,
producer_ctx.mem_limit,
producer_index_id
);
break;
}
let Some(ready) = prepare_and_parse_batch(&producer_ctx, &batch).await else {
continue;
};
if tx.send(ready).await.is_err() {
break;
}
}
});
while let Some(ready) = rx.recv().await {
let outcome = commit_parsed_and_finalize(&ctx, ready).await;
total_parse_ms = total_parse_ms.saturating_add(outcome.parse_ms);
total_embed_ms = total_embed_ms.saturating_add(outcome.embed_ms);
total_bm25_ms = total_bm25_ms.saturating_add(outcome.bm25_ms);
total_vector_upsert_ms =
total_vector_upsert_ms.saturating_add(outcome.vector_upsert_ms);
total_vector_count = total_vector_count.saturating_add(outcome.vector_count);
if outcome.mem_limit_hit {
mem_limit_hit = true;
rx.close();
while rx.recv().await.is_some() {}
break;
}
}
let _ = producer.await;
{
let indexer = handle.indexer.read().await;
indexer.force_incremental_persist();
}
if let Some(tmp_path) = &corpus_swap_tmp {
let aborted = mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire);
if aborted {
abort_force_corpus_swap(&handle, &index_id, tmp_path).await;
} else {
commit_force_corpus_swap(&handle, &index_id, tmp_path).await;
}
}
let kg = rebuild_symbol_graph_for_reindex(&handle).await;
if mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire) {
tracing::warn!(
"reindex: memory limit was breached during batch processing for \
index {} (peak_rss={}MB, limit={:?}MB) — KG was still rebuilt \
(symbols={}, edges={}) because graph construction is bounded by \
TRUSTY_MAX_KG_NODES and independent of the embedding spike",
index_id.0,
peak_rss_atomic.load(AtomicOrdering::Acquire),
mem_limit,
kg.symbol_count,
kg.edge_count,
);
}
poller_stop.store(true, AtomicOrdering::Release);
let _ = poller_handle.await;
let aborted_memory = mem_limit_hit || mem_abort.load(AtomicOrdering::Acquire);
if aborted_memory {
progress.status.store(ReindexStatus::AbortedMemory);
if let Some(map) = aborted_map.as_ref() {
map.insert(index_id.clone(), Instant::now());
}
} else {
progress.status.store(ReindexStatus::Complete);
}
if let Some(rss) = current_rss_mb() {
let prev = peak_rss_atomic.load(AtomicOrdering::Acquire);
if rss > prev {
peak_rss_atomic.store(rss, AtomicOrdering::Release);
}
}
let peak_rss_mb = peak_rss_atomic.load(AtomicOrdering::Acquire);
let indexed_final = progress.indexed.load(Ordering::Acquire);
let total_chunks = progress.total_chunks.load(Ordering::Acquire);
let elapsed_ms = started.elapsed().as_millis() as u64;
tracing::info!(
"reindex complete: index={} files={} chunks={} elapsed_ms={} \
peak_rss_mb={} memory_limit_hit={}",
index_id.0,
indexed_final,
total_chunks,
elapsed_ms,
peak_rss_mb,
mem_limit_hit,
);
let totals = RunTotals {
parse_ms: total_parse_ms,
embed_ms: total_embed_ms,
bm25_ms: total_bm25_ms,
vector_upsert_ms: total_vector_upsert_ms,
vector_count: total_vector_count,
mem_limit_hit,
};
emit_complete_event(&progress, started, peak_rss_mb, &totals, &kg).await;
refresh_context_embedding(&handle).await;
schedule_progress_cleanup(cleanup_map, cleanup_id);
});
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::indexer::CodeIndexer;
    use std::fs;
    use std::sync::atomic::Ordering;

    /// Polls `progress` every 100 ms (at most 100 times) until its status is
    /// `Complete`, then asserts that it is.
    async fn wait_for_complete(progress: &ReindexProgress) {
        for _ in 0..100 {
            if progress.status.load() == ReindexStatus::Complete {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
        assert_eq!(progress.status.load(), ReindexStatus::Complete);
    }

    /// Searches `idx` for `needle` (top 5, no graph expansion, non-compact)
    /// and reports whether any returned chunk's content contains it.
    async fn search_finds(idx: &CodeIndexer, needle: &str) -> bool {
        let hits = idx
            .search(&crate::core::indexer::SearchQuery {
                text: needle.into(),
                top_k: 5,
                expand_graph: false,
                compact: false,
                ..Default::default()
            })
            .await
            .unwrap();
        hits.iter().any(|c| c.content.contains(needle))
    }

    /// Only files under `include_paths` are walked and indexed.
    #[tokio::test]
    async fn reindex_honours_include_paths_filter() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        fs::create_dir_all(root.join("api")).unwrap();
        fs::create_dir_all(root.join("ui")).unwrap();
        fs::write(root.join("api/keep.rs"), "fn keep_me() {}\n").unwrap();
        fs::write(root.join("ui/drop.rs"), "fn drop_me() {}\n").unwrap();
        let indexer = CodeIndexer::new("filter-test", root.clone());
        let handle = Arc::new(IndexHandle {
            id: IndexId::new("filter-test"),
            indexer: Arc::new(tokio::sync::RwLock::new(indexer)),
            root_path: root.clone(),
            include_paths: vec![root.join("api")],
            exclude_globs: vec![],
            extensions: vec![],
            domain_terms: vec![],
            path_filter: vec![],
            context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
            context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        });
        let progress = Arc::new(ReindexProgress::new());
        spawn_reindex(handle.clone(), progress.clone(), false);
        wait_for_complete(&progress).await;
        assert_eq!(
            progress.total_files.load(Ordering::Acquire),
            1,
            "only api/keep.rs should be walked"
        );
        let idx = handle.indexer.read().await;
        assert!(search_finds(&idx, "keep_me").await);
        assert!(
            !search_finds(&idx, "drop_me").await,
            "ui/drop.rs must not have been indexed"
        );
    }

    /// Only files matching `path_filter` are walked and indexed.
    #[tokio::test]
    async fn reindex_honours_path_filter() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        fs::create_dir_all(root.join("common-utils")).unwrap();
        fs::create_dir_all(root.join("other-repo")).unwrap();
        fs::write(root.join("common-utils/keep.rs"), "fn keep_common() {}\n").unwrap();
        fs::write(root.join("other-repo/drop.rs"), "fn drop_other() {}\n").unwrap();
        let indexer = CodeIndexer::new("pf-test", root.clone());
        let handle = Arc::new(IndexHandle {
            id: IndexId::new("pf-test"),
            indexer: Arc::new(tokio::sync::RwLock::new(indexer)),
            root_path: root.clone(),
            include_paths: vec![],
            exclude_globs: vec![],
            extensions: vec![],
            domain_terms: vec![],
            path_filter: vec!["common-*".to_string()],
            context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
            context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        });
        let progress = Arc::new(ReindexProgress::new());
        spawn_reindex(handle.clone(), progress.clone(), false);
        wait_for_complete(&progress).await;
        assert_eq!(
            progress.total_files.load(Ordering::Acquire),
            1,
            "only common-utils/keep.rs should pass the path_filter"
        );
        let idx = handle.indexer.read().await;
        assert!(search_finds(&idx, "keep_common").await);
        assert!(
            !search_finds(&idx, "drop_other").await,
            "other-repo must not have been indexed"
        );
    }

    /// A plain reindex walks the root, skips `target/`, and brackets the event
    /// log with `start` and `complete` events.
    #[tokio::test]
    async fn reindex_walks_directory_and_emits_events() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        fs::write(root.join("a.rs"), "fn a() {}").unwrap();
        fs::write(root.join("b.py"), "def b():\n pass\n").unwrap();
        fs::create_dir(root.join("target")).unwrap();
        fs::write(root.join("target/skip.rs"), "fn skip() {}").unwrap();
        let indexer = CodeIndexer::new("test".to_string(), root.clone());
        let handle = Arc::new(IndexHandle::bare(
            IndexId::new("test"),
            Arc::new(tokio::sync::RwLock::new(indexer)),
            root.clone(),
        ));
        let progress = Arc::new(ReindexProgress::new());
        spawn_reindex(handle, progress.clone(), false);
        wait_for_complete(&progress).await;
        assert_eq!(progress.total_files.load(Ordering::Acquire), 2);
        assert_eq!(progress.indexed.load(Ordering::Acquire), 2);
        let events = progress.events.lock().await;
        assert!(events
            .first()
            .map(|s| s.contains("\"start\""))
            .unwrap_or(false));
        assert!(events
            .last()
            .map(|s| s.contains("\"complete\""))
            .unwrap_or(false));
    }

    /// With metadata present and an embedder wired, the reindex populates both
    /// the context embedding and the context summary.
    #[tokio::test]
    async fn context_embedding_populated_after_reindex() {
        use crate::core::embed::{Embedder, MockEmbedder};
        use crate::core::store::{UsearchStore, VectorStore};
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        fs::write(root.join("lib.rs"), "fn hello() {}\n").unwrap();
        fs::write(
            root.join("README.md"),
            "# proj\n\nA test project for #112.\n",
        )
        .unwrap();
        let dim = 32;
        let embedder: Arc<dyn Embedder> = Arc::new(MockEmbedder::new(dim));
        let store: Arc<dyn VectorStore> = Arc::new(UsearchStore::new(dim).expect("usearch new"));
        let indexer = CodeIndexer::new("ctx-test", root.clone()).with_components(embedder, store);
        let handle = Arc::new(IndexHandle::bare(
            IndexId::new("ctx-test"),
            Arc::new(tokio::sync::RwLock::new(indexer)),
            root.clone(),
        ));
        let progress = Arc::new(ReindexProgress::new());
        spawn_reindex(handle.clone(), progress.clone(), false);
        wait_for_complete(&progress).await;
        let ctx = handle.context_embedding.read().await.clone();
        assert!(
            ctx.is_some(),
            "context_embedding must be populated when metadata is present and embedder is wired"
        );
        assert_eq!(ctx.unwrap().len(), dim, "embedding must have embedder dim");
        let summary = handle.context_summary.read().await.clone();
        assert!(summary.is_some(), "context_summary must be populated");
        let s = summary.unwrap();
        assert!(s.contains("proj") || s.contains("README"));
    }

    /// Without any metadata files, both context fields stay `None`.
    #[tokio::test]
    async fn context_embedding_none_when_no_metadata() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        fs::write(root.join("lib.rs"), "fn hello() {}\n").unwrap();
        let indexer = CodeIndexer::new("no-meta", root.clone());
        let handle = Arc::new(IndexHandle::bare(
            IndexId::new("no-meta"),
            Arc::new(tokio::sync::RwLock::new(indexer)),
            root.clone(),
        ));
        let progress = Arc::new(ReindexProgress::new());
        spawn_reindex(handle.clone(), progress.clone(), false);
        wait_for_complete(&progress).await;
        assert!(handle.context_embedding.read().await.is_none());
        assert!(handle.context_summary.read().await.is_none());
    }
}