use crate::core::indexer::CommitTimings;
use crate::core::memguard::{current_rss_mb, memory_limit_mb};
use crate::core::registry::{IndexHandle, IndexId};
use crate::service::walker::{should_skip_content, walk_source_files};
use crossbeam_utils::atomic::AtomicCell;
use dashmap::DashMap;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tokio::sync::{broadcast, Mutex, Semaphore};
/// Returns the process-wide semaphore that serializes reindex runs.
///
/// The semaphore is created lazily on first call with a single permit, so at
/// most one task holding a permit can be reindexing at any time.
fn reindex_semaphore() -> &'static Semaphore {
    const CONCURRENT_REINDEXES: usize = 1;
    static REINDEX_GATE: OnceLock<Semaphore> = OnceLock::new();
    REINDEX_GATE.get_or_init(|| Semaphore::new(CONCURRENT_REINDEXES))
}
/// Number of files the reindex task reads, filters and commits together as
/// one batch (the walked file list is split with `chunks(REINDEX_BATCH_SIZE)`).
const REINDEX_BATCH_SIZE: usize = 128;
/// Returns the process-wide registry mapping each index id to its shared
/// path → content-hash cache. The registry is created empty on first access.
fn file_hashes() -> &'static DashMap<IndexId, Arc<DashMap<PathBuf, String>>> {
    type PerIndexHashes = Arc<DashMap<PathBuf, String>>;
    static REGISTRY: OnceLock<DashMap<IndexId, PerIndexHashes>> = OnceLock::new();
    REGISTRY.get_or_init(DashMap::new)
}
/// Returns the shared path → content-hash cache for `id`, inserting an empty
/// cache into the registry if this index has none yet.
///
/// The returned `Arc` points at the same map the registry holds, so writes
/// through it are visible to later callers asking for the same `id`.
fn hashes_for(id: &IndexId) -> Arc<DashMap<PathBuf, String>> {
    let slot = file_hashes()
        .entry(id.clone())
        .or_insert_with(|| Arc::new(DashMap::new()));
    Arc::clone(slot.value())
}
/// Entry count above which a per-index hash cache gets trimmed.
const MAX_FILE_HASHES_PER_INDEX: usize = 200_000;

