pub(crate) mod commit;
pub(crate) mod embed;
use anyhow::{Context, Result};
use crate::core::chunker::{chunk_ast, RawChunk};
use crate::core::entity::RawEntity;
use crate::core::symbol_graph::{ChunkTuple, SymbolGraph};
use super::{populate_virtual_terms, CodeIndexer, ParsedBatch};
/// Progress-reporting granularity, in chunks.
///
/// NOTE(review): not referenced in this file outside `progress_interval_tests`
/// (which pins it to 32); presumably consumed by the embedding path in a
/// submodule (e.g. `embed`) to decide how often progress is sent — confirm.
pub(crate) const PROGRESS_CHUNK_INTERVAL: usize = 32;
impl CodeIndexer {
    /// Rebuilds the symbol / knowledge graph from the current chunk and entity
    /// maps, passes it through `symbol_graph::save_then_merge_contrib`, and
    /// installs the result in `self.symbol_graph`.
    ///
    /// When `max_kg_nodes()` returns a non-zero cap, only the first
    /// `2 × cap` chunks are fed to the graph builder. Chunks are ordered by
    /// `(file, id)` first, so the retained subset does not depend on map
    /// iteration order. A warning is logged when chunks are dropped. A cap of
    /// `0` disables truncation.
    pub(super) async fn rebuild_symbol_graph(&self) {
        let kg_cap = crate::core::symbol_graph::max_kg_nodes();

        // Copy out only the fields the graph builder needs, then release the
        // read lock before the (potentially slow) sort and build.
        let chunks = self.chunks.read().await;
        let snapshot_cap = if kg_cap == 0 {
            chunks.len()
        } else {
            kg_cap.saturating_mul(2).min(chunks.len())
        };
        let mut all_tuples: Vec<ChunkTuple> = chunks
            .values()
            .map(|c| {
                (
                    c.id.clone(),
                    c.file.clone(),
                    c.function_name.clone(),
                    c.calls.clone(),
                    c.inherits_from.clone(),
                    c.chunk_type.clone(),
                )
            })
            .collect();
        drop(chunks);

        all_tuples.sort_unstable_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
        if snapshot_cap < all_tuples.len() {
            // Truncation only happens when kg_cap > 0 and 2×kg_cap < total,
            // so `kg_cap` is exactly the MAX_KG_NODES value in effect.
            tracing::warn!(
                index_id = %self.index_id,
                total_chunks = all_tuples.len(),
                snapshot_cap,
                "kg: snapshot truncated to {} chunks (2×MAX_KG_NODES, MAX_KG_NODES={}); \
                 symbols in the dropped portion will have no KG edges this boot. \
                 Raise TRUSTY_MAX_KG_NODES or run --force reindex to rebuild the \
                 graph at full size. (issue #824)",
                snapshot_cap,
                kg_cap,
            );
        }
        all_tuples.truncate(snapshot_cap);

        let entities_snapshot: Vec<(String, Vec<RawEntity>)> = {
            let ents = self.entities.read().await;
            ents.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
        };
        let new_graph = std::sync::Arc::new(SymbolGraph::build_from_chunks_with_entities(
            &all_tuples,
            &entities_snapshot,
        ));
        // Release the snapshots before awaiting, so they are not kept alive
        // inside the future for the duration of the save.
        drop(all_tuples);
        drop(entities_snapshot);

        let new_graph = crate::core::symbol_graph::save_then_merge_contrib(
            new_graph,
            self.corpus.clone(),
            self.index_id.clone(),
        )
        .await;
        *self.symbol_graph.write().await = new_graph;
    }

    /// Adds a single chunk via [`Self::add_chunk_inner`], then rebuilds the
    /// symbol graph.
    ///
    /// # Errors
    /// Propagates embedding / vector-store failures from `add_chunk_inner`.
    /// On error the graph is not rebuilt.
    pub async fn add_chunk(&self, chunk: RawChunk) -> Result<()> {
        self.add_chunk_inner(chunk).await?;
        self.rebuild_symbol_graph().await;
        Ok(())
    }

