use std::sync::Arc;
use anyhow::Result;
use rsclaw::kb::{
KbEmbedder, KbIndex, KbPaths, KbStore, StubEmbedder,
compactor::run_compactor_tick,
ledger::LedgerStatus,
store::ledger,
sync::{KbSourceSyncer, ManualUploadSyncer, SyncContext, SyncReason},
worker::{DefaultDispatcher, HandlerCtx, WorkerConfig, WorkerPool},
};
use tempfile::TempDir;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compactor_advances_indexing_complete_ledger() -> Result<()> {
let tmp = TempDir::new()?;
let store = Arc::new(KbStore::open(&tmp.path().join("kb.redb"))?);
let paths = Arc::new(KbPaths::new(tmp.path().join("kb")));
paths.ensure_layout()?;
let embedder: Arc<dyn KbEmbedder> = Arc::new(StubEmbedder::default());
let index = Arc::new(KbIndex::open(&paths)?);
let fixture = tmp.path().join("a.md");
std::fs::write(&fixture, "# A\n\nbody one.")?;
let syncer = ManualUploadSyncer {
source_id: "test:a".into(),
file_path: fixture,
tags: vec![],
};
let sctx = SyncContext {
store: store.clone(),
paths: paths.clone(),
index: index.clone(),
embedder: embedder.clone(),
};
syncer.sync(&sctx, SyncReason::Manual).await.unwrap();
let hctx = HandlerCtx {
store: store.clone(),
paths: paths.clone(),
embedder,
index,
};
WorkerPool::run_one_blocking(&hctx, &WorkerConfig::default(), &DefaultDispatcher)?;
let rtx = store.begin_read()?;
let done_before = ledger::list_by_status(&rtx, LedgerStatus::IndexingComplete)?;
assert_eq!(
done_before.len(),
1,
"worker should have advanced the ledger"
);
drop(rtx);
let now = chrono::Utc::now().timestamp_millis();
let stats = run_compactor_tick(&store, &paths, now)?;
assert_eq!(stats.ledger_advanced_to_cleanup, 1);
let rtx = store.begin_read()?;
let pending = ledger::list_by_status(&rtx, LedgerStatus::CleanupPending)?;
assert_eq!(pending.len(), 1, "ledger should now be CleanupPending");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compactor_snapshot_files_appear_via_kb_index() -> Result<()> {
use rsclaw::kb::KbIndex;
let tmp = TempDir::new()?;
let store = Arc::new(KbStore::open(&tmp.path().join("kb.redb"))?);
let paths = Arc::new(KbPaths::new(tmp.path().join("kb")));
paths.ensure_layout()?;
let embedder: Arc<dyn KbEmbedder> = Arc::new(StubEmbedder::default());
let index = Arc::new(KbIndex::open(&paths)?);
let fixture = tmp.path().join("snap.md");
std::fs::write(&fixture, "# Snap\n\nbody for snapshot test.")?;
let syncer = ManualUploadSyncer {
source_id: "test:snap".into(),
file_path: fixture,
tags: vec![],
};
let sctx = SyncContext {
store: store.clone(),
paths: paths.clone(),
index: index.clone(),
embedder: embedder.clone(),
};
syncer.sync(&sctx, SyncReason::Manual).await.unwrap();
let hctx = HandlerCtx {
store: store.clone(),
paths: paths.clone(),
embedder,
index: index.clone(),
};
WorkerPool::run_one_blocking(&hctx, &WorkerConfig::default(), &DefaultDispatcher)?;
index.snapshot_hnsw(&paths)?;
let snap_dir = paths.root.join("hnsw");
assert!(
snap_dir.join("snapshot.meta.json").exists(),
"meta.json missing"
);
assert!(
snap_dir.join("snapshot.hnsw.graph").exists(),
"hnsw.graph missing"
);
assert!(
snap_dir.join("snapshot.hnsw.data").exists(),
"hnsw.data missing"
);
Ok(())
}