/// Trims `map` down to 90% of [`MAX_FILE_HASHES_PER_INDEX`] once it has grown
/// past that cap; does nothing while the map is at or below the cap.
///
/// The entries dropped are simply the first ones the map's iterator yields,
/// so eviction is arbitrary rather than age- or usage-based.
fn shrink_hashes_if_needed(map: &DashMap<PathBuf, String>) {
    let current = map.len();
    if current > MAX_FILE_HASHES_PER_INDEX {
        let keep = MAX_FILE_HASHES_PER_INDEX * 9 / 10;
        let to_remove = current.saturating_sub(keep);

        // Snapshot the victim keys first: the iterator must be gone before
        // any `remove` call touches the map.
        let mut victims: Vec<PathBuf> = Vec::new();
        for entry in map.iter().take(to_remove) {
            victims.push(entry.key().clone());
        }
        for path in &victims {
            map.remove(path);
        }

        tracing::info!(
            "file-hash cache exceeded {} entries — dropped {} to bound memory",
            MAX_FILE_HASHES_PER_INDEX,
            to_remove
        );
    }
}
/// Upper bound on the number of serialized events `ReindexProgress::push`
/// keeps in its replay buffer; the oldest entry is dropped once it is reached.
const MAX_REPLAY_EVENTS: usize = 500;
/// Seconds the cleanup task spawned at the end of a reindex sleeps before it
/// touches the progress map.
const REINDEX_PROGRESS_TTL_SECS: u64 = 60;
/// Returns the SHA-256 digest of `content`'s UTF-8 bytes as a lowercase hex
/// string.
fn hash_content(content: &str) -> String {
    let digest = Sha256::digest(content.as_bytes());
    format!("{:x}", digest)
}
/// Capacity passed to `broadcast::channel` when `ReindexProgress::new`
/// creates the live event channel.
const BROADCAST_CAPACITY: usize = 256;
/// Lifecycle state of a reindex run.
///
/// Serialized as a lowercase string because of `rename_all = "lowercase"`
/// (`"running"`, `"complete"`, `"failed"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReindexStatus {
/// Initial state assigned by `ReindexProgress::new`.
Running,
/// Stored by the reindex task after the batch loop and symbol-graph rebuild.
Complete,
/// NOTE(review): nothing in this file stores this variant — presumably it is
/// set by code elsewhere; confirm.
Failed,
}
/// Shared progress state for one reindex run: a status cell, counters the
/// reindex task updates, a bounded replay buffer of serialized events, and a
/// broadcast channel carrying the same events.
pub struct ReindexProgress {
/// Current lifecycle state; starts as `ReindexStatus::Running`.
pub status: AtomicCell<ReindexStatus>,
/// Number of files returned by the directory walk; stored once before batching.
pub total_files: std::sync::atomic::AtomicUsize,
/// Files accounted for so far. The reindex task bumps this both for skipped
/// files and for files in successfully committed batches.
pub indexed: std::sync::atomic::AtomicUsize,
/// Sum of the chunk counts reported by successfully committed batches.
pub total_chunks: std::sync::atomic::AtomicUsize,
/// Files that could not be read plus files belonging to batches that failed.
pub errors: std::sync::atomic::AtomicUsize,
/// Files skipped by the content filter or because their hash was unchanged.
pub skipped: std::sync::atomic::AtomicUsize,
/// Replay buffer of serialized events, capped at `MAX_REPLAY_EVENTS` by `push`.
pub events: Arc<Mutex<Vec<String>>>,
/// Broadcast sender on which `push` also publishes every serialized event.
pub sender: broadcast::Sender<String>,
}
impl ReindexProgress {
    /// Creates progress state for a run that is about to start: status
    /// `Running`, every counter at zero, an empty replay buffer, and a fresh
    /// broadcast channel whose initial receiver is discarded.
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
        let zero = || std::sync::atomic::AtomicUsize::new(0);
        Self {
            status: AtomicCell::new(ReindexStatus::Running),
            total_files: zero(),
            indexed: zero(),
            total_chunks: zero(),
            errors: zero(),
            skipped: zero(),
            events: Arc::new(Mutex::new(Vec::new())),
            sender: tx,
        }
    }

    /// Serializes `event`, appends it to the replay buffer (evicting the
    /// oldest entry once the buffer holds `MAX_REPLAY_EVENTS`), and then
    /// publishes it on the broadcast channel.
    ///
    /// A failed broadcast send is ignored on purpose.
    pub async fn push(&self, event: serde_json::Value) {
        let serialized = event.to_string();

        let mut replay = self.events.lock().await;
        if replay.len() >= MAX_REPLAY_EVENTS {
            replay.remove(0);
        }
        replay.push(serialized.clone());
        // Release the buffer lock before publishing.
        drop(replay);

        let _ = self.sender.send(serialized);
    }
}
impl Default for ReindexProgress {
fn default() -> Self {
Self::new()
}
}
/// Starts a background reindex of `handle`, reporting through `progress`.
///
/// Equivalent to [`spawn_reindex_with_cleanup`] with no cleanup map, so no
/// delayed removal of the progress entry is scheduled. When `force` is true
/// the index's cached file hashes are cleared first.
pub fn spawn_reindex(handle: Arc<IndexHandle>, progress: Arc<ReindexProgress>, force: bool) {
    let cleanup_map = None;
    spawn_reindex_with_cleanup(handle, progress, force, cleanup_map);
}
pub fn spawn_reindex_with_cleanup(
handle: Arc<IndexHandle>,
progress: Arc<ReindexProgress>,
force: bool,
cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
) {
let cleanup_id = handle.id.clone();
tokio::spawn(async move {
use std::sync::atomic::Ordering;
let _permit = reindex_semaphore()
.acquire()
.await
.expect("reindex semaphore is never closed");
let started = Instant::now();
let root = handle.root_path.clone();
let index_id: IndexId = handle.id.clone();
let walk = walk_source_files(&root);
let total = walk.files.len();
progress.total_files.store(total, Ordering::Release);
progress
.push(serde_json::json!({
"event": "start",
"total_files": total,
"index_id": index_id.0,
"root_path": root,
"force": force,
}))
.await;
let hashes = hashes_for(&index_id);
if force {
hashes.clear();
}
let mut total_parse_ms: u64 = 0;
let mut total_embed_ms: u64 = 0;
let mut total_bm25_ms: u64 = 0;
let mut total_vector_upsert_ms: u64 = 0;
let mut total_vector_count: usize = 0;
let mem_limit = memory_limit_mb();
let mut peak_rss_mb: u64 = current_rss_mb().unwrap_or(0);
let mut mem_limit_hit: bool = false;
const MEM_CHECK_EVERY_N_BATCHES: usize = 10;
for (batch_index, batch) in walk.files.chunks(REINDEX_BATCH_SIZE).enumerate() {
if batch_index.is_multiple_of(MEM_CHECK_EVERY_N_BATCHES) {
if let Some(rss) = current_rss_mb() {
if rss > peak_rss_mb {
peak_rss_mb = rss;
}
if let Some(limit) = mem_limit {
if rss >= limit {
tracing::warn!(
"reindex: memory limit hit (rss={}MB >= limit={}MB), \
skipping remaining batches for index {}",
rss,
limit,
index_id.0
);
mem_limit_hit = true;
break;
}
}
}
}
let read_futs = batch.iter().map(|path| {
let path = path.clone();
async move {
let content = tokio::fs::read_to_string(&path).await;
(path, content)
}
});
let read_results = futures::future::join_all(read_futs).await;
let mut to_index: Vec<(String, String)> = Vec::with_capacity(batch.len());
let mut to_index_paths: Vec<PathBuf> = Vec::with_capacity(batch.len());
let mut new_hashes: Vec<(PathBuf, String)> = Vec::with_capacity(batch.len());
for (path, content_res) in read_results {
let rel = path
.strip_prefix(&root)
.unwrap_or(&path)
.display()
.to_string();
let content = match content_res {
Ok(c) => c,
Err(e) => {
progress.errors.fetch_add(1, Ordering::Release);
progress
.push(serde_json::json!({
"event": "error",
"file": rel,
"message": format!("read: {e}"),
"indexed": progress.indexed.load(Ordering::Acquire),
"total_files": total,
}))
.await;
continue;
}
};
if should_skip_content(&path, &content) {
tracing::debug!("reindex: skipping minified content in {}", path.display());
progress.skipped.fetch_add(1, Ordering::Release);
let indexed = progress.indexed.fetch_add(1, Ordering::Release) + 1;
progress
.push(serde_json::json!({
"event": "skip",
"file": rel,
"reason": "minified",
"indexed": indexed,
"total_files": total,
}))
.await;
continue;
}
let h = hash_content(&content);
if hashes.get(&path).map(|prev| *prev == h).unwrap_or(false) {
progress.skipped.fetch_add(1, Ordering::Release);
let indexed = progress.indexed.fetch_add(1, Ordering::Release) + 1;
progress
.push(serde_json::json!({
"event": "skip",
"file": rel,
"indexed": indexed,
"total_files": total,
}))
.await;
continue;
}
let path_str = path.to_string_lossy().to_string();
to_index.push((path_str, content));
to_index_paths.push(path.clone());
new_hashes.push((path, h));
}
if to_index.is_empty() {
continue;
}
let result: anyhow::Result<(u64, u64, usize, CommitTimings)> = async {
let parsed = {
let indexer = handle.indexer.read().await;
indexer.parse_and_embed_files(to_index.clone()).await?
};
let parse_ms = parsed.parse_ms;
let embed_ms = parsed.embed_ms;
let vector_count = parsed.vector_count;
let indexer = handle.indexer.write().await;
let commit = indexer.commit_parsed_batch(parsed, true).await?;
Ok((parse_ms, embed_ms, vector_count, commit))
}
.await;
match result {
Ok((parse_ms, embed_ms, vector_count, commit)) => {
let new_chunks = commit.chunks;
total_parse_ms = total_parse_ms.saturating_add(parse_ms);
total_embed_ms = total_embed_ms.saturating_add(embed_ms);
total_bm25_ms = total_bm25_ms.saturating_add(commit.bm25_ms);
total_vector_upsert_ms =
total_vector_upsert_ms.saturating_add(commit.vector_upsert_ms);
total_vector_count = total_vector_count.saturating_add(vector_count);
progress
.total_chunks
.fetch_add(new_chunks, Ordering::Release);
let batch_files = to_index.len();
let indexed =
progress.indexed.fetch_add(batch_files, Ordering::Release) + batch_files;
let elapsed_ms = started.elapsed().as_millis() as u64;
let chunks_per_sec = (progress.total_chunks.load(Ordering::Acquire) as u64
* 1000)
.checked_div(elapsed_ms)
.unwrap_or(0);
for (path, h) in new_hashes {
hashes.insert(path, h);
}
shrink_hashes_if_needed(&hashes);
progress
.push(serde_json::json!({
"event": "batch",
"batch_files": batch_files,
"batch_chunks": new_chunks,
"indexed": indexed,
"total_files": total,
"elapsed_ms": elapsed_ms,
"chunks_per_sec": chunks_per_sec,
}))
.await;
}
Err(e) => {
let files_in_batch: Vec<String> = to_index_paths
.iter()
.map(|p| p.strip_prefix(&root).unwrap_or(p).display().to_string())
.collect();
progress
.errors
.fetch_add(to_index_paths.len(), Ordering::Release);
progress
.push(serde_json::json!({
"event": "error",
"files": files_in_batch,
"message": format!("batch index: {e}"),
"indexed": progress.indexed.load(Ordering::Acquire),
"total_files": total,
}))
.await;
}
}
}
let kg_start = Instant::now();
let (symbol_count, edge_count) = {
let indexer = handle.indexer.read().await;
indexer.rebuild_symbol_graph_now().await;
let g = indexer.symbol_graph().await;
(g.node_count(), g.edge_count())
};
let kg_ms = kg_start.elapsed().as_millis() as u64;
progress.status.store(ReindexStatus::Complete);
let total_chunks = progress.total_chunks.load(Ordering::Acquire);
let elapsed_ms = started.elapsed().as_millis() as u64;
let chunks_per_sec = (total_chunks as u64 * 1000)
.checked_div(elapsed_ms)
.unwrap_or(0);
if let Some(rss) = current_rss_mb() {
if rss > peak_rss_mb {
peak_rss_mb = rss;
}
}
let indexed_final = progress.indexed.load(Ordering::Acquire);
tracing::info!(
"reindex complete: index={} files={} chunks={} elapsed_ms={} \
peak_rss_mb={} memory_limit_hit={}",
index_id.0,
indexed_final,
total_chunks,
elapsed_ms,
peak_rss_mb,
mem_limit_hit,
);
progress
.push(serde_json::json!({
"event": "complete",
"indexed": indexed_final,
"total_chunks": total_chunks,
"skipped": progress.skipped.load(Ordering::Acquire),
"errors": progress.errors.load(Ordering::Acquire),
"elapsed_ms": elapsed_ms,
"chunks_per_sec": chunks_per_sec,
"peak_rss_mb": peak_rss_mb,
"memory_limit_hit": mem_limit_hit,
"timings": {
"parse_ms": total_parse_ms,
"embed_ms": total_embed_ms,
"bm25_ms": total_bm25_ms,
"vector_upsert_ms": total_vector_upsert_ms,
"kg_ms": kg_ms,
"vector_count": total_vector_count,
"symbol_count": symbol_count,
"edge_count": edge_count,
},
}))
.await;
if let Some(map) = cleanup_map {
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(REINDEX_PROGRESS_TTL_SECS)).await;
map.remove(&cleanup_id);
});
}
});
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::indexer::CodeIndexer;
use std::fs;
use std::sync::atomic::Ordering;
/// End-to-end check: reindexing a small temp directory reaches `Complete`,
/// counts exactly the two top-level source files, and brackets the replay
/// buffer with a "start" and a "complete" event.
#[tokio::test]
async fn reindex_walks_directory_and_emits_events() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().to_path_buf();
fs::write(root.join("a.rs"), "fn a() {}").unwrap();
fs::write(root.join("b.py"), "def b():\n pass\n").unwrap();
// A third file under `target/`: the assertions below expect only two
// files, so the walker is presumably expected to ignore this directory.
fs::create_dir(root.join("target")).unwrap();
fs::write(root.join("target/skip.rs"), "fn skip() {}").unwrap();
let indexer = CodeIndexer::new("test".to_string(), root.clone());
let handle = Arc::new(IndexHandle {
id: IndexId::new("test"),
indexer: Arc::new(tokio::sync::RwLock::new(indexer)),
root_path: root.clone(),
});
let progress = Arc::new(ReindexProgress::new());
spawn_reindex(handle, progress.clone(), false);
// Poll for completion: up to 100 checks, sleeping 100 ms after each miss.
for _ in 0..100 {
if progress.status.load() == ReindexStatus::Complete {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert_eq!(progress.status.load(), ReindexStatus::Complete);
assert_eq!(progress.total_files.load(Ordering::Acquire), 2);
assert_eq!(progress.indexed.load(Ordering::Acquire), 2);
// The replay buffer must open with the "start" event and end with "complete".
let events = progress.events.lock().await;
assert!(events
.first()
.map(|s| s.contains("\"start\""))
.unwrap_or(false));
assert!(events
.last()
.map(|s| s.contains("\"complete\""))
.unwrap_or(false));
}
}