    /// Inserts one chunk into the BM25 index and the chunk map. When both an
    /// embedder and a vector store are configured, it also embeds the content,
    /// upserts the vector, and caches the embedding. Does not rebuild the
    /// symbol graph.
    ///
    /// A chunk with a new id is skipped (warning logged, `Ok(())` returned)
    /// once the map already holds `max_chunks_per_index()` entries. Re-adding
    /// an existing id is always allowed.
    ///
    /// # Errors
    /// Returns an error if embedding or the vector upsert fails. In that case
    /// neither BM25 nor the chunk map is touched.
    pub(super) async fn add_chunk_inner(&self, chunk: RawChunk) -> Result<()> {
        self.ensure_chunks_loaded().await;
        self.touch_activity();
        let id = chunk.id.clone();
        {
            // NOTE(review): the cap is checked under a read lock that is
            // released before the final insert, so concurrent callers can
            // overshoot the cap slightly.
            let chunks = self.chunks.read().await;
            let cap = super::max_chunks_per_index();
            if !chunks.contains_key(&id) && chunks.len() >= cap {
                tracing::warn!(
                    "index '{}' chunk cap ({}) reached — skipping chunk {}",
                    self.index_id,
                    cap,
                    id
                );
                return Ok(());
            }
        }
        if let (Some(embedder), Some(store)) = (&self.embedder, &self.store) {
            let vec = embedder
                .embed(&chunk.content)
                .await
                .context("embed chunk content")?;
            store
                .upsert(&id, vec.clone())
                .await
                .context("upsert chunk vector")?;
            self.chunk_embeddings.write().await.put(id.clone(), vec);
        }
        let bm25_text = Self::bm25_doc_text(&chunk);
        self.bm25.write().await.upsert_document(&id, &bm25_text);
        self.chunks.write().await.insert(id, chunk);
        Ok(())
    }

    /// Parses, embeds and commits a single file. It then stores that file's
    /// entity list (AST entities plus NER / concept-cluster entities) under
    /// `file_path` and rebuilds the symbol graph.
    ///
    /// # Errors
    /// Propagates embedding and commit failures.
    pub async fn index_file(&self, file_path: &str, content: &str) -> Result<()> {
        let parse_start = std::time::Instant::now();
        let (mut chunks, entities) = chunk_ast(file_path, content);
        populate_virtual_terms(&mut chunks, &entities);
        let parse_ms = parse_start.elapsed().as_millis() as u64;

        // Captured before `chunks` moves into the batch; used for concept
        // clustering in `enrich_with_nlp_entities` (clustering feature only).
        let chunk_contents: Vec<String> = chunks.iter().map(|c| c.content.clone()).collect();
        if !chunks.is_empty() {
            let embed_start = std::time::Instant::now();
            let embeddings = self.embed_chunks_in_batches(&chunks, None).await?;
            let embed_ms = embed_start.elapsed().as_millis() as u64;
            // Report the real number of produced vectors, matching
            // `parse_files_inner` (previously hard-coded to 0).
            let vector_count = embeddings.iter().filter(|e| e.is_some()).count();
            let parsed = ParsedBatch {
                chunks,
                embeddings,
                // Entities are stored below, after NLP enrichment.
                entities_by_file: Vec::new(),
                parse_ms,
                embed_ms,
                vector_count,
            };
            // `true`: skip the graph rebuild here; it runs once at the end,
            // after the entities are in place.
            self.commit_parsed_batch(parsed, true).await?;
        }
        let all_entities = self
            .enrich_with_nlp_entities(file_path, content, &chunk_contents, entities)
            .await;
        self.entities
            .write()
            .await
            .insert(file_path.to_string(), all_entities);
        self.rebuild_symbol_graph().await;
        Ok(())
    }

    /// Returns `base_entities` extended with NER entities extracted from the
    /// file's doc comments. With the `clustering` feature and a configured
    /// embedder, it also appends concept-cluster entities derived from
    /// `chunk_contents`.
    async fn enrich_with_nlp_entities(
        &self,
        file_path: &str,
        content: &str,
        #[cfg_attr(not(feature = "clustering"), allow(unused_variables))]
        chunk_contents: &[String],
        base_entities: Vec<RawEntity>,
    ) -> Vec<RawEntity> {
        let doc_text = crate::core::ner::extract_doc_comments(content);
        let ner_entities = self.ner.extract(&doc_text, file_path);
        if !ner_entities.is_empty() {
            tracing::debug!(
                "ner: {} NaturalLanguagePhrase entities for {}",
                ner_entities.len(),
                file_path
            );
        }
        let mut all_entities = base_entities;
        all_entities.extend(ner_entities);
        #[cfg(feature = "clustering")]
        if let Some(embedder) = &self.embedder {
            let refs: Vec<&str> = chunk_contents.iter().map(|s| s.as_str()).collect();
            let cluster_entities = crate::core::concept_cluster::cluster_concepts_from_contents(
                &refs,
                embedder.as_ref(),
                file_path,
            )
            .await;
            if !cluster_entities.is_empty() {
                tracing::debug!(
                    "concept_cluster: {} ConceptCluster entities for {}",
                    cluster_entities.len(),
                    file_path
                );
                all_entities.extend(cluster_entities);
            }
        }
        all_entities
    }

