use super::*;
use crate::core::indexer::CodeIndexer;
use crate::core::registry::{IndexHandle, IndexId, IndexStages, StageStatus};
use std::fs;
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// Verifies that a reindex restricts its walk to `include_paths`.
///
/// The fixture holds `api/keep.rs` and `ui/drop.rs`, and the handle lists
/// only `api` in `include_paths`. Once the reindex reports completion the
/// test expects exactly one counted file, a search hit for `keep_me`, and no
/// result containing `drop_me`.
#[tokio::test]
async fn reindex_honours_include_paths_filter() {
    // Shared shape of the two search requests issued below.
    fn query_for(needle: &str) -> crate::core::indexer::SearchQuery {
        crate::core::indexer::SearchQuery {
            text: needle.into(),
            top_k: 5,
            expand_graph: false,
            compact: false,
            ..Default::default()
        }
    }

    // Fixture: one file inside the included directory, one outside it.
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::create_dir_all(root.join("api")).unwrap();
    fs::create_dir_all(root.join("ui")).unwrap();
    fs::write(root.join("api/keep.rs"), "fn keep_me() {}\n").unwrap();
    fs::write(root.join("ui/drop.rs"), "fn drop_me() {}\n").unwrap();

    let code_indexer = CodeIndexer::new("filter-test", root.clone());
    let handle = Arc::new(IndexHandle {
        id: IndexId::new("filter-test"),
        indexer: Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root_path: root.clone(),
        // The only setting under test: restrict the walk to `api`.
        include_paths: vec![root.join("api")],
        exclude_globs: vec![],
        extensions: vec![],
        domain_terms: vec![],
        include_docs: false,
        respect_gitignore: true,
        extra_skip_dirs: crate::service::walker::default_extra_skip_dirs(),
        data_file_max_bytes: crate::service::walker::DEFAULT_DATA_FILE_MAX_BYTES,
        path_filter: vec![],
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        last_indexed_at: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only: false,
        skip_kg: false,
        defer_embed: true,
        stages: Arc::new(tokio::sync::RwLock::new(IndexStages::default())),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    });

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status reads Complete.
    let mut polls_left = 100;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    assert_eq!(
        progress.total_files.load(Ordering::Acquire),
        1,
        "only api/keep.rs should be walked"
    );

    // Both searches run under a single read guard on the indexer.
    let idx = handle.indexer.read().await;
    let kept_hits = idx.search(&query_for("keep_me")).await.unwrap();
    assert!(kept_hits.iter().any(|c| c.content.contains("keep_me")));

    let dropped_hits = idx.search(&query_for("drop_me")).await.unwrap();
    assert!(
        !dropped_hits.iter().any(|c| c.content.contains("drop_me")),
        "ui/drop.rs must not have been indexed"
    );
}
/// Verifies that a reindex applies the handle's `path_filter` globs.
///
/// The fixture holds `common-utils/keep.rs` and `other-repo/drop.rs`, and
/// the handle carries the single filter `common-*`. Once the reindex reports
/// completion the test expects exactly one counted file, a search hit for
/// `keep_common`, and no result containing `drop_other`.
#[tokio::test]
async fn reindex_honours_path_filter() {
    // Shared shape of the two search requests issued below.
    fn query_for(needle: &str) -> crate::core::indexer::SearchQuery {
        crate::core::indexer::SearchQuery {
            text: needle.into(),
            top_k: 5,
            expand_graph: false,
            compact: false,
            ..Default::default()
        }
    }

    // Fixture: one directory matching the filter, one that does not.
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    std::fs::create_dir_all(root.join("common-utils")).unwrap();
    std::fs::create_dir_all(root.join("other-repo")).unwrap();
    std::fs::write(root.join("common-utils/keep.rs"), "fn keep_common() {}\n").unwrap();
    std::fs::write(root.join("other-repo/drop.rs"), "fn drop_other() {}\n").unwrap();

    let code_indexer = CodeIndexer::new("pf-test", root.clone());
    let handle = Arc::new(IndexHandle {
        id: IndexId::new("pf-test"),
        indexer: Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root_path: root.clone(),
        include_paths: vec![],
        exclude_globs: vec![],
        extensions: vec![],
        domain_terms: vec![],
        include_docs: false,
        respect_gitignore: true,
        extra_skip_dirs: crate::service::walker::default_extra_skip_dirs(),
        data_file_max_bytes: crate::service::walker::DEFAULT_DATA_FILE_MAX_BYTES,
        // The only setting under test: a single glob on the path.
        path_filter: vec!["common-*".to_string()],
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        last_indexed_at: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only: false,
        skip_kg: false,
        defer_embed: true,
        stages: Arc::new(tokio::sync::RwLock::new(IndexStages::default())),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    });

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status reads Complete.
    let mut polls_left = 100;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    assert_eq!(
        progress.total_files.load(Ordering::Acquire),
        1,
        "only common-utils/keep.rs should pass the path_filter"
    );

    // Both searches run under a single read guard on the indexer.
    let idx = handle.indexer.read().await;
    let kept_hits = idx.search(&query_for("keep_common")).await.unwrap();
    assert!(kept_hits.iter().any(|c| c.content.contains("keep_common")));

    let dropped_hits = idx.search(&query_for("drop_other")).await.unwrap();
    assert!(
        !dropped_hits.iter().any(|c| c.content.contains("drop_other")),
        "other-repo must not have been indexed"
    );
}
/// Reindexes a small tree and checks the file counters and event ordering.
///
/// The fixture has `a.rs`, `b.py`, and `target/skip.rs`; the test expects
/// two files to be counted and indexed, and the recorded events to begin
/// with `walk_complete`, then `start`, and to end with `complete`.
#[tokio::test]
async fn reindex_walks_directory_and_emits_events() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("a.rs"), "fn a() {}").unwrap();
    fs::write(root.join("b.py"), "def b():\n pass\n").unwrap();
    // A file under `target/` that the expected count of 2 does not include.
    fs::create_dir(root.join("target")).unwrap();
    fs::write(root.join("target/skip.rs"), "fn skip() {}").unwrap();

    let code_indexer = CodeIndexer::new("test".to_string(), root.clone());
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("test"),
        Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root.clone(),
    ));
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle, progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status reads Complete.
    let mut polls_left = 100;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);
    assert_eq!(progress.total_files.load(Ordering::Acquire), 2);
    assert_eq!(progress.indexed.load(Ordering::Acquire), 2);

    // Event ordering: walk_complete, start, ..., complete.
    let events = progress.events.lock().await;
    assert!(
        events
            .first()
            .map_or(false, |s| s.contains("\"walk_complete\"")),
        "first event must be walk_complete (issue #317); got: {:?}",
        events.first()
    );
    assert!(
        events.get(1).map_or(false, |s| s.contains("\"start\"")),
        "second event must be start; got: {:?}",
        events.get(1)
    );
    assert!(
        events.last().map_or(false, |s| s.contains("\"complete\"")),
        "last event must be complete; got: {:?}",
        events.last()
    );
}
/// End-to-end check of the reindex pipeline against a small on-disk tree.
///
/// Covers, in order:
/// 1. the first reindex counts one file (the `.gitignore`d `excluded/`
///    subtree is not counted), reports chunks, and skips nothing;
/// 2. search results carry `root` joined with the relative path, while the
///    raw corpus snapshot stores the root-relative path only;
/// 3. a second reindex of the unchanged tree reports zero new chunks, one
///    skipped file, and leaves the corpus non-empty.
#[tokio::test]
async fn reindex_persists_chunks_end_to_end() {
    // Fixture: one source file plus a gitignored directory.
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::create_dir_all(root.join("crates/foo/src")).unwrap();
    fs::create_dir_all(root.join("excluded")).unwrap();
    fs::write(root.join(".gitignore"), "excluded/\n").unwrap();
    let lib_rs = root.join("crates/foo/src/lib.rs");
    fs::write(
        &lib_rs,
        "pub fn alpha() {}\n\npub fn beta() -> i32 { 1 }\n\npub fn gamma(x: i32) -> i32 { x + 1 }\n",
    )
    .unwrap();
    fs::write(
        root.join("excluded/should_not_index.rs"),
        "pub fn nope() {}\n",
    )
    .unwrap();

    let id = IndexId::new("e2e-pipeline-test");
    let code_indexer = CodeIndexer::new(id.0.clone(), root.clone());
    let handle = Arc::new(IndexHandle::bare(
        id.clone(),
        Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root.clone(),
    ));

    // ---- First reindex -------------------------------------------------
    let first_pass = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), first_pass.clone(), false);
    let mut polls_left = 100;
    while polls_left > 0 && first_pass.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(first_pass.status.load(), ReindexStatus::Complete);
    assert_eq!(
        first_pass.total_files.load(Ordering::Acquire),
        1,
        "walker must yield exactly 1 file (gitignored subtree pruned)"
    );
    let chunks = first_pass.total_chunks.load(Ordering::Acquire);
    assert!(
        chunks > 0,
        "regression: walker yielded 1 file but chunker persisted 0 chunks \
         on the first (cold-cache) reindex"
    );
    assert_eq!(
        first_pass.skipped.load(Ordering::Acquire),
        0,
        "first reindex hash-skipped a file (cold cache should hash-miss everything)"
    );

    // ---- Path handling: resolved in search results, relative in corpus --
    let rel_lib_rs = "crates/foo/src/lib.rs";
    let expected_resolved = root.join(rel_lib_rs).to_string_lossy().into_owned();
    {
        let idx = handle.indexer.read().await;
        assert!(
            idx.chunk_count() > 0,
            "regression: indexer corpus is empty after reindex"
        );
        let alpha_query = crate::core::indexer::SearchQuery {
            text: "alpha".into(),
            top_k: 5,
            expand_graph: false,
            compact: false,
            ..Default::default()
        };
        let results = idx.search(&alpha_query).await.unwrap();
        assert!(
            results.iter().any(|c| c.file == expected_resolved),
            "no chunk resolves to root_path + relative lib.rs (#602): \
             expected {expected_resolved:?}, got {:?}",
            results.iter().map(|c| c.file.clone()).collect::<Vec<_>>()
        );
    }
    {
        let idx = handle.indexer.read().await;
        let snapshot = idx.raw_chunks_snapshot().await;
        let raw_files: Vec<String> = snapshot.into_iter().map(|c| c.file).collect();
        assert!(
            raw_files.iter().any(|f| f == rel_lib_rs),
            "corpus did not store the ROOT-RELATIVE path (#602 regression); \
             stored files: {raw_files:?}"
        );
        assert!(
            raw_files
                .iter()
                .all(|f| !std::path::Path::new(f).is_absolute()),
            "corpus stored an ABSOLUTE path (#602 regression): {raw_files:?}"
        );
    }

    // ---- Second reindex over the unchanged tree -------------------------
    let second_pass = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), second_pass.clone(), false);
    let mut polls_left = 100;
    while polls_left > 0 && second_pass.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(second_pass.status.load(), ReindexStatus::Complete);
    assert_eq!(
        second_pass.total_files.load(Ordering::Acquire),
        1,
        "second reindex must still walk 1 file"
    );
    assert_eq!(
        second_pass.total_chunks.load(Ordering::Acquire),
        0,
        "second reindex of unchanged files MUST emit 0 new chunks (hash-skip path)"
    );
    assert_eq!(
        second_pass.skipped.load(Ordering::Acquire),
        1,
        "second reindex must report the file as hash-skipped"
    );
    {
        let idx = handle.indexer.read().await;
        assert!(
            idx.chunk_count() > 0,
            "regression: corpus emptied by a hash-skip-only second reindex"
        );
    }
}
/// Checks that a reindex fills in the handle's context embedding and summary
/// when the tree has a `README.md` and the indexer has an embedder wired in.
///
/// The indexer is given a `MockEmbedder` and a `UsearchStore` of dimension
/// 32 via `with_components`; afterwards the embedding must have that length
/// and the summary must mention `proj` or `README`.
#[tokio::test]
async fn context_embedding_populated_after_reindex() {
    use crate::core::embed::{Embedder, MockEmbedder};
    use crate::core::store::{UsearchStore, VectorStore};

    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("lib.rs"), "fn hello() {}\n").unwrap();
    fs::write(
        root.join("README.md"),
        "# proj\n\nA test project for #112.\n",
    )
    .unwrap();

    // Embedder and vector store share one dimension.
    let dim = 32;
    let embedder: Arc<dyn Embedder> = Arc::new(MockEmbedder::new(dim));
    let store: Arc<dyn VectorStore> = Arc::new(UsearchStore::new(dim).expect("usearch new"));
    let code_indexer = CodeIndexer::new("ctx-test", root.clone()).with_components(embedder, store);
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("ctx-test"),
        Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root.clone(),
    ));

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status reads Complete.
    let mut polls_left = 100;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let embedding = handle.context_embedding.read().await.clone();
    assert!(
        embedding.is_some(),
        "context_embedding must be populated when metadata is present and embedder is wired"
    );
    assert_eq!(
        embedding.unwrap().len(),
        dim,
        "embedding must have embedder dim"
    );

    let summary = handle.context_summary.read().await.clone();
    assert!(summary.is_some(), "context_summary must be populated");
    let summary_text = summary.unwrap();
    assert!(summary_text.contains("proj") || summary_text.contains("README"));
}
/// Runs a reindex with an embedder that always errors and checks the failure
/// is reported rather than masked.
///
/// Expectations: the progress status leaves `Running` as `Failed`; the
/// stages report lifecycle `"failed"` with a failed semantic stage carrying
/// a reason; an error event flagged `fatal` with `vector_count` 0 was
/// recorded; and the raw chunk snapshot holds no chunk for `lib.rs`.
#[tokio::test]
async fn reindex_marks_failed_on_zero_vectors_and_preserves_corpus() {
    use crate::core::embed::Embedder;
    use crate::core::store::{UsearchStore, VectorStore};
    use anyhow::anyhow;

    /// Embedder whose single and batch calls both return an error.
    struct FailingEmbedder;

    #[async_trait::async_trait]
    impl Embedder for FailingEmbedder {
        async fn embed(&self, _text: &str) -> anyhow::Result<Vec<f32>> {
            Err(anyhow!("simulated embedder failure (embed)"))
        }

        async fn embed_batch(&self, _texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>> {
            Err(anyhow!("simulated embedder failure (every batch)"))
        }

        fn dimension(&self) -> usize {
            32
        }
    }

    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("lib.rs"), "pub fn alpha() {}\n").unwrap();

    let dim = 32;
    let embedder: Arc<dyn Embedder> = Arc::new(FailingEmbedder);
    let store: Arc<dyn VectorStore> = Arc::new(UsearchStore::new(dim).expect("usearch new"));
    let mut code_indexer =
        CodeIndexer::new("fail-601", root.clone()).with_components(embedder, store);

    // Seed the corpus store with one chunk from an earlier, unrelated file.
    let corpus_path = workspace.path().join("index.redb");
    let corpus = crate::core::corpus::CorpusStore::open(&corpus_path).expect("open corpus");
    let mut seeded = crate::core::chunker::chunk_text("prev/file.rs", "fn previous() {}", 64, 64);
    seeded[0].id = "prev/file.rs:1:1".into();
    seeded[0].file = "prev/file.rs".into();
    corpus.upsert_chunks(&seeded).expect("seed prev chunk");
    code_indexer.set_corpus_store(Arc::new(corpus));

    // `defer_embed` is switched off on the bare handle before it is shared.
    let mut bare_handle = IndexHandle::bare(
        IndexId::new("fail-601"),
        Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root.clone(),
    );
    bare_handle.defer_embed = false;
    let handle = Arc::new(bare_handle);

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status leaves Running.
    let mut terminal = ReindexStatus::Running;
    let mut polls_left = 100;
    while polls_left > 0 {
        terminal = progress.status.load();
        if terminal != ReindexStatus::Running {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(
        terminal,
        ReindexStatus::Failed,
        "embed failure must mark the reindex Failed, not Complete"
    );

    let stages = handle.stages.read().await.clone();
    assert_eq!(stages.lifecycle_status(), "failed");
    assert_eq!(stages.semantic.status, StageStatus::Failed);
    assert!(
        stages.semantic.failure.is_some(),
        "failed semantic stage must carry a reason"
    );

    let events = progress.events.lock().await.clone();
    let is_fatal_zero_vector_error = |e: &String| {
        e.contains("\"fatal\":true")
            && e.contains("\"event\":\"error\"")
            && e.contains("\"vector_count\":0")
    };
    assert!(
        events.iter().any(is_fatal_zero_vector_error),
        "a fatal error event with vector_count:0 must be emitted: {events:?}"
    );

    let live = handle.indexer.read().await.raw_chunks_snapshot().await;
    assert!(
        !live.iter().any(|c| c.file == "lib.rs"),
        "non-destructive: the failed rebuild must not promote lib.rs chunks; \
         got: {:?}",
        live.iter().map(|c| c.id.clone()).collect::<Vec<_>>()
    );
}
/// Checks that a reindex over a tree holding only `lib.rs` leaves both the
/// context embedding and the context summary unset on the handle.
#[tokio::test]
async fn context_embedding_none_when_no_metadata() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("lib.rs"), "fn hello() {}\n").unwrap();

    let code_indexer = CodeIndexer::new("no-meta", root.clone());
    let handle = Arc::new(IndexHandle::bare(
        IndexId::new("no-meta"),
        Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root.clone(),
    ));

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status reads Complete.
    let mut polls_left = 100;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    assert!(handle.context_embedding.read().await.is_none());
    assert!(handle.context_summary.read().await.is_none());
}
/// Builds a test handle with the given `lexical_only` setting and `skip_kg`
/// left off; a thin wrapper over `make_handle_with_flags`.
fn make_handle_with_flag(
    id: &str,
    root: std::path::PathBuf,
    lexical_only: bool,
) -> Arc<IndexHandle> {
    let skip_kg = false;
    make_handle_with_flags(id, root, lexical_only, skip_kg)
}
/// Builds a test `IndexHandle` rooted at `root` with the requested
/// `lexical_only` / `skip_kg` flags and `defer_embed` off.
///
/// The initial stage states are derived from the flags:
/// * `lexical_only` — lexical pending, semantic and graph skipped
///   (takes precedence over `skip_kg`);
/// * `skip_kg` only — lexical and semantic pending, graph skipped;
/// * neither — `IndexStages::default()`.
fn make_handle_with_flags(
    id: &str,
    root: std::path::PathBuf,
    lexical_only: bool,
    skip_kg: bool,
) -> Arc<IndexHandle> {
    use crate::core::registry::{IndexStages, StageState};

    let code_indexer = CodeIndexer::new(id.to_string(), root.clone());

    let initial_stages = match (lexical_only, skip_kg) {
        (true, _) => IndexStages {
            lexical: StageState::pending(),
            semantic: StageState::skipped(),
            graph: StageState::skipped(),
        },
        (false, true) => IndexStages {
            lexical: StageState::pending(),
            semantic: StageState::pending(),
            graph: StageState::skipped(),
        },
        (false, false) => IndexStages::default(),
    };

    let handle = IndexHandle {
        id: IndexId::new(id),
        indexer: Arc::new(tokio::sync::RwLock::new(code_indexer)),
        root_path: root,
        include_paths: vec![],
        exclude_globs: vec![],
        extensions: vec![],
        domain_terms: vec![],
        include_docs: false,
        respect_gitignore: true,
        extra_skip_dirs: crate::service::walker::default_extra_skip_dirs(),
        data_file_max_bytes: crate::service::walker::DEFAULT_DATA_FILE_MAX_BYTES,
        path_filter: vec![],
        context_embedding: Arc::new(tokio::sync::RwLock::new(None)),
        context_summary: Arc::new(tokio::sync::RwLock::new(None)),
        indexed_head_sha: Arc::new(tokio::sync::RwLock::new(None)),
        last_indexed_at: Arc::new(tokio::sync::RwLock::new(None)),
        lexical_only,
        skip_kg,
        defer_embed: false,
        stages: Arc::new(tokio::sync::RwLock::new(initial_stages)),
        search_pressure: Arc::new(tokio::sync::Notify::new()),
        walk_diagnostics: Arc::new(tokio::sync::RwLock::new(
            crate::core::registry::WalkDiagnostics::default(),
        )),
    };
    Arc::new(handle)
}
/// Checks that after a completed reindex the lexical stage is `Ready`,
/// `bm25` is advertised among the search capabilities, and a search for a
/// unique identifier in the fixture returns the chunk containing it.
#[tokio::test]
async fn stage_1_completes_and_search_works_before_embedding() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("hello.rs"), "pub fn unique_alpha() {}\n").unwrap();

    let handle = make_handle_with_flag("stage1-test", root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 200 times, 50 ms apart, until the status reads Complete.
    let mut polls_left = 200;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        crate::core::registry::StageStatus::Ready,
        "stage 1 must finish on a BM25-only reindex"
    );
    let caps = stages.search_capabilities();
    assert!(
        caps.contains(&"bm25"),
        "search_capabilities must contain bm25 after Stage 1, got: {caps:?}"
    );

    let query = crate::core::indexer::SearchQuery {
        text: "unique_alpha".to_string(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        ..Default::default()
    };
    let idx = handle.indexer.read().await;
    let results = idx.search(&query).await.expect("search");
    assert!(
        results.iter().any(|c| c.content.contains("unique_alpha")),
        "BM25 lane must return the chunk after Stage 1: {results:?}"
    );
}
/// Checks that a `lexical_only` handle keeps its semantic and graph stages
/// `Skipped` across a reindex.
///
/// After completion the lexical stage must be `Ready`, neither `vector` nor
/// `kg` may be advertised, a search pinned to the lexical stage must find
/// the fixture function, and the lifecycle status must read `"ready"`.
#[tokio::test]
async fn lexical_only_index_never_runs_stage_2() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("a.rs"), "pub fn lex_only_func() {}\n").unwrap();

    let handle = make_handle_with_flag("lexical-only-test", root.clone(), true);
    // Precondition: the semantic stage starts out Skipped.
    assert_eq!(
        handle.stages.read().await.semantic.status,
        crate::core::registry::StageStatus::Skipped
    );

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 200 times, 50 ms apart, until the status reads Complete.
    let mut polls_left = 200;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        crate::core::registry::StageStatus::Ready,
        "lexical must be Ready"
    );
    assert_eq!(
        stages.semantic.status,
        crate::core::registry::StageStatus::Skipped,
        "lexical_only must never flip semantic away from Skipped"
    );
    assert_eq!(
        stages.graph.status,
        crate::core::registry::StageStatus::Skipped,
        "lexical_only must never flip graph away from Skipped"
    );

    let caps = stages.search_capabilities();
    assert!(
        !caps.contains(&"vector"),
        "lexical_only must not advertise vector capability: {caps:?}"
    );
    assert!(
        !caps.contains(&"kg"),
        "lexical_only must not advertise kg capability: {caps:?}"
    );

    // This query explicitly requests the lexical search stage.
    let lexical_query = crate::core::indexer::SearchQuery {
        text: "lex_only_func".to_string(),
        top_k: 5,
        expand_graph: false,
        compact: false,
        stage: Some(crate::core::indexer::SearchStage::Lexical),
        ..Default::default()
    };
    let idx = handle.indexer.read().await;
    let results = idx.search(&lexical_query).await.expect("search");
    assert!(
        results.iter().any(|c| c.content.contains("lex_only_func")),
        "lexical lane must return the chunk on lexical_only: {results:?}"
    );
    assert_eq!(stages.lifecycle_status(), "ready");
}
/// Checks that a `skip_kg` handle keeps its graph stage `Skipped` across a
/// reindex, does not advertise `kg`, and ends with an empty symbol graph.
#[tokio::test]
async fn skip_kg_index_never_runs_phase3() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("b.rs"), "pub fn skip_kg_func() { let x = 1; }\n").unwrap();

    let handle = make_handle_with_flags("skip-kg-test", root.clone(), false, true);
    // Precondition: the graph stage starts out Skipped.
    assert_eq!(
        handle.stages.read().await.graph.status,
        crate::core::registry::StageStatus::Skipped
    );

    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 200 times, 50 ms apart, until the status reads Complete.
    let mut polls_left = 200;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let stages = handle.stages.read().await.clone();
    assert_eq!(
        stages.lexical.status,
        crate::core::registry::StageStatus::Ready,
        "lexical must be Ready"
    );
    assert_eq!(
        stages.graph.status,
        crate::core::registry::StageStatus::Skipped,
        "skip_kg must never flip graph away from Skipped"
    );
    let caps = stages.search_capabilities();
    assert!(
        !caps.contains(&"kg"),
        "skip_kg must not advertise kg capability: {caps:?}"
    );

    let code_indexer = handle.indexer.read().await;
    let symbol_graph = code_indexer.snapshot_symbol_graph().await;
    assert_eq!(
        symbol_graph.node_count(),
        0,
        "symbol graph must be empty when skip_kg=true"
    );
}
/// Drives the stage-transition helpers by hand (no reindex is spawned) and
/// checks the advertised search capabilities after each step: empty at
/// first and after a reset, then `bm25`, then `vector`, then `kg`, ending
/// with lifecycle status `"ready"`.
#[tokio::test]
async fn search_capabilities_grows_as_stages_complete() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("a.rs"), "pub fn stage_grow() {}\n").unwrap();
    let handle = make_handle_with_flag("caps-grow-test", root.clone(), false);

    // Nothing is advertised on a fresh handle, nor after a reset.
    assert!(handle.stages.read().await.search_capabilities().is_empty());
    reset_stages_for_reindex(&handle).await;
    assert!(handle.stages.read().await.search_capabilities().is_empty());

    // Lexical ready: bm25 only.
    mark_lexical_ready_semantic_in_progress(&handle, 1, 1, 1).await;
    let after_lexical = handle.stages.read().await.search_capabilities();
    assert!(after_lexical.contains(&"bm25") && !after_lexical.contains(&"vector"));

    // Semantic ready: vector appears, kg not yet.
    mark_semantic_ready_graph_in_progress(&handle, 1, 1).await;
    let after_semantic = handle.stages.read().await.search_capabilities();
    assert!(after_semantic.contains(&"vector") && !after_semantic.contains(&"kg"));

    // Graph ready: all three capabilities are present.
    mark_graph_ready(&handle).await;
    let after_graph = handle.stages.read().await.search_capabilities();
    assert!(after_graph.contains(&"bm25"));
    assert!(after_graph.contains(&"vector"));
    assert!(after_graph.contains(&"kg"));
    assert_eq!(handle.stages.read().await.lifecycle_status(), "ready");
}
/// Checks that a completed reindex over a one-file tree records walk
/// diagnostics: a start timestamp, a non-zero files-seen count, and no
/// walk error.
#[tokio::test]
async fn walk_diagnostics_populated_after_reindex() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("diag_check.rs"), "fn diag_fn() {}\n").unwrap();

    let handle = make_handle_with_flag("diag-test", root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status reads Complete.
    let mut polls_left = 100;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let walk_diag = handle.walk_diagnostics.read().await.clone();
    assert!(
        walk_diag.last_walk_started_at.is_some(),
        "last_walk_started_at must be set after reindex, got {:?}",
        walk_diag
    );
    assert!(
        walk_diag.last_walk_files_seen > 0,
        "last_walk_files_seen must be > 0 when files exist, got {:?}",
        walk_diag
    );
    assert!(
        walk_diag.last_walk_error.is_none(),
        "last_walk_error must be None on a clean walk, got {:?}",
        walk_diag.last_walk_error
    );
}
/// Checks that a reindex over an empty directory still completes, records
/// zero files seen, and sets a walk error in the diagnostics.
#[tokio::test]
async fn walk_diagnostics_error_set_when_zero_files() {
    // The temp directory is deliberately left empty.
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();

    let handle = make_handle_with_flag("diag-zero-test", root.clone(), false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 100 times, 100 ms apart, until the status reads Complete.
    let mut polls_left = 100;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    let walk_diag = handle.walk_diagnostics.read().await.clone();
    assert_eq!(
        walk_diag.last_walk_files_seen, 0,
        "last_walk_files_seen must be 0 for empty directory, got {:?}",
        walk_diag
    );
    assert!(
        walk_diag.last_walk_error.is_some(),
        "last_walk_error must be set when zero files are found, got {:?}",
        walk_diag
    );
}
/// Checks, by comparing addresses, that `reindex_semaphore_for(true)` and
/// `reindex_semaphore_for(false)` return two distinct semaphores and that
/// each call with the same argument returns the same instance again.
#[test]
fn reindex_semaphore_selection_routes_by_priority() {
    // Address of the semaphore selected for the given flag value.
    let semaphore_addr = |flag: bool| reindex_semaphore_for(flag) as *const Semaphore;

    let interactive = semaphore_addr(true);
    let background = semaphore_addr(false);
    assert_ne!(
        interactive, background,
        "interactive and background must be different semaphore instances"
    );
    assert_eq!(
        interactive,
        semaphore_addr(true),
        "interactive semaphore must be a stable singleton"
    );
    assert_eq!(
        background,
        semaphore_addr(false),
        "background semaphore must be a stable singleton"
    );
}
/// Checks that saturating a background-sized semaphore leaves an
/// interactive-sized semaphore able to hand out a permit immediately.
///
/// Two local semaphores are created with the configured background and
/// interactive limits. Every background permit is taken, then a
/// non-blocking `try_acquire` on the interactive semaphore must succeed.
#[tokio::test]
async fn interactive_not_blocked_when_background_semaphore_full() {
    let bg_sem = Semaphore::new(MAX_PARALLEL_BACKGROUND_REINDEXES);
    let interactive_sem = Semaphore::new(MAX_PARALLEL_REINDEXES);

    // Take one permit per configured background slot. Acquiring only a
    // single permit would make the "fully saturated" assertion below hold
    // only while the background limit is exactly 1.
    let mut bg_permits = Vec::with_capacity(MAX_PARALLEL_BACKGROUND_REINDEXES);
    for _ in 0..MAX_PARALLEL_BACKGROUND_REINDEXES {
        bg_permits.push(
            bg_sem
                .acquire()
                .await
                .expect("background semaphore unexpectedly closed"),
        );
    }

    let interactive_permit = interactive_sem
        .try_acquire()
        .expect("interactive semaphore must have a free permit even when background is full");

    assert_eq!(
        bg_sem.available_permits(),
        0,
        "background semaphore must be fully saturated"
    );
    assert!(
        interactive_sem.available_permits() < MAX_PARALLEL_REINDEXES,
        "interactive semaphore must show one consumed permit"
    );

    drop(interactive_permit);
    // Background permits are held until here so both checks see them taken.
    drop(bg_permits);
}
/// Checks that `background_reindex_queue_depth()` tracks the
/// `BACKGROUND_QUEUE_DEPTH` counter: adding 3 raises the reported depth by
/// 3, and subtracting 3 brings it back to the value read at the start.
// NOTE(review): the counter is a shared static; if other tests change it
// concurrently these exact-value checks could race — confirm.
#[test]
fn background_reindex_queue_depth_counts_waiting_tasks() {
    let baseline = BACKGROUND_QUEUE_DEPTH.load(Ordering::Relaxed);

    BACKGROUND_QUEUE_DEPTH.fetch_add(3, Ordering::Relaxed);
    let raised = background_reindex_queue_depth();
    assert_eq!(
        raised,
        baseline + 3,
        "queue depth must increase by 3 after 3 increments"
    );

    BACKGROUND_QUEUE_DEPTH.fetch_sub(3, Ordering::Relaxed);
    let restored = background_reindex_queue_depth();
    assert_eq!(
        restored, baseline,
        "queue depth must return to initial after 3 decrements"
    );
}
/// Checks that dropping a `ReindexTerminationGuard` that was never disarmed
/// sets the progress status to `Failed` and broadcasts a message containing
/// `"error"` to subscribers.
#[test]
fn reindex_guard_fires_on_early_return() {
    let progress = Arc::new(ReindexProgress::new());
    // Subscribe before the guard exists so its broadcast is observable.
    let mut rx = progress.sender.subscribe();

    // Create the guard and drop it straight away, still armed.
    drop(ReindexTerminationGuard::new(Arc::clone(&progress)));

    assert_eq!(
        progress.status.load(),
        ReindexStatus::Failed,
        "status must be Failed after guard drops while armed"
    );
    let broadcast = rx
        .try_recv()
        .expect("guard must have broadcast an error event");
    assert!(
        broadcast.contains("\"error\""),
        "broadcast message must contain event:error; got: {broadcast}"
    );
}
/// Checks that a `ReindexTerminationGuard` that was disarmed before being
/// dropped broadcasts nothing: the subscriber's `try_recv` must report an
/// empty channel.
#[test]
fn reindex_guard_does_not_fire_after_disarm() {
    let progress = Arc::new(ReindexProgress::new());
    let mut rx = progress.sender.subscribe();

    let mut guard = ReindexTerminationGuard::new(Arc::clone(&progress));
    guard.disarm();
    drop(guard);

    // `Some(true)` means try_recv failed specifically with `Empty`.
    let channel_was_empty = rx
        .try_recv()
        .err()
        .map(|e| matches!(e, tokio::sync::broadcast::error::TryRecvError::Empty));
    assert_eq!(
        channel_was_empty,
        Some(true),
        "no event should be broadcast after disarm"
    );
}
/// Models an incremental reindex at the `CorpusStore` level and checks that
/// unchanged files survive in the staging corpus.
///
/// Three stores are built in one temp directory:
/// * a live corpus with chunks, entities, and hashes for `stable.rs` and
///   `changing.rs`;
/// * a "pre-fix" staging corpus opened fresh and given only the new
///   `changing.rs` chunk — it must lack `stable.rs`;
/// * a "post-fix" staging corpus that first copies everything from live
///   and then receives the new `changing.rs` chunk — it must hold both
///   files, the original `stable.rs` content, the new `changing.rs`
///   content, and the `stable.rs` file hash.
#[test]
fn incremental_reindex_no_durable_data_loss() {
    use crate::core::chunker::{ChunkType, RawChunk};
    use crate::core::corpus::CorpusStore;

    // Minimal single-line Rust code chunk for the given file/id/content.
    fn chunk(file: &str, id: &str, content: &str) -> RawChunk {
        RawChunk {
            id: id.to_string(),
            file: file.to_string(),
            start_line: 1,
            end_line: 1,
            content: content.to_string(),
            function_name: None,
            language: Some("rust".to_string()),
            chunk_type: ChunkType::Code,
            calls: Vec::new(),
            inherits_from: Vec::new(),
            chunk_depth: 0,
            parent_chunk_id: None,
            child_chunk_ids: Vec::new(),
            nlp_keywords: Vec::new(),
            nlp_code_refs: Vec::new(),
            virtual_terms: Vec::new(),
        }
    }

    let scratch = tempfile::tempdir().unwrap();

    // ---- Live corpus: two files, fully populated ------------------------
    let live_path = scratch.path().join("index.redb");
    {
        let live = CorpusStore::open(&live_path).unwrap();
        live.upsert_chunks(&[
            chunk("stable.rs", "stable:1:1", "fn stable_v1() {}"),
            chunk("changing.rs", "changing:1:1", "fn version_one() {}"),
        ])
        .unwrap();
        live.upsert_entities(&[
            ("stable.rs".to_string(), Vec::new()),
            ("changing.rs".to_string(), Vec::new()),
        ])
        .unwrap();
        live.upsert_file_hashes(&[("stable.rs", "aa"), ("changing.rs", "bb")])
            .unwrap();
    }

    // ---- Pre-fix model: staging starts empty ----------------------------
    let pre_fix_staging_path = scratch.path().join("pre_fix.redb");
    {
        let staging = CorpusStore::open_fresh(&pre_fix_staging_path).unwrap();
        staging
            .upsert_chunks(&[chunk("changing.rs", "changing:1:1", "fn version_two() {}")])
            .unwrap();
    }
    // Reopen the staging file and inspect what it holds.
    let pre_fix_store = CorpusStore::open(&pre_fix_staging_path).unwrap();
    let pre_fix_chunks = pre_fix_store.load_all_chunks().unwrap();
    assert!(
        pre_fix_chunks.iter().all(|c| c.file != "stable.rs"),
        "PRE-FIX model: stable.rs must be absent from the unfixed staging corpus \
         (this proves the bug existed — the fix is needed)"
    );
    assert_eq!(
        pre_fix_chunks.len(),
        1,
        "PRE-FIX model: only the re-embedded file must be present"
    );

    // ---- Post-fix model: staging is seeded from live first --------------
    let post_fix_staging_path = scratch.path().join("post_fix.redb");
    {
        let live = CorpusStore::open(&live_path).unwrap();
        let staging = CorpusStore::open_fresh(&post_fix_staging_path).unwrap();
        staging.copy_all_from(&live).unwrap();
        staging
            .upsert_chunks(&[chunk("changing.rs", "changing:1:1", "fn version_two() {}")])
            .unwrap();
    }
    let post_fix_store = CorpusStore::open(&post_fix_staging_path).unwrap();
    let mut post_fix_chunks = post_fix_store.load_all_chunks().unwrap();
    post_fix_chunks.sort_by(|lhs, rhs| lhs.file.cmp(&rhs.file));
    assert_eq!(
        post_fix_chunks.len(),
        2,
        "POST-FIX model: BOTH files must be present after the incremental \
         reindex + simulated restart; got: {:?}",
        post_fix_chunks.iter().map(|c| &c.file).collect::<Vec<_>>()
    );

    let stable_chunk = post_fix_chunks
        .iter()
        .find(|c| c.file == "stable.rs")
        .expect("BUG #839: stable.rs must survive in the durable corpus after the fix");
    assert_eq!(
        stable_chunk.content, "fn stable_v1() {}",
        "stable.rs must retain its original content (it was hash-skipped)"
    );

    let changing_chunk = post_fix_chunks
        .iter()
        .find(|c| c.file == "changing.rs")
        .expect("changing.rs must be present after the second reindex");
    assert_eq!(
        changing_chunk.content, "fn version_two() {}",
        "changing.rs must have the new content after the second reindex"
    );

    let hashes = post_fix_store.load_file_hashes().unwrap();
    assert!(
        hashes.iter().any(|(f, _)| f == "stable.rs"),
        "stable.rs file hash must survive in the durable corpus so future \
         incremental reindexes can still hash-skip it"
    );
}
/// Checks at the `CorpusStore` level that a failed staging setup leaves the
/// live corpus untouched.
///
/// Steps: seed a live corpus with two chunks and hashes; confirm
/// `open_fresh` returns `Err` when pointed at a directory; reopen the live
/// corpus and confirm both chunks are still there; finally confirm that
/// `copy_all_from` into a valid fresh staging store copies both chunks.
#[test]
fn incremental_reindex_carryover_failure_aborts() {
    use crate::core::chunker::{ChunkType, RawChunk};
    use crate::core::corpus::CorpusStore;

    // Minimal single-line Rust code chunk for the given file/id/content.
    fn make_chunk(file: &str, id: &str, content: &str) -> RawChunk {
        RawChunk {
            id: id.to_string(),
            file: file.to_string(),
            start_line: 1,
            end_line: 1,
            content: content.to_string(),
            function_name: None,
            language: Some("rust".to_string()),
            chunk_type: ChunkType::Code,
            calls: Vec::new(),
            inherits_from: Vec::new(),
            chunk_depth: 0,
            parent_chunk_id: None,
            child_chunk_ids: Vec::new(),
            nlp_keywords: Vec::new(),
            nlp_code_refs: Vec::new(),
            virtual_terms: Vec::new(),
        }
    }

    let scratch = tempfile::tempdir().unwrap();

    // ---- Seed the live corpus -------------------------------------------
    let live_path = scratch.path().join("live_abort_test.redb");
    {
        let live = CorpusStore::open(&live_path).unwrap();
        live.upsert_chunks(&[
            make_chunk("alpha.rs", "alpha:1:1", "fn alpha() {}"),
            make_chunk("beta.rs", "beta:1:1", "fn beta() {}"),
        ])
        .unwrap();
        live.upsert_file_hashes(&[("alpha.rs", "hash_a"), ("beta.rs", "hash_b")])
            .unwrap();
    }
    {
        let check = CorpusStore::open(&live_path).unwrap();
        assert_eq!(
            check.load_all_chunks().unwrap().len(),
            2,
            "pre-condition: live corpus must have 2 chunks"
        );
    }

    // ---- Failing staging setup: the staging path is a directory ---------
    let dir_staging_path = scratch.path().join("staging_is_a_dir");
    std::fs::create_dir_all(&dir_staging_path).unwrap();
    let staging_open_result = CorpusStore::open_fresh(&dir_staging_path);
    assert!(
        staging_open_result.is_err(),
        "opening a directory as a redb corpus must return Err — \
         this confirms the error-propagation path is exercised"
    );

    // ---- The live corpus must be unchanged ------------------------------
    {
        let live_after = CorpusStore::open(&live_path).unwrap();
        let chunks_after = live_after.load_all_chunks().unwrap();
        assert_eq!(
            chunks_after.len(),
            2,
            "ABORT PATH: live corpus must STILL have 2 chunks after a failed \
             staging setup — got {:?}",
            chunks_after.iter().map(|c| &c.file).collect::<Vec<_>>()
        );
        assert!(
            chunks_after.iter().any(|c| c.file == "alpha.rs"),
            "alpha.rs must remain in the live corpus after a failed carryover"
        );
        assert!(
            chunks_after.iter().any(|c| c.file == "beta.rs"),
            "beta.rs must remain in the live corpus after a failed carryover"
        );
    }

    // ---- Sanity check: the copy works when both stores are valid --------
    let good_staging_path = scratch.path().join("good_staging_sanity.redb");
    {
        let good_live = CorpusStore::open(&live_path).unwrap();
        let good_staging = CorpusStore::open_fresh(&good_staging_path).unwrap();
        let copy_result = good_staging.copy_all_from(&good_live);
        assert!(
            copy_result.is_ok(),
            "copy_all_from must succeed when both source and destination are valid: {:?}",
            copy_result
        );
        let copied = good_staging.load_all_chunks().unwrap();
        assert_eq!(
            copied.len(),
            2,
            "copy_all_from sanity: must copy all 2 chunks from the live corpus"
        );
    }
}
/// Regression test for #878: once a reindex reports `Complete`, the handle's
/// `last_indexed_at` must hold a value that parses as RFC 3339.
#[tokio::test]
async fn last_indexed_stamped_after_reindex() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(root.join("alpha.rs"), "pub fn alpha() {}\n").unwrap();

    let handle = make_handle_with_flag("li-stamp-test", root, false);
    let progress = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), progress.clone(), false);

    // Poll up to 200 times, 50 ms apart; the assertion below reports a timeout.
    let poll_interval = std::time::Duration::from_millis(50);
    let mut polls_left = 200;
    while polls_left > 0 && progress.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls_left -= 1;
    }
    assert_eq!(progress.status.load(), ReindexStatus::Complete);

    // Clone the value out so the read guard is released at the end of the statement.
    let ts_str = handle
        .last_indexed_at
        .read()
        .await
        .clone()
        .expect("#878: last_indexed_at must be Some after a completed reindex; got None");
    let parsed = chrono::DateTime::parse_from_rfc3339(&ts_str);
    assert!(
        parsed.is_ok(),
        "#878: last_indexed_at must be a valid RFC-3339 string; got: {ts_str}"
    );
}
/// Regression test for #879: `stages.lexical.chunks` must report the corpus-wide
/// chunk total, not the number of chunks produced by the most recent pass.
///
/// The same handle is reindexed twice over an unchanged tree. The second pass is
/// expected to report zero new chunks, while the lexical stage counter must still
/// equal the count observed after the first pass.
#[tokio::test]
async fn lexical_chunks_reports_corpus_total_not_pass_count() {
    let workspace = tempfile::tempdir().expect("tempdir");
    let root = workspace.path().to_path_buf();
    fs::write(
        root.join("beta.rs"),
        "pub fn beta() {}\npub fn gamma() {}\npub fn delta() {}\n",
    )
    .unwrap();
    let handle = make_handle_with_flag("lc-total-test", root, false);
    let poll_interval = std::time::Duration::from_millis(50);

    // ---- Pass 1: initial index of the single source file. ----
    let first_pass = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), first_pass.clone(), false);
    let mut polls_left = 200;
    while polls_left > 0 && first_pass.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls_left -= 1;
    }
    assert_eq!(first_pass.status.load(), ReindexStatus::Complete);

    let chunks_pass1 = first_pass.total_chunks.load(Ordering::Acquire);
    assert!(
        chunks_pass1 > 0,
        "first reindex must commit at least one chunk"
    );
    // A missing counter is treated as zero so the comparison below still reports it.
    let lexical_chunks_after_pass1 = handle.stages.read().await.lexical.chunks.unwrap_or(0);
    assert_eq!(
        lexical_chunks_after_pass1, chunks_pass1,
        "#879: after first reindex stages.lexical.chunks ({lexical_chunks_after_pass1}) \
         must equal total_chunks ({chunks_pass1})"
    );

    // ---- Pass 2: nothing on disk changed. ----
    let second_pass = Arc::new(ReindexProgress::new());
    spawn_reindex(handle.clone(), second_pass.clone(), false);
    let mut polls_left = 200;
    while polls_left > 0 && second_pass.status.load() != ReindexStatus::Complete {
        tokio::time::sleep(poll_interval).await;
        polls_left -= 1;
    }
    assert_eq!(second_pass.status.load(), ReindexStatus::Complete);

    let chunks_pass2 = second_pass.total_chunks.load(Ordering::Acquire);
    assert_eq!(
        chunks_pass2, 0,
        "no-change reindex must produce 0 new chunks (all hash-skipped); got {chunks_pass2}"
    );
    let lexical_chunks_after_pass2 = handle.stages.read().await.lexical.chunks.unwrap_or(0);
    assert_eq!(
        lexical_chunks_after_pass2, chunks_pass1,
        "#879: after no-change reindex stages.lexical.chunks ({lexical_chunks_after_pass2}) \
         must equal the corpus total ({chunks_pass1}), not the per-pass count ({chunks_pass2})"
    );
}
/// Resets the in-process embedder flag twice in a row and requires the flag to
/// read `false` after each reset; the second round checks that resetting an
/// already-false flag leaves it false.
#[test]
fn inprocess_embedder_flag_reset_restores_false() {
    // One failure message per reset round, in the order the rounds run.
    let failure_notes = [
        "pre-condition: flag must be false after initial reset",
        "reset must be idempotent: flag remains false when called on already-false flag",
    ];
    for note in failure_notes {
        reset_inprocess_embedder_flag_for_tests();
        assert!(!inprocess_embedder_ever_ready_for_tests(), "{note}");
    }
}
/// Isolation check referenced by issue #1179: each scenario begins with a reset,
/// after which the in-process embedder flag must read `false`.
#[test]
fn inprocess_embedder_flag_isolated_across_scenarios() {
    // Performs the reset and reports what the flag reads immediately afterwards.
    let flag_after_reset = || {
        reset_inprocess_embedder_flag_for_tests();
        inprocess_embedder_ever_ready_for_tests()
    };

    assert!(
        !flag_after_reset(),
        "Scenario A: flag must be false after reset — guarantees isolation from \
         any prior test that set it to true (issue #1179 isolation contract)"
    );
    assert!(
        !flag_after_reset(),
        "Scenario B: second consecutive reset must leave flag false (idempotent)"
    );
}