use std::path::Path;
use anyhow::{Context, Result};
use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
use crate::core::chunker::RawChunk;
use crate::core::entity::RawEntity;
/// Cache budget handed to redb's `Builder::set_cache_size`: 16 GiB (16 × 2^30 bytes).
const REDB_CACHE_SIZE_BYTES: usize = 16 << 30;
/// redb table of chunk rows: key is `RawChunk::id`, value is the chunk serialized with
/// `serde_json`.
const CHUNKS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("chunks");
/// redb table of entity rows: key is the file string passed to `upsert_entities`, value is
/// that file's `Vec<RawEntity>` serialized with `serde_json`.
const ENTITIES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("entities");
/// Single-file corpus store backed by redb.
///
/// Two tables are kept, both with `&str` keys and JSON-encoded byte values:
/// - `chunks`: one row per [`RawChunk`], keyed by its `id`;
/// - `entities`: one row per file, holding that file's `Vec<RawEntity>`.
///
/// Every data-access method runs inside exactly one redb transaction, so a single call is
/// applied (or rolled back) as a unit.
pub struct CorpusStore {
    /// Open redb database handle.
    db: Database,
    /// Path the database was opened at; returned by [`CorpusStore::path`].
    path: std::path::PathBuf,
}

impl CorpusStore {
    /// Opens the redb database at `path`, creating the file and any missing parent
    /// directories if needed.
    ///
    /// Both corpus tables are opened once inside a committed write transaction. This ensures
    /// they exist before any read-only method tries to open them.
    ///
    /// # Errors
    /// Returns an error if the parent directory cannot be created, redb cannot open or create
    /// the file, or the table-initialisation transaction fails.
    pub fn open(path: &Path) -> Result<Self> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .with_context(|| format!("create parent of {}", path.display()))?;
        }
        let db = Database::builder()
            .set_cache_size(REDB_CACHE_SIZE_BYTES)
            .create(path)
            .with_context(|| format!("open redb corpus at {}", path.display()))?;
        {
            let txn = db.begin_write().context("begin corpus init txn")?;
            {
                // The table handles are only wanted for their create-if-missing effect; they
                // must be dropped before `commit` consumes the transaction.
                txn.open_table(CHUNKS_TABLE).context("init chunks table")?;
                txn.open_table(ENTITIES_TABLE)
                    .context("init entities table")?;
            }
            txn.commit().context("commit corpus init txn")?;
        }
        Ok(Self {
            db,
            path: path.to_path_buf(),
        })
    }

    /// Opens an empty database at `path`, first deleting any file already there.
    ///
    /// Intended for a staging corpus: leftovers from an earlier run are discarded. A missing
    /// file is not an error.
    ///
    /// # Errors
    /// Returns an error if an existing file cannot be removed, or if [`CorpusStore::open`]
    /// fails.
    pub fn open_fresh(path: &Path) -> Result<Self> {
        match std::fs::remove_file(path) {
            Ok(()) => {}
            // Nothing to clear on first use.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                return Err(e)
                    .with_context(|| format!("clear stale staging corpus at {}", path.display()))
            }
        }
        Self::open(path)
    }

    /// Returns the filesystem path this store was opened with.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Inserts or replaces `chunks` (keyed by `RawChunk::id`) in one write transaction.
    ///
    /// An empty slice is a no-op and does not open a transaction.
    ///
    /// # Errors
    /// Returns an error if any chunk fails to serialize or insert, or the commit fails. In that
    /// case no chunk from this call is persisted.
    pub fn upsert_chunks(&self, chunks: &[RawChunk]) -> Result<()> {
        if chunks.is_empty() {
            return Ok(());
        }
        let txn = self.db.begin_write().context("begin chunk upsert txn")?;
        Self::write_chunks(&txn, chunks, "chunk upsert")?;
        txn.commit().context("commit chunk upsert txn")?;
        Ok(())
    }

    /// Inserts or replaces the per-file entity lists in one write transaction.
    ///
    /// Each `(file, entities)` pair becomes one row keyed by `file`. An empty slice is a no-op
    /// and does not open a transaction.
    ///
    /// # Errors
    /// Returns an error if any row fails to serialize or insert, or the commit fails. In that
    /// case nothing from this call is persisted.
    pub fn upsert_entities(&self, entities: &[(String, Vec<RawEntity>)]) -> Result<()> {
        if entities.is_empty() {
            return Ok(());
        }
        let txn = self.db.begin_write().context("begin entity upsert txn")?;
        Self::write_entities(&txn, entities, "entity upsert")?;
        txn.commit().context("commit entity upsert txn")?;
        Ok(())
    }

    /// Upserts `chunks` and `entities` together in a single write transaction.
    ///
    /// Either both sets of rows become visible or neither does. If both slices are empty, this
    /// is a no-op and no transaction is opened.
    ///
    /// # Errors
    /// Returns an error if any row fails to serialize or insert, or the commit fails.
    pub fn upsert_batch(
        &self,
        chunks: &[RawChunk],
        entities: &[(String, Vec<RawEntity>)],
    ) -> Result<()> {
        if chunks.is_empty() && entities.is_empty() {
            return Ok(());
        }
        let txn = self.db.begin_write().context("begin batch upsert txn")?;
        Self::write_chunks(&txn, chunks, "batch upsert")?;
        Self::write_entities(&txn, entities, "batch upsert")?;
        txn.commit().context("commit batch upsert txn")?;
        Ok(())
    }

    /// Removes the chunks with the given ids in one write transaction.
    ///
    /// Ids that have no row are ignored. An empty slice is a no-op.
    pub fn delete_chunks(&self, ids: &[String]) -> Result<()> {
        if ids.is_empty() {
            return Ok(());
        }
        let txn = self.db.begin_write().context("begin chunk delete txn")?;
        {
            let mut table = txn
                .open_table(CHUNKS_TABLE)
                .context("open chunks table for chunk delete")?;
            for id in ids {
                table
                    .remove(id.as_str())
                    .with_context(|| format!("delete chunk {id}"))?;
            }
        }
        txn.commit().context("commit chunk delete txn")?;
        Ok(())
    }

    /// Removes the entity row for `file`. A file with no row is not an error.
    pub fn delete_entities(&self, file: &str) -> Result<()> {
        let txn = self.db.begin_write().context("begin entity delete txn")?;
        {
            let mut table = txn
                .open_table(ENTITIES_TABLE)
                .context("open entities table for entity delete")?;
            table
                .remove(file)
                .with_context(|| format!("delete entities for {file}"))?;
        }
        txn.commit().context("commit entity delete txn")?;
        Ok(())
    }

    /// Loads every chunk in the store.
    ///
    /// Rows whose value fails to deserialize are logged at warn level and skipped, so one
    /// corrupt row does not fail the whole load.
    pub fn load_all_chunks(&self) -> Result<Vec<RawChunk>> {
        let txn = self.db.begin_read().context("begin chunk read txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for read")?;
        let mut out = Vec::new();
        for entry in table.iter().context("iterate chunks table")? {
            let (key, value) = entry.context("read chunk row")?;
            match serde_json::from_slice::<RawChunk>(value.value()) {
                Ok(chunk) => out.push(chunk),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt chunk row '{}' ({e})", key.value())
                }
            }
        }
        Ok(out)
    }

    /// Point-reads the chunks with the given `ids`, preserving the input order.
    ///
    /// Ids with no row, and rows that fail to deserialize, are logged at warn level and
    /// skipped, so the result may be shorter than `ids`.
    pub fn get_chunks(&self, ids: &[&str]) -> Result<Vec<RawChunk>> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }
        let txn = self.db.begin_read().context("begin chunk point-read txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for point-read")?;
        let mut out = Vec::with_capacity(ids.len());
        for id in ids {
            let Some(value) = table
                .get(*id)
                .with_context(|| format!("point-read chunk {id}"))?
            else {
                tracing::warn!("corpus: chunk '{id}' not found in redb — skipping");
                continue;
            };
            match serde_json::from_slice::<RawChunk>(value.value()) {
                Ok(chunk) => out.push(chunk),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt chunk row '{id}' ({e})")
                }
            }
        }
        Ok(out)
    }

    /// Loads every `(file, entities)` row in the store.
    ///
    /// Rows whose value fails to deserialize are logged at warn level and skipped.
    pub fn load_all_entities(&self) -> Result<Vec<(String, Vec<RawEntity>)>> {
        let txn = self.db.begin_read().context("begin entity read txn")?;
        let table = txn
            .open_table(ENTITIES_TABLE)
            .context("open entities table for read")?;
        let mut out = Vec::new();
        for entry in table.iter().context("iterate entities table")? {
            let (key, value) = entry.context("read entity row")?;
            let file = key.value().to_string();
            match serde_json::from_slice::<Vec<RawEntity>>(value.value()) {
                Ok(ents) => out.push((file, ents)),
                Err(e) => {
                    tracing::warn!("corpus: skipping corrupt entity row '{file}' ({e})")
                }
            }
        }
        Ok(out)
    }

    /// Returns the number of rows in the chunks table. Corrupt rows are counted too.
    ///
    /// # Errors
    /// Returns an error if the table cannot be read, or if the count does not fit in `usize`.
    /// The latter is only possible on targets narrower than 64 bits.
    pub fn chunk_count(&self) -> Result<usize> {
        let txn = self.db.begin_read().context("begin count txn")?;
        let table = txn
            .open_table(CHUNKS_TABLE)
            .context("open chunks table for count")?;
        let count = table.len().context("count chunks")?;
        // `as usize` would silently truncate on 32-bit targets.
        usize::try_from(count).context("chunk count does not fit in usize")
    }

    /// Serializes each chunk to JSON and inserts it under its id into the chunks table of
    /// `txn`. The `purpose` string only labels the error context for opening the table.
    fn write_chunks(
        txn: &redb::WriteTransaction,
        chunks: &[RawChunk],
        purpose: &str,
    ) -> Result<()> {
        let mut table = txn
            .open_table(CHUNKS_TABLE)
            .with_context(|| format!("open chunks table for {purpose}"))?;
        for chunk in chunks {
            let bytes = serde_json::to_vec(chunk)
                .with_context(|| format!("serialize chunk {}", chunk.id))?;
            table
                .insert(chunk.id.as_str(), bytes.as_slice())
                .with_context(|| format!("insert chunk {}", chunk.id))?;
        }
        Ok(())
    }

    /// Serializes each file's entity list to JSON and inserts it under the file key into the
    /// entities table of `txn`. The `purpose` string only labels the open-table error context.
    fn write_entities(
        txn: &redb::WriteTransaction,
        entities: &[(String, Vec<RawEntity>)],
        purpose: &str,
    ) -> Result<()> {
        let mut table = txn
            .open_table(ENTITIES_TABLE)
            .with_context(|| format!("open entities table for {purpose}"))?;
        for (file, ents) in entities {
            let bytes = serde_json::to_vec(ents)
                .with_context(|| format!("serialize entities for {file}"))?;
            table
                .insert(file.as_str(), bytes.as_slice())
                .with_context(|| format!("insert entities for {file}"))?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    // `use super::*` already brings `RawChunk`, the redb table consts, and the private `db`
    // field into reach; only `ChunkType` needs importing explicitly.
    use super::*;
    use crate::core::chunker::ChunkType;

    /// Builds a minimal Rust code chunk with the given id and content.
    fn raw(id: &str, content: &str) -> RawChunk {
        RawChunk {
            id: id.to_string(),
            file: "src/lib.rs".to_string(),
            start_line: 1,
            end_line: 1,
            content: content.to_string(),
            function_name: None,
            language: Some("rust".to_string()),
            chunk_type: ChunkType::Code,
            calls: Vec::new(),
            inherits_from: Vec::new(),
            chunk_depth: 0,
            parent_chunk_id: None,
            child_chunk_ids: Vec::new(),
            nlp_keywords: Vec::new(),
            nlp_code_refs: Vec::new(),
            virtual_terms: Vec::new(),
        }
    }

    #[test]
    fn roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        let chunks = vec![raw("a:1:1", "fn a() {}"), raw("b:1:1", "fn b() {}")];
        store.upsert_chunks(&chunks).unwrap();
        store
            .upsert_entities(&[("src/lib.rs".to_string(), Vec::new())])
            .unwrap();
        assert_eq!(store.chunk_count().unwrap(), 2);
        // Reopen to prove the data was persisted, not just cached.
        drop(store);
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        let mut loaded = store.load_all_chunks().unwrap();
        loaded.sort_by(|x, y| x.id.cmp(&y.id));
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].id, "a:1:1");
        assert_eq!(loaded[0].content, "fn a() {}");
        let entities = store.load_all_entities().unwrap();
        assert_eq!(entities.len(), 1);
        assert_eq!(entities[0].0, "src/lib.rs");
    }

    #[test]
    fn batch_upsert_is_atomic_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("index.redb");
        {
            let store = CorpusStore::open(&path).unwrap();
            store
                .upsert_batch(
                    &[raw("a:1:1", "fn a() {}"), raw("b:1:1", "fn b() {}")],
                    &[("src/lib.rs".to_string(), Vec::new())],
                )
                .unwrap();
            assert_eq!(store.chunk_count().unwrap(), 2);
        }
        let store = CorpusStore::open(&path).unwrap();
        let mut loaded = store.load_all_chunks().unwrap();
        loaded.sort_by(|x, y| x.id.cmp(&y.id));
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].id, "a:1:1");
        let entities = store.load_all_entities().unwrap();
        assert_eq!(entities.len(), 1);
        assert_eq!(entities[0].0, "src/lib.rs");
        // Either side of a batch may be empty.
        store
            .upsert_batch(&[raw("c:1:1", "fn c() {}")], &[])
            .unwrap();
        assert_eq!(store.chunk_count().unwrap(), 3);
        store
            .upsert_batch(&[], &[("src/other.rs".to_string(), Vec::new())])
            .unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 2);
        store.upsert_batch(&[], &[]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 3);
    }

    #[test]
    fn get_chunks_batch_reads_subset() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        store
            .upsert_chunks(&[
                raw("a:1:1", "fn a() {}"),
                raw("b:1:1", "fn b() {}"),
                raw("c:1:1", "fn c() {}"),
            ])
            .unwrap();
        let got = store
            .get_chunks(&["c:1:1", "missing:0:0", "a:1:1"])
            .unwrap();
        assert_eq!(got.len(), 2, "unknown id must be skipped, not error");
        assert_eq!(got[0].id, "c:1:1", "input order must be preserved");
        assert_eq!(got[0].content, "fn c() {}");
        assert_eq!(got[1].id, "a:1:1");
        assert!(store.get_chunks(&[]).unwrap().is_empty());
        assert!(store.get_chunks(&["nope:0:0"]).unwrap().is_empty());
    }

    #[test]
    fn upsert_replaces_existing_chunk() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        store.upsert_chunks(&[raw("a:1:1", "old")]).unwrap();
        store.upsert_chunks(&[raw("a:1:1", "new")]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 1, "same id must overwrite, not duplicate");
        let got = store.get_chunks(&["a:1:1"]).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].content, "new");
    }

    #[test]
    fn corrupt_chunk_rows_are_skipped() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        store.upsert_chunks(&[raw("a:1:1", "fn a() {}")]).unwrap();
        // Write a non-JSON value straight through the private handle to simulate corruption.
        let txn = store.db.begin_write().unwrap();
        {
            let mut table = txn.open_table(CHUNKS_TABLE).unwrap();
            table.insert("bad:1:1", b"not json".as_slice()).unwrap();
        }
        txn.commit().unwrap();
        // The raw row still counts, but readers must skip it rather than fail.
        assert_eq!(store.chunk_count().unwrap(), 2);
        let loaded = store.load_all_chunks().unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, "a:1:1");
        assert!(store.get_chunks(&["bad:1:1"]).unwrap().is_empty());
    }

    #[test]
    fn missing_db_is_empty() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("fresh.redb")).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 0);
        assert!(store.load_all_chunks().unwrap().is_empty());
        assert!(store.load_all_entities().unwrap().is_empty());
    }

    #[test]
    fn delete_removes_chunk() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        store
            .upsert_chunks(&[raw("a:1:1", "x"), raw("b:1:1", "y")])
            .unwrap();
        store.delete_chunks(&["a:1:1".to_string()]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 1);
        let loaded = store.load_all_chunks().unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, "b:1:1");
        // Deleting an unknown id is a no-op, not an error.
        store.delete_chunks(&["nope:0:0".to_string()]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 1);
    }

    #[test]
    fn empty_batches_are_noops() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        store.upsert_chunks(&[]).unwrap();
        store.upsert_entities(&[]).unwrap();
        store.delete_chunks(&[]).unwrap();
        assert_eq!(store.chunk_count().unwrap(), 0);
    }

    #[test]
    fn delete_entities_removes_file_row() {
        let dir = tempfile::tempdir().unwrap();
        let store = CorpusStore::open(&dir.path().join("index.redb")).unwrap();
        store
            .upsert_entities(&[
                ("src/a.rs".to_string(), Vec::new()),
                ("src/b.rs".to_string(), Vec::new()),
            ])
            .unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 2);
        store.delete_entities("src/a.rs").unwrap();
        let remaining = store.load_all_entities().unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].0, "src/b.rs");
        // Deleting a file that was never stored is a no-op.
        store.delete_entities("src/never.rs").unwrap();
        assert_eq!(store.load_all_entities().unwrap().len(), 1);
    }

    #[test]
    fn path_accessor_returns_open_path() {
        let dir = tempfile::tempdir().unwrap();
        let p = dir.path().join("index.redb");
        let store = CorpusStore::open(&p).unwrap();
        assert_eq!(store.path(), p.as_path());
    }

    #[test]
    fn open_fresh_truncates_stale_staging_file() {
        let dir = tempfile::tempdir().unwrap();
        let p = dir.path().join("index.redb.tmp");
        {
            let store = CorpusStore::open(&p).unwrap();
            store.upsert_chunks(&[raw("stale:1:1", "old")]).unwrap();
            assert_eq!(store.chunk_count().unwrap(), 1);
        }
        assert!(p.exists());
        let fresh = CorpusStore::open_fresh(&p).unwrap();
        assert_eq!(fresh.chunk_count().unwrap(), 0);
        assert_eq!(fresh.path(), p.as_path());
        // A path with no existing file must also work.
        let fresh2 = CorpusStore::open_fresh(&dir.path().join("never.redb.tmp")).unwrap();
        assert_eq!(fresh2.chunk_count().unwrap(), 0);
    }
}