    /// Parses, embeds and commits `(path, content)` pairs. `false` is passed as
    /// `defer_graph_rebuild` to `commit_parsed_batch`. Returns the `chunks`
    /// count reported by the commit.
    pub async fn index_files_batch(&self, files: &[(String, String)]) -> Result<usize> {
        self.index_files_batch_inner(files, false).await
    }

    /// Same as [`Self::index_files_batch`], but passes `true` as
    /// `defer_graph_rebuild`. Presumably the caller is then expected to call
    /// [`Self::rebuild_symbol_graph_now`] — confirm with `commit_parsed_batch`.
    pub async fn index_files_batch_no_rebuild(&self, files: &[(String, String)]) -> Result<usize> {
        self.index_files_batch_inner(files, true).await
    }

    /// Public entry point to rebuild the symbol graph immediately.
    pub async fn rebuild_symbol_graph_now(&self) {
        self.rebuild_symbol_graph().await;
    }

    /// Shared body of the `index_files_batch*` methods. An empty `files` slice
    /// is a no-op returning `Ok(0)`.
    async fn index_files_batch_inner(
        &self,
        files: &[(String, String)],
        defer_graph_rebuild: bool,
    ) -> Result<usize> {
        if files.is_empty() {
            return Ok(0);
        }
        let parsed = self.parse_and_embed_files(files.to_vec()).await?;
        let timings = self
            .commit_parsed_batch(parsed, defer_graph_rebuild)
            .await?;
        Ok(timings.chunks)
    }

    /// Parses and embeds `files` without committing them.
    pub async fn parse_and_embed_files(&self, files: Vec<(String, String)>) -> Result<ParsedBatch> {
        self.parse_files_inner(files, true, None).await
    }

    /// Like [`Self::parse_and_embed_files`], but forwards `progress_tx` to the
    /// embedding step.
    pub async fn parse_and_embed_files_tracked(
        &self,
        files: Vec<(String, String)>,
        progress_tx: tokio::sync::mpsc::UnboundedSender<(usize, u64)>,
    ) -> Result<ParsedBatch> {
        self.parse_files_inner(files, true, Some(progress_tx)).await
    }

    /// Parses `files` without embedding. Every entry in `embeddings` is `None`,
    /// and `embed_ms` and `vector_count` are 0.
    pub async fn parse_files_only(&self, files: Vec<(String, String)>) -> Result<ParsedBatch> {
        self.parse_files_inner(files, false, None).await
    }

    /// Parses `files` in parallel and, when `embed` is true, embeds every
    /// resulting chunk. Returns the chunks (in input-file order), their
    /// embeddings (same length and order), the per-file entities, and timings.
    /// Empty input yields `ParsedBatch::default()`.
    async fn parse_files_inner(
        &self,
        files: Vec<(String, String)>,
        embed: bool,
        progress_tx: Option<tokio::sync::mpsc::UnboundedSender<(usize, u64)>>,
    ) -> Result<ParsedBatch> {
        if files.is_empty() {
            return Ok(ParsedBatch::default());
        }
        let parse_start = std::time::Instant::now();
        let parsed = Self::parse_files_parallel(files).await?;
        let total_chunks: usize = parsed.iter().map(|(_, chunks, _)| chunks.len()).sum();
        let mut all_chunks: Vec<RawChunk> = Vec::with_capacity(total_chunks);
        let mut entities_by_file: Vec<(String, Vec<RawEntity>)> = Vec::with_capacity(parsed.len());
        for (path, chunks, entities) in parsed {
            all_chunks.extend(chunks);
            entities_by_file.push((path, entities));
        }
        let parse_ms = parse_start.elapsed().as_millis() as u64;
        let (embeddings, embed_ms, vector_count) = if embed {
            let embed_start = std::time::Instant::now();
            let embeddings = self
                .embed_chunks_in_batches(&all_chunks, progress_tx.as_ref())
                .await?;
            let embed_ms = embed_start.elapsed().as_millis() as u64;
            let vector_count = embeddings.iter().filter(|e| e.is_some()).count();
            (embeddings, embed_ms, vector_count)
        } else {
            let embeddings: Vec<Option<Vec<f32>>> = vec![None; all_chunks.len()];
            (embeddings, 0, 0)
        };
        Ok(ParsedBatch {
            chunks: all_chunks,
            embeddings,
            entities_by_file,
            parse_ms,
            embed_ms,
            vector_count,
        })
    }

