use std::sync::Arc;
use anyhow::{Context, Result};
use crate::core::chunker::RawChunk;
use crate::core::entity::RawEntity;
use crate::core::symbol_graph::SymbolGraph;
use super::super::{max_chunks_per_index, CodeIndexer, CommitTimings, ParsedBatch};
impl CodeIndexer {
    /// Commits one parsed batch of chunks, embeddings and entities into every
    /// structure owned by this indexer.
    ///
    /// Before anything else it awaits `ensure_chunks_loaded` and calls
    /// `touch_activity`. Then, in order:
    /// 1. Pre-filters chunks against the per-index cap (`max_chunks_per_index`).
    ///    Chunks whose id is already in the in-memory corpus (updates) are always
    ///    kept. New ids are admitted only while the projected corpus size is
    ///    below the cap, and an id repeated within the batch counts once.
    ///    Rejected chunks never reach the vector store, BM25, the embedding cache
    ///    or redb.
    /// 2. Upserts vectors, then BM25 documents, then the embedding cache, then
    ///    the redb corpus (when configured), then the in-memory corpus and the
    ///    per-file entity map.
    /// 3. Rebuilds the symbol graph unless `defer_graph_rebuild` is `true`; in
    ///    that case `kg_ms` is reported as 0.
    /// 4. Spawns an incremental persist.
    ///
    /// If no chunk survives the filter (or the batch had none), only the entities
    /// are committed. No graph rebuild or persist is triggered, and a default
    /// `CommitTimings` carrying the drop count is returned.
    ///
    /// # Errors
    /// Returns an error only if the vector-store batch upsert fails. In that case
    /// none of the later steps (BM25, caches, corpus, entities) run for this batch.
    pub async fn commit_parsed_batch(
        &self,
        parsed: ParsedBatch,
        defer_graph_rebuild: bool,
    ) -> Result<CommitTimings> {
        // Keeps `items[i]` iff `mask[i]` is true; entries past the end of `mask`
        // are kept. Filters in place and preserves order.
        fn retain_by_mask<T>(items: &mut Vec<T>, mask: &[bool]) {
            let mut flags = mask.iter().copied();
            // `Vec::retain` visits every element exactly once, in original order,
            // so the i-th predicate call consumes the i-th flag.
            items.retain(|_| flags.next().unwrap_or(true));
        }

        self.ensure_chunks_loaded().await;
        self.touch_activity();
        let ParsedBatch {
            chunks: mut all_chunks,
            mut embeddings,
            entities_by_file,
            parse_ms: _,
            embed_ms: _,
            vector_count: _,
        } = parsed;

        let cap = max_chunks_per_index();
        // Decide, under a read lock only, which chunks fit under the cap.
        let keep_mask: Vec<bool> = {
            let corpus = self.chunks.read().await;
            // New ids already admitted from this batch. A repeated id is an update
            // of the first occurrence and must not count against the cap twice.
            let mut admitted_new: std::collections::HashSet<&str> =
                std::collections::HashSet::new();
            let mut projected_len = corpus.len();
            let mut mask = Vec::with_capacity(all_chunks.len());
            for chunk in &all_chunks {
                let id = chunk.id.as_str();
                let keep = if corpus.contains_key(&chunk.id) || admitted_new.contains(id) {
                    true
                } else if projected_len < cap {
                    projected_len += 1;
                    admitted_new.insert(id);
                    true
                } else {
                    false
                };
                mask.push(keep);
            }
            mask
        };

        let pre_filter_dropped = keep_mask.iter().filter(|&&keep| !keep).count();
        if pre_filter_dropped > 0 {
            // Filter each vector on its own rather than zipping them together: a
            // zip would silently truncate `all_chunks` to `embeddings.len()` if
            // the two were ever out of step.
            retain_by_mask(&mut all_chunks, &keep_mask);
            retain_by_mask(&mut embeddings, &keep_mask);
            tracing::warn!(
                "index '{}' chunk cap ({}) reached — pre-filtered {} chunks before commit \
                 (prevents leak into BM25/HNSW/embedding cache)",
                self.index_id,
                cap,
                pre_filter_dropped
            );
        }

        let chunk_total = all_chunks.len();
        if chunk_total == 0 {
            self.commit_entities(entities_by_file).await;
            return Ok(CommitTimings {
                chunks_dropped_by_cap: pre_filter_dropped,
                ..CommitTimings::default()
            });
        }

        let vec_start = std::time::Instant::now();
        self.commit_vectors_batch(&all_chunks, &embeddings).await?;
        let vector_upsert_ms = vec_start.elapsed().as_millis() as u64;

        let bm25_start = std::time::Instant::now();
        self.commit_bm25_batch(&all_chunks).await;
        let bm25_ms = bm25_start.elapsed().as_millis() as u64;

        self.commit_embeddings_cache(&all_chunks, embeddings).await;
        // `commit_corpus_to_redb` is itself a no-op when no redb corpus is configured.
        self.commit_corpus_to_redb(&all_chunks, &entities_by_file)
            .await;
        // NOTE(review): the cap check above ran under a read lock that has been
        // released since. If another commit runs concurrently and fills the
        // corpus first, `commit_corpus` re-checks the cap and drops chunks that
        // were already written to the vector store, BM25, the embedding cache
        // and redb.
        self.commit_corpus(&mut all_chunks).await;
        self.commit_entities(entities_by_file).await;

        let kg_ms = if defer_graph_rebuild {
            0
        } else {
            let kg_start = std::time::Instant::now();
            self.rebuild_symbol_graph().await;
            kg_start.elapsed().as_millis() as u64
        };

        // NOTE(review): the meaning of the `false` flag is defined by
        // `spawn_incremental_persist` elsewhere — confirm there.
        self.spawn_incremental_persist(false);
        Ok(CommitTimings {
            chunks: chunk_total,
            bm25_ms,
            vector_upsert_ms,
            kg_ms,
            chunks_dropped_by_cap: pre_filter_dropped,
        })
    }

