use std::collections::BTreeSet;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, MutexGuard};
use noxu::{
Database, DatabaseConfig, DatabaseEntry, Environment, EnvironmentConfig, Get, NoxuError,
OperationStatus, Transaction, TransactionConfig,
};
use roaring::RoaringBitmap;
use crate::index::{IndexedDoc, TextIndex};
use crate::trigram::extract_trigram_set;
/// Byte length of a key in the docs and bloom databases: one big-endian `u32` doc id.
const DOC_KEY_LEN: usize = std::mem::size_of::<u32>();
/// Byte length of a key in the postings database: one big-endian `u64` trigram.
const TRIGRAM_KEY_LEN: usize = std::mem::size_of::<u64>();
/// Errors returned by the persistence layer in this module.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum PersistError {
/// An error reported by the `noxu` storage engine.
#[error("noxu: {0}")]
Noxu(#[from] NoxuError),
/// An `std::io::Error`, e.g. from serializing a posting bitmap in `put_postings`.
#[error("roaring io: {0}")]
RoaringIo(#[from] std::io::Error),
/// A bincode failure, stored as its display text (see the `From<bincode::Error>` impl).
#[error("bincode: {0}")]
Bincode(String),
/// A stored key whose byte length does not match the width expected for its database.
#[error("corrupt key in {db} db: expected {expected} bytes, got {got}")]
CorruptKey {
// Name of the database the key was read from.
db: &'static str,
// Key length, in bytes, that the reader expected.
expected: usize,
// Key length, in bytes, that was actually found.
got: usize,
},
}
impl From<bincode::Error> for PersistError {
fn from(err: bincode::Error) -> Self {
PersistError::Bincode(err.to_string())
}
}
/// Settings consumed by `NoxuPersister::open`.
#[derive(Debug, Clone)]
pub struct PersistConfig {
/// Path handed to `EnvironmentConfig::new` when the environment is opened.
pub env_path: PathBuf,
/// Name of the database that stores document text.
pub docs_db_name: String,
/// Name of the database that stores serialized posting bitmaps.
pub postings_db_name: String,
/// Name of the database that stores bincode-serialized per-document bloom data.
pub bloom_db_name: String,
/// Value handed to `EnvironmentConfig::with_cache_size`; presumably a size in bytes,
/// as the field name suggests.
pub cache_size_bytes: u64,
}
/// Default configuration: environment under `/var/lib/dynomite/dyntext`, databases named
/// `docs`, `postings` and `bloom`, and a cache size of 64 * 1024 * 1024.
impl Default for PersistConfig {
    fn default() -> Self {
        const DEFAULT_CACHE_SIZE: u64 = 64 * 1024 * 1024;

        let env_path = PathBuf::from("/var/lib/dynomite/dyntext");
        Self {
            env_path,
            docs_db_name: String::from("docs"),
            postings_db_name: String::from("postings"),
            bloom_db_name: String::from("bloom"),
            cache_size_bytes: DEFAULT_CACHE_SIZE,
        }
    }
}
/// Handle to the persisted text index.
///
/// The state lives behind an `Arc`, so clones share the same environment and databases.
#[derive(Clone)]
pub struct NoxuPersister {
// Shared environment and database handles.
inner: Arc<Inner>,
}
/// State shared by all clones of `NoxuPersister`; each handle sits behind its own mutex.
struct Inner {
// The storage environment; transactions are begun on it.
env: Mutex<Environment>,
// Database keyed by doc id, holding document text.
docs: Mutex<Database>,
// Database keyed by trigram, holding serialized posting bitmaps.
postings: Mutex<Database>,
// Database keyed by doc id, holding serialized bloom data.
bloom: Mutex<Database>,
}
impl NoxuPersister {
pub fn open(cfg: PersistConfig) -> Result<Self, PersistError> {
let PersistConfig {
env_path,
docs_db_name,
postings_db_name,
bloom_db_name,
cache_size_bytes,
} = cfg;
let env_config = EnvironmentConfig::new(env_path)
.with_allow_create(true)
.with_transactional(true)
.with_cache_size(cache_size_bytes);
let env = Environment::open(env_config)?;
let db_config = DatabaseConfig::new()
.with_allow_create(true)
.with_transactional(true);
let docs = env.open_database(None, &docs_db_name, &db_config)?;
let postings = env.open_database(None, &postings_db_name, &db_config)?;
let bloom = env.open_database(None, &bloom_db_name, &db_config)?;
Ok(Self {
inner: Arc::new(Inner {
env: Mutex::new(env),
docs: Mutex::new(docs),
postings: Mutex::new(postings),
bloom: Mutex::new(bloom),
}),
})
}
pub fn snapshot(&self, idx: &TextIndex) -> Result<(), PersistError> {
let env = self.lock_env();
let docs = self.lock_docs();
let postings = self.lock_postings();
let bloom = self.lock_bloom();
let handles = DbHandles {
docs: &docs,
postings: &postings,
bloom: &bloom,
};
let txn = env.begin_transaction(Some(&TransactionConfig::default()))?;
match snapshot_locked(&txn, &handles, idx) {
Ok(()) => {
txn.commit()?;
Ok(())
}
Err(err) => {
let _ = txn.abort();
Err(err)
}
}
}
pub fn append_doc(&self, doc_id: u32, idx: &TextIndex) -> Result<(), PersistError> {
let Some(doc) = idx.docs().get(&doc_id) else {
return Err(PersistError::Noxu(NoxuError::IllegalArgument(format!(
"append_doc: doc id {doc_id} is not present in the index"
))));
};
let env = self.lock_env();
let docs = self.lock_docs();
let postings = self.lock_postings();
let bloom = self.lock_bloom();
let handles = DbHandles {
docs: &docs,
postings: &postings,
bloom: &bloom,
};
let txn = env.begin_transaction(Some(&TransactionConfig::default()))?;
match append_locked(&txn, &handles, doc_id, doc, idx) {
Ok(()) => {
txn.commit()?;
Ok(())
}
Err(err) => {
let _ = txn.abort();
Err(err)
}
}
}
pub fn load(&self) -> Result<TextIndex, PersistError> {
let docs = self.lock_docs();
let mut idx = TextIndex::new();
let mut last_id: Option<u32> = None;
for_each_record(&docs, |key, value| {
if key.len() != DOC_KEY_LEN {
return Err(PersistError::CorruptKey {
db: "docs",
expected: DOC_KEY_LEN,
got: key.len(),
});
}
let mut buf = [0u8; DOC_KEY_LEN];
buf.copy_from_slice(key);
let doc_id = u32::from_be_bytes(buf);
let next_expected = match last_id {
Some(prev) => prev.checked_add(1).unwrap_or(prev),
None => 0,
};
for filler in next_expected..doc_id {
let assigned = idx.insert(Vec::new());
debug_assert_eq!(assigned, filler);
idx.remove(assigned);
}
let assigned = idx.insert(value.to_vec());
debug_assert_eq!(assigned, doc_id);
last_id = Some(doc_id);
Ok(())
})?;
Ok(idx)
}
pub fn env_path(&self) -> PathBuf {
let env = self.lock_env();
env.get_home().to_path_buf()
}
fn lock_env(&self) -> MutexGuard<'_, Environment> {
self.inner
.env
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
fn lock_docs(&self) -> MutexGuard<'_, Database> {
self.inner
.docs
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
fn lock_postings(&self) -> MutexGuard<'_, Database> {
self.inner
.postings
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
fn lock_bloom(&self) -> MutexGuard<'_, Database> {
self.inner
.bloom
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
/// Borrowed references to the three databases, passed together to the `*_locked` helpers.
struct DbHandles<'a> {
// Database holding document text, keyed by doc id.
docs: &'a Database,
// Database holding serialized posting bitmaps, keyed by trigram.
postings: &'a Database,
// Database holding serialized bloom data, keyed by doc id.
bloom: &'a Database,
}
/// Rewrites all three databases from `idx` within `txn`.
///
/// Every existing record is deleted first (docs, then bloom, then postings). Each
/// document's text and bloom data are then written, and finally the posting bitmap of
/// every trigram extracted from the documents' text, visited in ascending trigram order.
/// Trigrams for which `idx.postings().lookup` returns `None` are skipped.
fn snapshot_locked(
    txn: &Transaction,
    handles: &DbHandles<'_>,
    idx: &TextIndex,
) -> Result<(), PersistError> {
    clear_database(handles.docs, Some(txn))?;
    clear_database(handles.bloom, Some(txn))?;
    clear_database(handles.postings, Some(txn))?;

    let mut seen_trigrams = BTreeSet::<u64>::new();
    for (id, entry) in idx.docs() {
        put_doc(txn, handles.docs, *id, &entry.text)?;
        put_bloom(txn, handles.bloom, *id, entry)?;
        seen_trigrams.extend(extract_trigram_set(&entry.text));
    }

    for trigram in seen_trigrams {
        let Some(bitmap) = idx.postings().lookup(trigram) else {
            continue;
        };
        put_postings(txn, handles.postings, trigram, bitmap)?;
    }
    Ok(())
}
/// Writes one document within `txn`: its text, its bloom data, and the current posting
/// bitmap of each trigram extracted from its text.
///
/// Trigrams for which `idx.postings().lookup` returns `None` are skipped.
fn append_locked(
    txn: &Transaction,
    handles: &DbHandles<'_>,
    doc_id: u32,
    doc: &IndexedDoc,
    idx: &TextIndex,
) -> Result<(), PersistError> {
    put_doc(txn, handles.docs, doc_id, &doc.text)?;
    put_bloom(txn, handles.bloom, doc_id, doc)?;

    for trigram in extract_trigram_set(&doc.text) {
        let Some(bitmap) = idx.postings().lookup(trigram) else {
            continue;
        };
        put_postings(txn, handles.postings, trigram, bitmap)?;
    }
    Ok(())
}
/// Encodes a doc id as its big-endian byte representation, used as a database key.
fn doc_key(doc_id: u32) -> [u8; DOC_KEY_LEN] {
    u32::to_be_bytes(doc_id)
}
/// Encodes a trigram value as its big-endian byte representation, used as a database key.
fn trigram_key(trigram: u64) -> [u8; TRIGRAM_KEY_LEN] {
    u64::to_be_bytes(trigram)
}
/// Stores `text` in `docs_db` under the big-endian key for `doc_id`, within `txn`.
fn put_doc(
    txn: &Transaction,
    docs_db: &Database,
    doc_id: u32,
    text: &[u8],
) -> Result<(), PersistError> {
    let key_bytes = doc_key(doc_id);
    let key_entry = DatabaseEntry::from_bytes(&key_bytes);
    let value_entry = DatabaseEntry::from_bytes(text);
    docs_db.put(Some(txn), &key_entry, &value_entry)?;
    Ok(())
}
/// Stores the bincode serialization of `doc.bloom` in `bloom_db` under the big-endian
/// key for `doc_id`, within `txn`.
///
/// A serialization failure is returned as `PersistError::Bincode`.
fn put_bloom(
    txn: &Transaction,
    bloom_db: &Database,
    doc_id: u32,
    doc: &IndexedDoc,
) -> Result<(), PersistError> {
    let encoded = bincode::serialize(&doc.bloom)?;
    let key_bytes = doc_key(doc_id);
    let key_entry = DatabaseEntry::from_bytes(&key_bytes);
    let value_entry = DatabaseEntry::from_bytes(&encoded);
    bloom_db.put(Some(txn), &key_entry, &value_entry)?;
    Ok(())
}
/// Stores the serialized `bitmap` in `postings_db` under the big-endian key for
/// `trigram`, within `txn`.
///
/// The output buffer is pre-sized from `bitmap.serialized_size()`.
fn put_postings(
    txn: &Transaction,
    postings_db: &Database,
    trigram: u64,
    bitmap: &RoaringBitmap,
) -> Result<(), PersistError> {
    let mut encoded: Vec<u8> = Vec::with_capacity(bitmap.serialized_size());
    bitmap.serialize_into(&mut encoded)?;

    let key_bytes = trigram_key(trigram);
    let key_entry = DatabaseEntry::from_bytes(&key_bytes);
    let value_entry = DatabaseEntry::from_bytes(&encoded);
    postings_db.put(Some(txn), &key_entry, &value_entry)?;
    Ok(())
}
/// Calls `f(key, value)` for each record of `db`, walking a cursor from the first record
/// onwards until a fetch reports anything other than `OperationStatus::Success`.
///
/// The cursor is opened without a transaction. Iteration stops at the first error from
/// the cursor or from `f`, and that error is returned. The result of closing the cursor
/// after a complete walk is discarded.
fn for_each_record<F>(db: &Database, mut f: F) -> Result<(), PersistError>
where
    F: FnMut(&[u8], &[u8]) -> Result<(), PersistError>,
{
    let mut cursor = db.open_cursor(None, None)?;
    let mut key = DatabaseEntry::new();
    let mut value = DatabaseEntry::new();

    // The first fetch positions on the first record; every later fetch advances.
    let mut direction = Get::First;
    loop {
        let status = cursor.get(&mut key, &mut value, direction, None)?;
        if !matches!(status, OperationStatus::Success) {
            break;
        }
        f(key.data(), value.data())?;
        direction = Get::Next;
    }

    let _ = cursor.close();
    Ok(())
}
/// Deletes every record of `db`, using `txn` for both the scan and the deletes.
///
/// All keys are collected with a cursor first; the cursor goes out of scope before any
/// delete is issued.
fn clear_database(db: &Database, txn: Option<&Transaction>) -> Result<(), PersistError> {
    let stale_keys: Vec<Vec<u8>> = {
        let mut cursor = db.open_cursor(txn, None)?;
        let mut key = DatabaseEntry::new();
        let mut value = DatabaseEntry::new();
        let mut collected = Vec::new();

        // The first fetch positions on the first record; every later fetch advances.
        let mut direction = Get::First;
        loop {
            let status = cursor.get(&mut key, &mut value, direction, None)?;
            if !matches!(status, OperationStatus::Success) {
                break;
            }
            collected.push(key.data().to_vec());
            direction = Get::Next;
        }

        let _ = cursor.close();
        collected
    };

    for raw_key in &stale_keys {
        db.delete(txn, &DatabaseEntry::from_bytes(raw_key))?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a persister rooted at `dir`, with every other setting left at its default.
    fn open_persister(dir: &TempDir) -> NoxuPersister {
        let env_path = dir.path().to_path_buf();
        let cfg = PersistConfig {
            env_path,
            ..PersistConfig::default()
        };
        NoxuPersister::open(cfg).expect("open persister")
    }

    /// Creates a temporary directory and a persister inside it.
    ///
    /// The directory is returned as well so that it outlives the persister in the test.
    fn fresh_persister() -> (TempDir, NoxuPersister) {
        let dir = TempDir::new().expect("tempdir");
        let persister = open_persister(&dir);
        (dir, persister)
    }

    #[test]
    fn open_creates_environment_and_databases() {
        let (dir, persister) = fresh_persister();
        assert_eq!(persister.env_path(), dir.path());
    }

    #[test]
    fn doc_key_encodes_big_endian_u32() {
        let cases = [
            (0u32, [0u8, 0, 0, 0]),
            (1, [0, 0, 0, 1]),
            (0x0102_0304, [1, 2, 3, 4]),
        ];
        for (doc_id, expected) in cases {
            assert_eq!(doc_key(doc_id), expected);
        }
    }

    #[test]
    fn trigram_key_encodes_big_endian_u64() {
        let cases = [
            (0u64, [0u8; 8]),
            (0x0102_0304_0506_0708, [1, 2, 3, 4, 5, 6, 7, 8]),
        ];
        for (trigram, expected) in cases {
            assert_eq!(trigram_key(trigram), expected);
        }
    }

    #[test]
    fn snapshot_then_load_empty_index_round_trips() {
        let (_dir, persister) = fresh_persister();
        let empty = TextIndex::new();
        persister.snapshot(&empty).expect("snapshot empty");
        let restored = persister.load().expect("load empty");
        assert_eq!(restored.doc_count(), 0);
    }

    #[test]
    fn append_doc_rejects_unknown_id() {
        let (_dir, persister) = fresh_persister();
        let empty = TextIndex::new();
        let outcome = persister.append_doc(7, &empty);
        assert!(matches!(outcome, Err(PersistError::Noxu(_))));
    }
}