    /// Runs `chunk_ast` + `populate_virtual_terms` over every file on the rayon
    /// pool, inside `spawn_blocking` so async worker threads are not blocked.
    /// Output order matches input order.
    ///
    /// # Errors
    /// Returns an error if the blocking task fails to complete (e.g. panics).
    async fn parse_files_parallel(
        files: Vec<(String, String)>,
    ) -> Result<Vec<(String, Vec<RawChunk>, Vec<RawEntity>)>> {
        use rayon::prelude::*;
        tokio::task::spawn_blocking(move || {
            files
                // Consume the owned Vec so each path moves into the result
                // instead of being cloned.
                .into_par_iter()
                .map(|(path, content)| {
                    let (mut chunks, entities) = chunk_ast(&path, &content);
                    populate_virtual_terms(&mut chunks, &entities);
                    (path, chunks, entities)
                })
                .collect()
        })
        .await
        .context("batch parse task panicked")
    }

    /// Best-effort warm-up: sends a single one-item `embed_batch` call to the
    /// embedder, if one is configured. Success and failure are both only
    /// logged at debug level; errors are not returned.
    pub async fn warm_embedder(&self) {
        let Some(embedder) = &self.embedder else {
            return;
        };
        match embedder.embed_batch(&["warm"]).await {
            Ok(_) => {
                tracing::debug!(
                    "warm_embedder[{}]: embedder pre-warm succeeded",
                    self.index_id
                );
            }
            Err(e) => {
                tracing::debug!(
                    "warm_embedder[{}]: embedder pre-warm failed ({e}) — \
                     will retry on first batch",
                    self.index_id
                );
            }
        }
    }

    /// Embeds every chunk currently in the chunk map and writes the vectors to
    /// the store and the embedding cache.
    ///
    /// Returns `(embedded, total)`. When there are no chunks, or no embedder or
    /// store is configured, nothing is embedded and `(0, total)` is returned.
    ///
    /// # Errors
    /// Propagates failures from embedding or from `commit_vectors_batch`.
    pub async fn embed_deferred_chunks(
        &self,
        progress_tx: Option<&tokio::sync::mpsc::UnboundedSender<(usize, u64)>>,
    ) -> anyhow::Result<(usize, usize)> {
        self.ensure_chunks_loaded().await;
        let chunks: Vec<RawChunk> = {
            let map = self.chunks.read().await;
            map.values().cloned().collect()
        };
        let total = chunks.len();
        if total == 0 || self.embedder.is_none() || self.store.is_none() {
            return Ok((0, total));
        }
        let embeddings = self.embed_chunks_in_batches(&chunks, progress_tx).await?;
        self.commit_vectors_batch(&chunks, &embeddings).await?;
        self.commit_embeddings_cache(&chunks, embeddings).await;
        Ok((chunks.len(), total))
    }
}
#[cfg(test)]
mod warm_embedder_tests {
    use super::super::CodeIndexer;

    /// `warm_embedder` on an indexer from `CodeIndexer::new` (no embedder, per
    /// this test's name) must simply return without panicking.
    #[tokio::test]
    async fn warm_embedder_noop_without_embedder() {
        CodeIndexer::new("warm-test", "/tmp").warm_embedder().await;
    }
}
#[cfg(test)]
mod progress_interval_tests {
    use super::PROGRESS_CHUNK_INTERVAL;

    /// Pins the progress interval so an accidental change is caught in review.
    #[test]
    fn progress_interval_constant_is_32() {
        const EXPECTED: usize = 32;
        assert_eq!(EXPECTED, PROGRESS_CHUNK_INTERVAL);
    }
}