    /// Upserts the batch's embedding vectors into the vector store, if one is
    /// configured; without a store this returns `Ok(())` immediately.
    ///
    /// `chunks` and `embeddings` are paired by position:
    /// - `None` embeddings are skipped silently.
    /// - Vectors with a non-finite (NaN or ±inf) component are skipped with a warning.
    /// - Vectors that are all zeros (which includes an empty vector) are also
    ///   skipped with a warning.
    ///
    /// If nothing remains, the store is not called.
    ///
    /// # Errors
    /// Propagates a failure from the store's `upsert_batch`, with added context.
    pub(crate) async fn commit_vectors_batch(
        &self,
        chunks: &[RawChunk],
        embeddings: &[Option<Vec<f32>>],
    ) -> Result<()> {
        let Some(store) = &self.store else {
            return Ok(());
        };
        let mut items: Vec<(String, Vec<f32>)> = Vec::with_capacity(chunks.len());
        for (chunk, vec_opt) in chunks.iter().zip(embeddings) {
            let Some(v) = vec_opt.as_ref() else {
                continue;
            };
            // Infinite components corrupt distance computations just like NaN
            // does (e.g. inf - inf = NaN), so every non-finite value is rejected.
            if v.iter().any(|x| !x.is_finite()) {
                tracing::warn!(
                    chunk_id = %chunk.id,
                    "commit_vectors_batch: NaN or infinite component in embedding vector — \
                     skipping HNSW upsert for this chunk (issue #764). \
                     This indicates a sidecar or model defect; \
                     check embedderd logs."
                );
                continue;
            }
            // `all` is true for an empty vector too, so empty embeddings land here.
            if v.iter().all(|x| *x == 0.0_f32) {
                tracing::warn!(
                    chunk_id = %chunk.id,
                    "commit_vectors_batch: all-zero embedding vector — \
                     skipping HNSW upsert for this chunk (issue #764). \
                     This indicates a sidecar or model defect; \
                     check embedderd logs."
                );
                continue;
            }
            items.push((chunk.id.clone(), v.clone()));
        }
        if items.is_empty() {
            return Ok(());
        }
        store
            .upsert_batch(&items)
            .await
            .context("batch upsert chunk vectors")
    }

    /// Upserts each chunk's BM25 document text (from `bm25_doc_text`) under its
    /// chunk id. The BM25 write lock is held for the whole batch.
    pub(crate) async fn commit_bm25_batch(&self, chunks: &[RawChunk]) {
        let mut bm25 = self.bm25.write().await;
        for chunk in chunks {
            let text = Self::bm25_doc_text(chunk);
            bm25.upsert_document(&chunk.id, &text);
        }
    }

