use super::progress::ReindexProgress;
use super::semaphore::background_reindex_semaphore;
use super::stages::now_rfc3339;
use crate::core::registry::{IndexHandle, StageState, StageStatus};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Spawns a detached background task that runs the deferred embedding pass for `handle`.
///
/// The spawned task:
/// 1. waits for a permit from the shared background-reindex semaphore and holds it for the
///    whole pass. If the semaphore is closed, it logs a warning and returns without touching
///    `stages.semantic`;
/// 2. seeds `stages.semantic` with `started_at`, `total` (the indexer's current
///    `chunk_count()`) and `embedded = 0`, then pushes an `embed_start` event to `progress`;
/// 3. calls `embed_deferred_chunks` and streams per-wave chunk counts into
///    `stages.semantic.embedded` while it runs;
/// 4. on success, calls `force_incremental_persist`, marks the stage `Ready` with the final
///    counts (clearing any earlier failure reason) and pushes `embed_complete`. On error, it
///    replaces the stage with `StageState::failed(reason)` and pushes `embed_error`.
///
/// Returns immediately; the `JoinHandle` of the spawned task is not exposed to the caller.
pub(super) fn spawn_deferred_embed_pass(handle: Arc<IndexHandle>, progress: Arc<ReindexProgress>) {
    let index_id = handle.id.clone();
    tokio::spawn(async move {
        // Bound to a named variable so the permit is held until the task ends.
        let _permit = match background_reindex_semaphore().acquire().await {
            Ok(p) => p,
            Err(_) => {
                // `acquire` only errors once the semaphore has been closed. The semantic
                // stage is intentionally left as-is here.
                tracing::warn!(
                    "deferred_embed[{}]: background semaphore closed — skipping embed pass",
                    index_id.0,
                );
                return;
            }
        };
        let total_chunks = {
            let indexer = handle.indexer.read().await;
            indexer.chunk_count()
        };
        tracing::info!(
            "deferred_embed[{}]: starting background embed pass ({} chunks)",
            index_id.0,
            total_chunks,
        );
        // Pre-seed the totals so status readers see N/total progress from the start.
        {
            let mut stages = handle.stages.write().await;
            stages.semantic.started_at = Some(now_rfc3339());
            stages.semantic.total = Some(total_chunks);
            stages.semantic.embedded = Some(0);
        }
        progress
            .push(serde_json::json!({
                "event": "embed_start",
                "index_id": index_id.0,
                "total_chunks": total_chunks,
            }))
            .await;
        // The embedder reports (chunks_in_wave, wave_ms) per wave. A separate task folds
        // those into a running total, so the embed call never waits on the stages lock.
        let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel::<(usize, u64)>();
        let embedded_counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&embedded_counter);
        let stages_clone = Arc::clone(&handle.stages);
        let progress_updater = tokio::spawn(async move {
            while let Some((wave_chunks, _ms)) = progress_rx.recv().await {
                let n = counter_clone.fetch_add(wave_chunks, Ordering::AcqRel) + wave_chunks;
                let mut stages = stages_clone.write().await;
                stages.semantic.embedded = Some(n);
            }
        });
        // NOTE(review): the indexer read lock is held for the entire embed call, so any
        // writer on `handle.indexer` waits for the whole pass.
        let result = {
            let indexer = handle.indexer.read().await;
            indexer.embed_deferred_chunks(Some(&progress_tx)).await
        };
        // Dropping the last sender ends the updater's `recv` loop. Awaiting it guarantees
        // no streamed update lands after the final counts written below.
        drop(progress_tx);
        let _ = progress_updater.await;
        match result {
            Ok((embedded, total)) => {
                {
                    let indexer = handle.indexer.read().await;
                    indexer.force_incremental_persist();
                }
                tracing::info!(
                    "deferred_embed[{}]: embedded {}/{} chunks — marking semantic Ready",
                    index_id.0,
                    embedded,
                    total,
                );
                {
                    let mut stages = handle.stages.write().await;
                    stages.semantic.status = StageStatus::Ready;
                    stages.semantic.completed_at = Some(now_rfc3339());
                    // The embedder's own counts are authoritative over the streamed ones.
                    stages.semantic.embedded = Some(embedded);
                    stages.semantic.total = Some(total);
                    // A retry after an earlier failed pass starts from a
                    // `StageState::failed(..)` value. Clear its reason so a Ready stage
                    // never reports a stale failure.
                    stages.semantic.failure = None;
                }
                progress
                    .push(serde_json::json!({
                        "event": "embed_complete",
                        "index_id": index_id.0,
                        "embedded": embedded,
                        "total": total,
                    }))
                    .await;
            }
            Err(e) => {
                // `{:#}` includes the full error context chain in the reason.
                let reason = format!("{e:#}");
                tracing::error!(
                    "deferred_embed[{}]: embed pass failed — {reason}",
                    index_id.0,
                );
                {
                    let mut stages = handle.stages.write().await;
                    stages.semantic = StageState::failed(reason.clone());
                }
                progress
                    .push(serde_json::json!({
                        "event": "embed_error",
                        "index_id": index_id.0,
                        "message": reason,
                    }))
                    .await;
            }
        }
    });
}
#[cfg(test)]
mod tests {
    // Integration-style tests for `spawn_deferred_embed_pass`. Each test builds a bare
    // index handle, spawns the pass and polls `handle.stages` for the expected transition.
    use super::*;
    use crate::core::chunker::{ChunkType, RawChunk};
    use crate::core::indexer::{CodeIndexer, ParsedBatch};
    use crate::core::registry::{IndexHandle, IndexId, StageStatus};
    use crate::service::reindex::ReindexProgress;
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::time::Duration;

