use crate::core::indexer::{CommitTimings, ParsedBatch};
use crate::core::registry::{IndexHandle, IndexId};
use crate::service::walker::should_skip_content;
use dashmap::DashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::Instant;
use super::hash::{hash_content, shrink_hashes_if_needed, MAX_FILE_HASHES_PER_INDEX};
use super::hash_cache;
use super::progress::ReindexProgress;
use super::prune::to_corpus_relative_path;
/// Batch size used by the reindex pipeline.
/// NOTE(review): not referenced in this section; presumably the number of files per
/// batch handed to `prepare_and_parse_batch` — confirm at the call site.
pub(super) const REINDEX_BATCH_SIZE: usize = 128;
/// Process-wide flag set after an in-process embedder (a batch whose
/// `BatchCtx::embedder_pid_slot` is `None`) has successfully parsed and embedded a
/// batch. While it is unset, the first embedding batch emits an `embedder_init`
/// progress event; once set, later batches and later reindexes stop emitting it.
pub(super) static INPROCESS_EMBEDDER_EVER_READY: AtomicBool = AtomicBool::new(false);
/// Test-only: clear the process-wide "in-process embedder has been ready" flag so a
/// test starts from a cold-embedder state.
#[cfg(test)]
pub(crate) fn reset_inprocess_embedder_flag_for_tests() {
    let flag: &AtomicBool = &INPROCESS_EMBEDDER_EVER_READY;
    flag.store(false, AtomicOrdering::SeqCst);
}
/// Test-only: report whether the process-wide "in-process embedder has been ready"
/// flag is currently set.
#[cfg(test)]
pub(crate) fn inprocess_embedder_ever_ready_for_tests() -> bool {
    let flag: &AtomicBool = &INPROCESS_EMBEDDER_EVER_READY;
    flag.load(AtomicOrdering::SeqCst)
}
/// Shared, cloneable context for processing reindex batches of one index.
#[derive(Clone)]
pub(super) struct BatchCtx {
    /// Handle to the index; its `indexer` lock is taken for reading while parsing and
    /// for writing while committing.
    pub handle: Arc<IndexHandle>,
    /// Progress counters (`indexed`, `skipped`, `errors`, `total_chunks`) and the
    /// progress event sink.
    pub progress: Arc<ReindexProgress>,
    /// Corpus root; file paths are reported relative to it via
    /// `to_corpus_relative_path`.
    pub root: PathBuf,
    /// Id of the index being rebuilt; included in embedder events and log lines.
    pub index_id: IndexId,
    /// Content hash per corpus-relative path. A file whose current hash matches its
    /// entry here is skipped as unchanged; hashes are recorded after a successful
    /// commit.
    pub hashes: Arc<DashMap<PathBuf, String>>,
    /// RSS limit in MB checked after each commit; `None` disables the check.
    pub mem_limit: Option<u64>,
    /// Set to `true` once the RSS limit has been reached after a commit.
    pub mem_abort: Arc<AtomicBool>,
    /// Highest RSS (MB) sampled after commits.
    pub peak_rss_atomic: Arc<AtomicU64>,
    /// Start time of the reindex; used for `elapsed_ms` and cumulative throughput.
    pub started: Instant,
    /// Total number of files in the reindex, reported as `total_files`.
    pub total: usize,
    /// When set, files are parsed without embedding.
    pub lexical_only: bool,
    /// When set, files are parsed without embedding here.
    /// NOTE(review): presumably embedding happens in a later pass — confirm.
    pub defer_embed: bool,
    /// When present, a stored value of 0 means the embedder is not up yet, and an
    /// `embedder_init` event is emitted. Presumably this holds an external embedder
    /// process's PID — confirm.
    pub embedder_pid_slot: Option<Arc<AtomicU32>>,
}
/// Per-batch timings and flags returned to the reindex driver. The `Default`
/// (all zeros / `false`) value is returned for empty or failed batches.
#[derive(Default)]
pub(super) struct BatchOutcome {
    /// Parse time reported by the indexer, in ms.
    pub parse_ms: u64,
    /// Embedding time reported by the indexer, in ms.
    pub embed_ms: u64,
    /// BM25 commit time, in ms.
    pub bm25_ms: u64,
    /// Vector upsert time, in ms.
    pub vector_upsert_ms: u64,
    /// Number of vectors produced by the parse/embed step.
    pub vector_count: usize,
    /// Whether the post-commit RSS check reached `BatchCtx::mem_limit`.
    pub mem_limit_hit: bool,
    /// Chunks the commit reported as dropped by a cap.
    pub chunks_dropped_by_cap: usize,
}
/// A batch that has been read, filtered and parsed (and possibly embedded), waiting
/// to be committed by `commit_parsed_and_finalize`.
pub(super) struct ParsedReadyBatch {
    /// Parser/embedder output for the batch's changed files.
    pub parsed: ParsedBatch,
    /// `(corpus-relative path, content hash)` to record once the commit succeeds.
    pub new_hashes: Vec<(PathBuf, String)>,
    /// Number of files sent to the parser.
    pub batch_files: usize,
    /// Corpus-relative paths whose existing chunks are removed before the commit.
    pub changed_corpus_paths: Vec<String>,
}
/// Files of one batch that need (re)indexing, as produced by `prepare_batch_payload`.
/// All four vectors are filled in lock-step, one entry per changed file.
pub(super) struct BatchPayload {
    /// `(corpus-relative path, file content)` pairs handed to the parser.
    pub to_index: Vec<(String, String)>,
    /// The same files' paths exactly as given in the input batch (used for errors).
    pub to_index_paths: Vec<PathBuf>,
    /// `(corpus-relative path, content hash)` for each file.
    pub new_hashes: Vec<(PathBuf, String)>,
    /// Corpus-relative path of each file.
    pub changed_corpus_paths: Vec<String>,
}
/// Read, parse/embed, and commit a single batch end to end.
///
/// Chains [`prepare_and_parse_batch`] and [`commit_parsed_and_finalize`]. Returns an
/// all-zero [`BatchOutcome`] when the batch has nothing to index or fails while
/// parsing.
// NOTE(review): `dead_code` is allowed because this wrapper may currently have no
// caller; the pipelined path presumably calls the two halves separately.
#[allow(dead_code)]
pub(super) async fn process_one_batch(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchOutcome {
    match prepare_and_parse_batch(ctx, batch).await {
        Some(ready) => commit_parsed_and_finalize(ctx, ready).await,
        None => BatchOutcome::default(),
    }
}
/// Read and filter one batch of files, then parse it. Files are also embedded unless
/// `lexical_only` or `defer_embed` is set.
///
/// Returns `None` in two cases, both with no commit:
/// - No file in `batch` needs indexing, because every file was unreadable, minified,
///   or unchanged by hash.
/// - Parsing/embedding failed. An `error` event listing the batch's files has
///   already been pushed, and `embedder_ready` is not emitted.
///
/// Otherwise it returns the parsed batch together with the hashes and corpus-relative
/// paths that `commit_parsed_and_finalize` needs.
///
/// Progress events pushed here:
/// - `embedder_init` before, and `embedder_ready` after, the parse when
///   `needs_embedder_init` holds.
/// - `chunk_progress` for large embedding waves, plus one for the batch as a whole.
pub(super) async fn prepare_and_parse_batch(
    ctx: &BatchCtx,
    batch: &[PathBuf],
) -> Option<ParsedReadyBatch> {
    let payload = prepare_batch_payload(ctx, batch).await;
    if payload.to_index.is_empty() {
        return None;
    }
    let batch_files = payload.to_index.len();
    let to_index = payload.to_index;
    // NOTE(review): `prepare_batch_payload` above already bumped `indexed` for every
    // skipped file via `emit_skip`, including skips in this very batch. So this can be
    // false even though no batch has been committed yet, which suppresses the
    // in-process `embedder_init` event. Confirm this is intended.
    let first_batch_ever = ctx.progress.indexed.load(AtomicOrdering::Acquire) == 0;
    // Announce embedder start-up only when this batch will actually embed and the
    // embedder is not known to be up yet:
    // - with a PID slot, while the slot still holds 0;
    // - without one (in-process), on the first batch, and only if the process-wide
    //   ready flag has never been set.
    let needs_embedder_init = !ctx.lexical_only
        && !ctx.defer_embed
        && if let Some(slot) = ctx.embedder_pid_slot.as_ref() {
            slot.load(AtomicOrdering::Acquire) == 0
        } else {
            first_batch_ever && !INPROCESS_EMBEDDER_EVER_READY.load(AtomicOrdering::Acquire)
        };
    if needs_embedder_init {
        ctx.progress
            .push(serde_json::json!({
                "event": "embedder_init",
                "index_id": ctx.index_id.0,
            }))
            .await;
    }
    let parsed = {
        // Parsing runs under the indexer read lock.
        let indexer = ctx.handle.indexer.read().await;
        let result = if ctx.lexical_only || ctx.defer_embed {
            // Parse only; no embedding in these modes.
            indexer.parse_files_only(to_index).await
        } else {
            use crate::core::indexer::PROGRESS_CHUNK_INTERVAL;
            use std::sync::atomic::Ordering;
            // Per-wave `(chunks, embed_ms)` stats are presumably sent on `tx` by the
            // tracked call. The channel is drained with `try_recv` only after the call
            // returns, so these events are emitted once the batch has finished
            // embedding, not live.
            // NOTE(review): the drain awaits `progress.push` while still holding the
            // indexer read guard; if `push` can wait, that lengthens the lock hold.
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, u64)>();
            let parse_result = indexer.parse_and_embed_files_tracked(to_index, tx).await;
            while let Ok((wave_chunks, wave_ms)) = rx.try_recv() {
                // Waves smaller than PROGRESS_CHUNK_INTERVAL are not reported.
                if wave_chunks >= PROGRESS_CHUNK_INTERVAL {
                    // `max(1)` keeps the divisor non-zero for sub-millisecond waves.
                    let cps = (wave_chunks as u64 * 1000)
                        .checked_div(wave_ms.max(1))
                        .unwrap_or(0);
                    ctx.progress
                        .push(serde_json::json!({
                            "event": "chunk_progress",
                            "chunks_done": wave_chunks as u64,
                            "chunks_per_sec": cps,
                            "embed_ms": wave_ms,
                            "indexed": ctx.progress.indexed.load(Ordering::Acquire),
                            "total_files": ctx.total,
                        }))
                        .await;
                }
            }
            parse_result
        };
        match result {
            Ok(p) => p,
            Err(e) => {
                // Release the read lock before awaiting the error event.
                drop(indexer);
                emit_batch_error(ctx, &payload.to_index_paths, e).await;
                return None;
            }
        }
    };
    if needs_embedder_init {
        ctx.progress
            .push(serde_json::json!({
                "event": "embedder_ready",
                "index_id": ctx.index_id.0,
            }))
            .await;
        // For the in-process embedder, remember process-wide that it has come up so
        // later batches/reindexes skip `embedder_init`.
        if ctx.embedder_pid_slot.is_none() {
            INPROCESS_EMBEDDER_EVER_READY.store(true, AtomicOrdering::Release);
        }
    }
    // Batch-level throughput: chunks in this batch over this batch's embed time.
    if !ctx.lexical_only && !ctx.defer_embed && parsed.vector_count > 0 {
        use std::sync::atomic::Ordering;
        let batch_chunks = parsed.chunks.len() as u64;
        let chunks_per_sec = (batch_chunks * 1000)
            .checked_div(parsed.embed_ms.max(1))
            .unwrap_or(0);
        ctx.progress
            .push(serde_json::json!({
                "event": "chunk_progress",
                "chunks_done": batch_chunks,
                "chunks_per_sec": chunks_per_sec,
                "embed_ms": parsed.embed_ms,
                "indexed": ctx.progress.indexed.load(Ordering::Acquire),
                "total_files": ctx.total,
            }))
            .await;
    }
    Some(ParsedReadyBatch {
        parsed,
        new_hashes: payload.new_hashes,
        batch_files,
        changed_corpus_paths: payload.changed_corpus_paths,
    })
}
/// Commit a parsed batch to the index, then update progress and hash bookkeeping.
///
/// With the indexer write lock held, every changed file is first removed from the
/// index (#855), so that re-inserting it cannot leave duplicate chunks.
///
/// A file whose removal fails is excluded from the insert (issue #1002) *and* from
/// `new_hashes`. Its new content hash is therefore neither cached nor persisted, so
/// the next incremental reindex sees the file as changed and retries it. Recording
/// the hash would instead skip the file as "unchanged" while the index still held
/// its old content.
///
/// If the commit itself fails, the lock is released, an `error` event is pushed for
/// the batch, and an all-zero [`BatchOutcome`] is returned. On success the outcome
/// combines:
/// - the parse/embed figures from `ready`;
/// - the commit's BM25 and vector timings;
/// - whether the post-commit memory check tripped.
pub(super) async fn commit_parsed_and_finalize(
    ctx: &BatchCtx,
    ready: ParsedReadyBatch,
) -> BatchOutcome {
    let ParsedReadyBatch {
        parsed,
        mut new_hashes,
        batch_files,
        changed_corpus_paths,
    } = ready;
    let parse_ms = parsed.parse_ms;
    let embed_ms = parsed.embed_ms;
    let vector_count = parsed.vector_count;
    let commit = {
        let indexer = ctx.handle.indexer.write().await;
        let mut remove_failed_files: std::collections::HashSet<&str> =
            std::collections::HashSet::new();
        let mut remove_failures: usize = 0;
        for file_path in &changed_corpus_paths {
            if let Err(e) = indexer.remove_file_no_kg_rebuild(file_path).await {
                remove_failed_files.insert(file_path.as_str());
                remove_failures += 1;
                tracing::warn!(
                    index_id = %ctx.index_id.0,
                    file = %file_path,
                    error = %e,
                    remove_failures,
                    "reindex: #855 pre-commit remove failed — skipping insert for \
                     this file to avoid duplicate chunks (issue #1002); its hash is \
                     not recorded, so stale chunks persist until a later reindex \
                     removes them"
                );
            }
        }
        let parsed = if remove_failures > 0 {
            // Forget the new hashes of files we are not inserting. Recording them
            // would make the next incremental reindex skip those files as unchanged.
            // `new_hashes` keys are built with `PathBuf::from(&rel)`, so `to_str`
            // gives back the same corpus-relative string.
            new_hashes.retain(|(path, _)| {
                path.to_str()
                    .map_or(true, |rel| !remove_failed_files.contains(rel))
            });
            parsed.retain_files(|f| !remove_failed_files.contains(f))
        } else {
            parsed
        };
        match indexer.commit_parsed_batch(parsed, true).await {
            Ok(c) => c,
            Err(e) => {
                // Release the write lock before awaiting the error event.
                drop(indexer);
                // Only corpus-relative paths are available here; they stand in for
                // the original batch paths in the error event.
                let placeholder_paths: Vec<PathBuf> =
                    new_hashes.iter().map(|(p, _)| p.clone()).collect();
                emit_batch_error(ctx, &placeholder_paths, e).await;
                return BatchOutcome::default();
            }
        }
    };
    apply_successful_commit(ctx, new_hashes, batch_files, &commit).await;
    let mem_limit_hit = check_post_commit_memory(ctx);
    BatchOutcome {
        parse_ms,
        embed_ms,
        bm25_ms: commit.bm25_ms,
        vector_upsert_ms: commit.vector_upsert_ms,
        vector_count,
        mem_limit_hit,
        chunks_dropped_by_cap: commit.chunks_dropped_by_cap,
    }
}
pub(super) async fn prepare_batch_payload(ctx: &BatchCtx, batch: &[PathBuf]) -> BatchPayload {
use std::sync::atomic::Ordering;
let read_futs = batch.iter().map(|path| {
let path = path.clone();
async move {
let content = tokio::fs::read_to_string(&path).await;
(path, content)
}
});
let read_results = futures::future::join_all(read_futs).await;
let mut to_index: Vec<(String, String)> = Vec::with_capacity(batch.len());
let mut to_index_paths: Vec<PathBuf> = Vec::with_capacity(batch.len());
let mut new_hashes: Vec<(PathBuf, String)> = Vec::with_capacity(batch.len());
let mut changed_corpus_paths: Vec<String> = Vec::with_capacity(batch.len());
for (path, content_res) in read_results {
let rel = to_corpus_relative_path(&ctx.root, &path);
let content = match content_res {
Ok(c) => c,
Err(e) => {
ctx.progress.errors.fetch_add(1, Ordering::Release);
ctx.progress
.push(serde_json::json!({
"event": "error",
"file": rel,
"message": format!("read: {e}"),
"indexed": ctx.progress.indexed.load(Ordering::Acquire),
"total_files": ctx.total,
}))
.await;
continue;
}
};
if should_skip_content(&path, &content) {
tracing::debug!("reindex: skipping minified content in {}", path.display());
emit_skip(ctx, &rel, Some("minified")).await;
continue;
}
let h = hash_content(&content);
let rel_path = PathBuf::from(&rel);
if ctx
.hashes
.get(&rel_path)
.map(|prev| *prev == h)
.unwrap_or(false)
{
emit_skip(ctx, &rel, None).await;
continue;
}
let path_str = rel.clone();
to_index.push((path_str, content));
to_index_paths.push(path.clone());
new_hashes.push((rel_path, h));
changed_corpus_paths.push(rel);
}
BatchPayload {
to_index,
to_index_paths,
new_hashes,
changed_corpus_paths,
}
}
pub(super) async fn emit_skip(ctx: &BatchCtx, rel: &str, reason: Option<&str>) {
use std::sync::atomic::Ordering;
ctx.progress.skipped.fetch_add(1, Ordering::Release);
let indexed = ctx.progress.indexed.fetch_add(1, Ordering::Release) + 1;
let mut event = serde_json::json!({
"event": "skip",
"file": rel,
"indexed": indexed,
"total_files": ctx.total,
});
if let Some(r) = reason {
event["reason"] = serde_json::Value::String(r.to_string());
}
ctx.progress.push(event).await;
}
pub(super) async fn emit_batch_error(
ctx: &BatchCtx,
to_index_paths: &[PathBuf],
err: anyhow::Error,
) {
use std::sync::atomic::Ordering;
let files_in_batch: Vec<String> = to_index_paths
.iter()
.map(|p| to_corpus_relative_path(&ctx.root, p))
.collect();
ctx.progress
.errors
.fetch_add(to_index_paths.len(), Ordering::Release);
ctx.progress
.push(serde_json::json!({
"event": "error",
"files": files_in_batch,
"message": format!("batch index: {err}"),
"indexed": ctx.progress.indexed.load(Ordering::Acquire),
"total_files": ctx.total,
}))
.await;
}
pub(super) async fn apply_successful_commit(
ctx: &BatchCtx,
new_hashes: Vec<(PathBuf, String)>,
batch_files: usize,
commit: &CommitTimings,
) {
use std::sync::atomic::Ordering;
let new_chunks = commit.chunks;
ctx.progress
.total_chunks
.fetch_add(new_chunks, Ordering::Release);
let indexed = ctx
.progress
.indexed
.fetch_add(batch_files, Ordering::Release)
+ batch_files;
let elapsed_ms = ctx.started.elapsed().as_millis() as u64;
let chunks_per_sec = (ctx.progress.total_chunks.load(Ordering::Acquire) as u64 * 1000)
.checked_div(elapsed_ms)
.unwrap_or(0);
for (path, h) in &new_hashes {
ctx.hashes.insert(path.clone(), h.clone());
}
shrink_hashes_if_needed(&ctx.hashes);
hash_cache::persist_batch(
&ctx.handle,
&new_hashes,
MAX_FILE_HASHES_PER_INDEX,
ctx.hashes.len(),
)
.await;
ctx.progress
.push(serde_json::json!({
"event": "batch",
"batch_files": batch_files,
"batch_chunks": new_chunks,
"indexed": indexed,
"total_files": ctx.total,
"elapsed_ms": elapsed_ms,
"chunks_per_sec": chunks_per_sec,
}))
.await;
}
/// Sample process RSS after a commit, track the peak, and trip the memory abort.
///
/// Returns `false` without side effects when no `mem_limit` is configured or the RSS
/// cannot be read. Otherwise it raises `peak_rss_atomic` to the sampled RSS (in MB)
/// if the sample is higher. When the RSS has reached `mem_limit`, it also logs a
/// warning, sets `mem_abort`, and returns `true`.
pub(super) fn check_post_commit_memory(ctx: &BatchCtx) -> bool {
    use crate::core::memguard::current_rss_mb;
    let Some(limit) = ctx.mem_limit else {
        return false;
    };
    let Some(rss) = current_rss_mb() else {
        return false;
    };
    // `peak_rss_atomic` is a shared `Arc`. A separate load-then-store would let a
    // concurrent caller overwrite a higher peak with its lower sample; `fetch_max`
    // performs the compare-and-raise as one atomic read-modify-write.
    ctx.peak_rss_atomic.fetch_max(rss, AtomicOrdering::AcqRel);
    if rss < limit {
        return false;
    }
    tracing::warn!(
        "reindex: memory limit hit after commit \
         (rss={}MB >= limit={}MB) — skipping \
         remaining batches for index {}",
        rss,
        limit,
        ctx.index_id.0
    );
    ctx.mem_abort.store(true, AtomicOrdering::Release);
    true
}