    /// Stores each `Some` embedding in the chunk-embedding cache, keyed by chunk id.
    ///
    /// Does nothing when no embedder is configured. `chunks` and `embeddings` are
    /// paired by position; `None` entries are skipped.
    pub(crate) async fn commit_embeddings_cache(
        &self,
        chunks: &[RawChunk],
        embeddings: Vec<Option<Vec<f32>>>,
    ) {
        if self.embedder.is_none() {
            return;
        }
        let mut emb_cache = self.chunk_embeddings.write().await;
        for (chunk, vec_opt) in chunks.iter().zip(embeddings) {
            if let Some(vec) = vec_opt {
                emb_cache.put(chunk.id.clone(), vec);
            }
        }
    }

    /// Moves `chunks` into the in-memory corpus, leaving `chunks` empty.
    ///
    /// Existing ids are replaced. A new id is dropped once the corpus already
    /// holds `max_chunks_per_index()` entries. The cap is re-checked here under
    /// the write lock, and a warning reports how many were dropped.
    pub(crate) async fn commit_corpus(&self, chunks: &mut Vec<RawChunk>) {
        let cap = max_chunks_per_index();
        let mut corpus = self.chunks.write().await;
        let mut dropped = 0usize;
        for chunk in chunks.drain(..) {
            if !corpus.contains_key(&chunk.id) && corpus.len() >= cap {
                dropped += 1;
                continue;
            }
            corpus.insert(chunk.id.clone(), chunk);
        }
        if dropped > 0 {
            tracing::warn!(
                "index '{}' chunk cap ({}) reached — dropped {} new chunks in batch",
                self.index_id,
                cap,
                dropped
            );
        }
    }

    /// Writes the batch's chunks and entities to the redb-backed `CorpusStore`,
    /// if one is configured.
    ///
    /// The inputs are cloned and `upsert_batch` runs on a blocking thread via
    /// `spawn_blocking`. Failures, including the task failing to complete, are
    /// logged as warnings and not propagated.
    pub(crate) async fn commit_corpus_to_redb(
        &self,
        chunks: &[RawChunk],
        entities_by_file: &[(String, Vec<RawEntity>)],
    ) {
        let Some(corpus) = self.corpus.clone() else {
            return;
        };
        let chunks = chunks.to_vec();
        let entities = entities_by_file.to_vec();
        let index_id = self.index_id.clone();
        let result = tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
            corpus.upsert_batch(&chunks, &entities)
        })
        .await;
        match result {
            Ok(Ok(())) => {}
            Ok(Err(e)) => tracing::warn!(
                "index '{index_id}': redb corpus write failed ({e}) — \
                 in-memory commit succeeded; on-disk corpus will re-converge \
                 on the next batch or shutdown flush"
            ),
            // A JoinError means the task either panicked or was cancelled; its
            // Display text says which.
            Err(e) => tracing::warn!(
                "index '{index_id}': redb corpus write task did not complete ({e})"
            ),
        }
    }

    /// Inserts each file's entity list into the entity map, keyed by path. With
    /// map `insert` semantics, an existing entry for the same path is replaced.
    pub(crate) async fn commit_entities(&self, entities_by_file: Vec<(String, Vec<RawEntity>)>) {
        let mut emap = self.entities.write().await;
        for (path, ents) in entities_by_file {
            emap.insert(path, ents);
        }
    }

    /// Returns the number of chunks in the in-memory corpus without waiting.
    ///
    /// This uses `try_read`: if the lock cannot be acquired immediately (e.g. a
    /// commit holds the write lock), it returns `0` rather than the true count.
    pub fn chunk_count(&self) -> usize {
        self.chunks.try_read().map(|g| g.len()).unwrap_or(0)
    }

    /// Returns a shared handle to the current symbol graph: an `Arc` clone taken
    /// under the read lock. A later rebuild does not affect the returned value.
    pub async fn symbol_graph(&self) -> Arc<SymbolGraph> {
        Arc::clone(&*self.symbol_graph.read().await)
    }

    /// Returns a clone of the `CorpusStore` handle, or `None` when no corpus
    /// store is configured.
    pub fn corpus_arc(&self) -> Option<Arc<crate::core::corpus::CorpusStore>> {
        self.corpus.as_ref().map(Arc::clone)
    }
}