    /// Wraps `indexer` in a bare registry handle registered under `id` and rooted at `root`.
    fn bare_handle(id: &'static str, indexer: CodeIndexer, root: PathBuf) -> Arc<IndexHandle> {
        let shared_indexer = Arc::new(tokio::sync::RwLock::new(indexer));
        Arc::new(IndexHandle::bare(IndexId::new(id), shared_indexer, root))
    }

    /// Builds a one-chunk batch for `test.rs` (line 1) whose single chunk has no embedding.
    fn single_unembedded_chunk(content: &'static str) -> ParsedBatch {
        let chunk = RawChunk {
            id: "test:1:1".into(),
            file: "test.rs".into(),
            start_line: 1,
            end_line: 1,
            content: content.into(),
            function_name: None,
            language: Some("rust".into()),
            chunk_type: ChunkType::Code,
            calls: vec![],
            inherits_from: vec![],
            chunk_depth: 0,
            parent_chunk_id: None,
            child_chunk_ids: vec![],
            nlp_keywords: vec![],
            nlp_code_refs: vec![],
            virtual_terms: vec![],
        };
        ParsedBatch {
            chunks: vec![chunk],
            embeddings: vec![None],
            entities_by_file: vec![],
            parse_ms: 0,
            embed_ms: 0,
            vector_count: 0,
        }
    }

    /// Polls the semantic stage every 50 ms, at most 100 times, returning as soon as its
    /// status is no longer `Pending`.
    async fn wait_while_semantic_pending(handle: &IndexHandle) {
        for _ in 0..100 {
            if handle.stages.read().await.semantic.status != StageStatus::Pending {
                return;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

    // NOTE(review): despite its name, this test runs the pass only once; idempotence is
    // not exercised here.
    #[tokio::test]
    async fn deferred_embed_pass_marks_semantic_ready_and_is_idempotent() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        let handle = bare_handle(
            "defer-ready-test",
            CodeIndexer::new("defer-ready-test", root.clone()),
            root,
        );
        let progress = Arc::new(ReindexProgress::new());

        spawn_deferred_embed_pass(Arc::clone(&handle), Arc::clone(&progress));
        wait_while_semantic_pending(&handle).await;

        let stages = handle.stages.read().await;
        assert_eq!(
            stages.semantic.status,
            StageStatus::Ready,
            "deferred embed pass (no-embedder) must flip semantic to Ready"
        );
    }

    #[tokio::test]
    async fn failing_deferred_embed_pass_marks_semantic_failed() {
        use crate::core::{
            embed::Embedder,
            store::{UsearchStore, VectorStore},
        };

        /// Embedder whose every call errors, used to drive the pass into its failure branch.
        struct FailingEmbedder;

        #[async_trait::async_trait]
        impl Embedder for FailingEmbedder {
            async fn embed(&self, _text: &str) -> anyhow::Result<Vec<f32>> {
                anyhow::bail!("injected embed failure for test")
            }
            async fn embed_batch(&self, _texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>> {
                anyhow::bail!("injected embed failure for test")
            }
            fn dimension(&self) -> usize {
                8
            }
        }

        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        let vector_store: Arc<dyn VectorStore> =
            Arc::new(UsearchStore::new(8).expect("usearch new"));
        let indexer = CodeIndexer::new("defer-fail-test", root.clone())
            .with_components(Arc::new(FailingEmbedder), vector_store);
        // Result intentionally ignored.
        let _ = indexer
            .commit_parsed_batch(single_unembedded_chunk("fn test_fn() {}"), false)
            .await;
        let handle = bare_handle("defer-fail-test", indexer, root);
        let progress = Arc::new(ReindexProgress::new());

        spawn_deferred_embed_pass(Arc::clone(&handle), Arc::clone(&progress));
        wait_while_semantic_pending(&handle).await;

        let stages = handle.stages.read().await;
        assert_eq!(
            stages.semantic.status,
            StageStatus::Failed,
            "failing deferred embed pass must flip semantic to Failed (issue #928)"
        );
        assert!(
            stages.semantic.failure.is_some(),
            "Failed stage must carry the failure reason"
        );
    }

    #[tokio::test]
    async fn deferred_embed_pass_pre_seeds_total_before_embedding() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        let indexer = CodeIndexer::new("defer-total-test", root.clone());
        // Result intentionally ignored.
        let _ = indexer
            .commit_parsed_batch(single_unembedded_chunk("fn total_test() {}"), false)
            .await;
        let handle = bare_handle("defer-total-test", indexer, root);
        let progress = Arc::new(ReindexProgress::new());

        spawn_deferred_embed_pass(Arc::clone(&handle), Arc::clone(&progress));

        // Poll faster than the status tests (10 ms, up to 200 times) to catch the
        // pre-seeded total as soon as it appears.
        let mut total_seen: Option<usize> = None;
        for _ in 0..200 {
            total_seen = handle.stages.read().await.semantic.total;
            if total_seen.is_some() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        assert_eq!(
            total_seen,
            Some(1),
            "stages.semantic.total must be pre-seeded to 1 (the chunk count) \
             before embed_deferred_chunks runs — so /indexes/:id/status shows \
             real N/total progress even during embedding"
        );
    }
}