use std::collections::HashMap;
use std::fmt;
use std::fs::{File, OpenOptions};
use std::ops::RangeBounds;
use std::path::{Path, PathBuf};
use crate::codec::{crc32c, decode_document, encode_document_into};
use crate::error::{Error, Result};
use crate::index::{SecondaryIndex, in_bounds, total_cmp_value};
use crate::sys::{read_exact_at, write_all_at};
use crate::value::{Document, Value};
/// Largest accepted record payload (op byte + document id + encoded body): 64 MiB.
pub const MAX_RECORD_BYTES: usize = 64 << 20;
/// Magic bytes that open every database file.
const HEADER_MAGIC: [u8; 8] = *b"BISONDB1";
/// Format version written into new headers; files with a newer version are rejected.
const FORMAT_VERSION: u16 = 1;
/// Size of the file header: magic, little-endian version, zero-filled remainder.
const HEADER_LEN: u64 = 16;
/// Per-record frame: payload length (u32 LE) followed by the payload CRC (u32 LE).
const FRAME_LEN: usize = 2 * core::mem::size_of::<u32>();
/// Smallest valid payload: one op byte plus the little-endian u64 document id.
const MIN_PAYLOAD: usize = 1 + core::mem::size_of::<u64>();
/// Record op: store (or overwrite) a document body.
const OP_PUT: u8 = 1;
/// Record op: tombstone that removes a document.
const OP_DELETE: u8 = 2;
/// Identifier of a stored document, allocated by `Db::insert`.
///
/// A thin wrapper around a `u64`; convert with [`DocId::get`] or the `From` impls.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DocId(u64);

impl DocId {
    /// Returns the raw numeric id.
    #[inline]
    #[must_use]
    pub const fn get(self) -> u64 {
        self.0
    }
}

impl From<u64> for DocId {
    #[inline]
    fn from(raw: u64) -> Self {
        DocId(raw)
    }
}

impl From<DocId> for u64 {
    #[inline]
    fn from(id: DocId) -> Self {
        id.0
    }
}

impl fmt::Display for DocId {
    /// Formats the raw id exactly like a `u64`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate instead of `write!(f, "{}", ..)` so width/fill/alignment
        // flags such as `{:>5}` are honoured rather than silently dropped.
        fmt::Display::fmt(&self.0, f)
    }
}
/// Where the encoded body of a document's latest version lives in the log file.
#[derive(Copy, Clone)]
struct BodyLoc {
    /// Absolute file offset of the first body byte.
    offset: u64,
    /// Body length in bytes.
    len: u32,
}
/// Point-in-time size figures reported by `Db::stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
    /// Number of documents currently stored.
    pub live_documents: usize,
    /// Length of the log in use, in bytes, header included.
    pub file_bytes: u64,
    /// Sum of the encoded body sizes of the live documents, in bytes.
    pub live_bytes: u64,
}
/// When the database forces written records to stable storage.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SyncPolicy {
    /// Call `sync_all` on the file after every mutating operation.
    Always,
    /// Sync only on an explicit `Db::flush` (plus a best-effort sync on drop).
    #[default]
    Manual,
}
/// Builder-style configuration used to open a [`Db`].
#[derive(Clone, Copy, Debug, Default)]
pub struct DbOptions {
    /// Durability policy handed to the opened database.
    sync: SyncPolicy,
}

impl DbOptions {
    /// Returns the default options (manual syncing).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the durability policy, returning the updated options.
    #[must_use]
    pub fn sync(self, policy: SyncPolicy) -> Self {
        Self { sync: policy }
    }

    /// Returns the durability policy these options will apply.
    #[must_use]
    pub fn build_sync_policy(&self) -> SyncPolicy {
        let Self { sync } = *self;
        sync
    }

    /// Opens (creating if necessary) the database at `path` with these options.
    ///
    /// # Errors
    /// Same as `Db::open`: I/O failures, a foreign or too-new file header, or
    /// a corrupt record found while replaying the log.
    pub fn open<P: AsRef<Path>>(self, path: P) -> Result<Db> {
        let owned = path.as_ref().to_owned();
        Db::open_inner(owned, self.sync)
    }
}
/// An append-only, log-structured document store backed by a single file.
///
/// Every mutation appends a framed record to the file. The primary index
/// (document id → body location) is kept in memory and rebuilt by replaying
/// the log on open; secondary indexes are in-memory only and not persisted.
pub struct Db {
    /// Handle to the log file; every record read and write goes through it.
    file: File,
    /// Location of the log file on disk.
    path: PathBuf,
    /// Primary index: raw document id → location of its latest body.
    index: HashMap<u64, BodyLoc>,
    /// Offset one past the last valid record; the next record is written here.
    tail: u64,
    /// Id the next `insert` will hand out.
    next_id: u64,
    /// Reusable buffer in which records are encoded before being written.
    scratch: Vec<u8>,
    /// Secondary indexes, keyed by the field name they index.
    indexes: HashMap<String, SecondaryIndex>,
    /// When to force writes to stable storage.
    sync: SyncPolicy,
}
impl Db {
    /// Opens the database at `path`, creating it if it does not exist, with
    /// default [`DbOptions`] (manual syncing).
    ///
    /// # Errors
    /// Fails if the file cannot be opened or created, if an existing file does
    /// not carry a supported header, or if replay hits a corrupt record.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        DbOptions::new().open(path)
    }

    /// Like [`Db::open`], but with explicit `options`.
    ///
    /// # Errors
    /// Same as [`Db::open`].
    pub fn open_with<P: AsRef<Path>>(path: P, options: DbOptions) -> Result<Self> {
        options.open(path)
    }

    /// Shared implementation behind every `open` entry point.
    ///
    /// An empty file gets a fresh header (and its directory entry is synced).
    /// A non-empty file has its header verified and its log replayed to rebuild
    /// the primary index, the id counter and the tail offset.
    fn open_inner(path: PathBuf, sync: SyncPolicy) -> Result<Self> {
        // Best effort: a leftover temp file means an earlier compaction never
        // completed its rename, so the main file is the one to trust.
        let _ = std::fs::remove_file(compacting_path(&path));
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)?;
        let file_len = file.metadata()?.len();
        let mut db = Db {
            file,
            path,
            index: HashMap::new(),
            tail: HEADER_LEN,
            next_id: 1,
            scratch: Vec::with_capacity(256),
            indexes: HashMap::new(),
            sync,
        };
        if file_len == 0 {
            db.write_header()?;
            sync_parent_dir(&db.path)?;
        } else {
            db.verify_header(file_len)?;
            db.replay(file_len)?;
        }
        Ok(db)
    }

    /// The durability policy this handle was opened with.
    #[must_use]
    pub fn sync_policy(&self) -> SyncPolicy {
        self.sync
    }

    /// Stores `doc` under a freshly allocated id and returns that id.
    ///
    /// Ids come from a counter that only moves forward; it is rebuilt from the
    /// log on open and preserved by [`Db::compact`].
    ///
    /// # Errors
    /// [`Error::ValueTooLarge`] if the encoded record exceeds
    /// [`MAX_RECORD_BYTES`] (nothing is written and no id is consumed), an
    /// encoding error, or an I/O error. If only the final sync fails the
    /// document is already stored and visible.
    pub fn insert(&mut self, doc: Document) -> Result<DocId> {
        let id = self.next_id;
        self.append(OP_PUT, id, Some(&doc))?;
        // The record is in the file: finish every piece of in-memory
        // bookkeeping before the optional sync, so a sync failure cannot leave
        // the id counter behind the primary index (which would reuse `id`).
        self.next_id = id + 1;
        self.index_add(id, &doc);
        self.sync_if_required()?;
        Ok(DocId(id))
    }

    /// Returns the document stored under `id`, or `None` if there is none.
    ///
    /// # Errors
    /// I/O errors, or a decoding error if the stored body is unreadable.
    pub fn get(&self, id: DocId) -> Result<Option<Document>> {
        self.index
            .get(&id.0)
            .map(|&loc| self.read_body(loc))
            .transpose()
    }

    /// Replaces the document stored under `id` with `doc`.
    ///
    /// Returns `Ok(false)` (and writes nothing) if `id` is not present.
    ///
    /// # Errors
    /// Same as [`Db::insert`]; additionally, reading the old body (needed only
    /// when secondary indexes exist) may fail. On a failed write, both the
    /// primary and secondary indexes still describe the old document.
    pub fn update(&mut self, id: DocId, doc: Document) -> Result<bool> {
        let Some(loc) = self.index.get(&id.0).copied() else {
            return Ok(false);
        };
        let old = self.body_for_indexes(loc)?;
        self.append(OP_PUT, id.0, Some(&doc))?;
        // Secondary indexes change only after the new record is written, so a
        // rejected or failed append leaves them consistent with the primary.
        if let Some(old) = old {
            self.index_remove(id.0, &old);
        }
        self.index_add(id.0, &doc);
        self.sync_if_required()?;
        Ok(true)
    }

    /// Deletes the document stored under `id` by appending a tombstone.
    ///
    /// Returns `Ok(false)` (and writes nothing) if `id` is not present.
    ///
    /// # Errors
    /// I/O errors, or a decoding error while reading the old body for the
    /// secondary indexes. On a failed write, nothing in memory changes.
    pub fn delete(&mut self, id: DocId) -> Result<bool> {
        let Some(loc) = self.index.get(&id.0).copied() else {
            return Ok(false);
        };
        let old = self.body_for_indexes(loc)?;
        self.append(OP_DELETE, id.0, None)?;
        if let Some(old) = old {
            self.index_remove(id.0, &old);
        }
        self.sync_if_required()?;
        Ok(true)
    }

    /// Whether a document is stored under `id`.
    #[must_use]
    pub fn contains(&self, id: DocId) -> bool {
        self.index.contains_key(&id.0)
    }

    /// Number of live documents.
    #[must_use]
    pub fn len(&self) -> usize {
        self.index.len()
    }

    /// Whether the database holds no live documents.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.index.is_empty()
    }

    /// Ids of all live documents, in no particular order.
    pub fn ids(&self) -> impl Iterator<Item = DocId> + '_ {
        self.index.keys().copied().map(DocId)
    }

    /// Forces every record written so far to stable storage (`File::sync_all`).
    ///
    /// # Errors
    /// Propagates the I/O error from the sync.
    pub fn flush(&mut self) -> Result<()> {
        self.file.sync_all()?;
        Ok(())
    }

    /// Path of the underlying log file.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Current size figures; `file_bytes - live_bytes` approximates what
    /// [`Db::compact`] could reclaim (frame overhead aside).
    #[must_use]
    pub fn stats(&self) -> Stats {
        let live_bytes = self.index.values().map(|loc| u64::from(loc.len)).sum();
        Stats {
            live_documents: self.index.len(),
            file_bytes: self.tail,
            live_bytes,
        }
    }

    /// Rewrites the log so it contains only the live documents, reclaiming
    /// the space taken by overwritten and deleted records.
    ///
    /// The compacted log is built in a sibling `<path>.compacting` file, synced,
    /// and renamed over the original. Document ids are preserved, and so is the
    /// id counter's high-water mark, so ids of deleted documents are not handed
    /// out again after a reopen. Secondary indexes map values to ids, not file
    /// offsets, and are left untouched.
    ///
    /// # Errors
    /// Any I/O error. If building the temp file fails, the database is
    /// unchanged and the temp file is removed (best effort). If the rename
    /// fails, the handle switches back to the original, still-intact file
    /// whenever that file can be reopened.
    pub fn compact(&mut self) -> Result<()> {
        let temp_path = compacting_path(&self.path);
        // A stale temp file can only come from an earlier failed compaction.
        let _ = std::fs::remove_file(&temp_path);
        let (temp, new_index, tail) = match self.write_compacted(&temp_path) {
            Ok(parts) => parts,
            Err(err) => {
                // Best effort: do not leave a half-written temp file behind.
                let _ = std::fs::remove_file(&temp_path);
                return Err(err);
            }
        };
        // Switch to the compacted file together with the index and tail that
        // describe it, so offsets in `self.index` always refer to `self.file`.
        // Replacing the handle also closes the old file before the rename, for
        // platforms that refuse to replace a file that is still open.
        self.file = temp;
        let old_index = std::mem::replace(&mut self.index, new_index);
        let old_tail = std::mem::replace(&mut self.tail, tail);
        if let Err(err) = std::fs::rename(&temp_path, &self.path) {
            // The original file was not replaced. Go back to it so later writes
            // do not land in a temp file that the next `open` would delete.
            if let Ok(file) = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&self.path)
            {
                self.file = file;
                self.index = old_index;
                self.tail = old_tail;
                let _ = std::fs::remove_file(&temp_path);
            }
            return Err(err.into());
        }
        sync_parent_dir(&self.path)?;
        Ok(())
    }

    /// Writes a compacted copy of the log to a brand-new file at `temp_path`
    /// and syncs it. The copy holds one PUT per live document, plus a tombstone
    /// when needed to keep the id high-water mark. Returns the open file, the
    /// primary index describing it, and its tail offset.
    fn write_compacted(&self, temp_path: &Path) -> Result<(File, HashMap<u64, BodyLoc>, u64)> {
        let temp = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(temp_path)?;
        write_header_to(&temp)?;
        let mut new_index = HashMap::with_capacity(self.index.len());
        let mut tail = HEADER_LEN;
        let mut frame = Vec::new();
        for (&id, &loc) in &self.index {
            begin_record(&mut frame, OP_PUT, id);
            // Copy the body straight from the old file into the new frame.
            let body_start = frame.len();
            frame.resize(body_start + loc.len as usize, 0);
            read_exact_at(&self.file, &mut frame[body_start..], loc.offset)?;
            let payload_len = seal_record(&mut frame)?;
            write_all_at(&temp, &frame, tail)?;
            let _ = new_index.insert(
                id,
                BodyLoc {
                    offset: body_offset(tail),
                    len: loc.len,
                },
            );
            tail += record_len(payload_len);
        }
        // `replay` derives `next_id` from the largest id it sees. If the last
        // allocated id belongs to a deleted document, its records are gone
        // after compaction, so record it with a tombstone; otherwise a reopen
        // would hand that id out again.
        let last_id = self.next_id.saturating_sub(1);
        if last_id != 0 && !new_index.contains_key(&last_id) {
            begin_record(&mut frame, OP_DELETE, last_id);
            let payload_len = seal_record(&mut frame)?;
            write_all_at(&temp, &frame, tail)?;
            tail += record_len(payload_len);
        }
        temp.sync_all()?;
        Ok((temp, new_index, tail))
    }

    /// Builds an in-memory secondary index on `field` from every live document.
    ///
    /// Does nothing if an index on `field` already exists. Indexes are not
    /// persisted; recreate them after reopening.
    ///
    /// # Errors
    /// I/O or decoding errors while reading bodies; the index is not installed
    /// in that case.
    pub fn create_index(&mut self, field: &str) -> Result<()> {
        if self.indexes.contains_key(field) {
            return Ok(());
        }
        let mut index = SecondaryIndex::new();
        // Only shared borrows of `self` are needed while scanning; the new
        // index is a local until it is complete.
        for (&id, &loc) in &self.index {
            let doc = self.read_body(loc)?;
            if let Some(value) = doc.get(field) {
                index.add(value, id);
            }
        }
        let _ = self.indexes.insert(field.to_string(), index);
        Ok(())
    }

    /// Removes the secondary index on `field`; returns whether one existed.
    pub fn drop_index(&mut self, field: &str) -> bool {
        self.indexes.remove(field).is_some()
    }

    /// Names of the fields that currently have a secondary index, in no
    /// particular order.
    pub fn indexes(&self) -> impl Iterator<Item = &str> {
        self.indexes.keys().map(String::as_str)
    }

    /// Ids of the documents whose `field` equals `value`.
    ///
    /// Uses the secondary index on `field` when one exists; otherwise scans and
    /// decodes every live document. Result order is unspecified.
    ///
    /// # Errors
    /// I/O or decoding errors during a scan.
    pub fn find(&self, field: &str, value: &Value) -> Result<Vec<DocId>> {
        if let Some(index) = self.indexes.get(field) {
            return Ok(index.equal(value).into_iter().map(DocId).collect());
        }
        let mut out = Vec::new();
        for (id, loc) in &self.index {
            let doc = self.read_body(*loc)?;
            if doc
                .get(field)
                .is_some_and(|v| total_cmp_value(v, value) == core::cmp::Ordering::Equal)
            {
                out.push(DocId(*id));
            }
        }
        Ok(out)
    }

    /// Ids of the documents whose `field` falls within `range`.
    ///
    /// Uses the secondary index on `field` when one exists; otherwise scans and
    /// decodes every live document. Result order is unspecified.
    ///
    /// # Errors
    /// I/O or decoding errors during a scan.
    pub fn range<R: RangeBounds<Value>>(&self, field: &str, range: R) -> Result<Vec<DocId>> {
        let lo = range.start_bound();
        let hi = range.end_bound();
        if let Some(index) = self.indexes.get(field) {
            return Ok(index.range(lo, hi).into_iter().map(DocId).collect());
        }
        let mut out = Vec::new();
        for (id, loc) in &self.index {
            let doc = self.read_body(*loc)?;
            if doc.get(field).is_some_and(|v| in_bounds(v, lo, hi)) {
                out.push(DocId(*id));
            }
        }
        Ok(out)
    }

    /// Reads and decodes the document body at `loc`.
    fn read_body(&self, loc: BodyLoc) -> Result<Document> {
        let mut buf = vec![0u8; loc.len as usize];
        read_exact_at(&self.file, &mut buf, loc.offset)?;
        decode_document(&buf)
    }

    /// Reads the current body at `loc` only when secondary indexes exist (they
    /// need the old field values); otherwise returns `None` without any I/O.
    fn body_for_indexes(&self, loc: BodyLoc) -> Result<Option<Document>> {
        if self.indexes.is_empty() {
            Ok(None)
        } else {
            self.read_body(loc).map(Some)
        }
    }

    /// Adds `id` to every secondary index whose field is present in `doc`.
    fn index_add(&mut self, id: u64, doc: &Document) {
        for (field, index) in &mut self.indexes {
            if let Some(value) = doc.get(field) {
                index.add(value, id);
            }
        }
    }

    /// Removes `id` from every secondary index whose field is present in `doc`.
    fn index_remove(&mut self, id: u64, doc: &Document) {
        for (field, index) in &mut self.indexes {
            if let Some(value) = doc.get(field) {
                index.remove(value, id);
            }
        }
    }

    /// Appends one record at the tail and updates the tail and primary index.
    ///
    /// Does not sync: callers finish their own bookkeeping first and then call
    /// [`Db::sync_if_required`], so a sync failure never leaves in-memory
    /// structures disagreeing with each other. If the write fails, the file is
    /// cut back to the previous tail (best effort) and nothing in memory changes.
    fn append(&mut self, op: u8, id: u64, doc: Option<&Document>) -> Result<()> {
        begin_record(&mut self.scratch, op, id);
        if let Some(doc) = doc {
            encode_document_into(&mut self.scratch, doc)?;
        }
        let payload_len = seal_record(&mut self.scratch)?;
        let record_start = self.tail;
        if let Err(err) = write_all_at(&self.file, &self.scratch, record_start) {
            // A partial write may have left bytes past the tail that a later
            // replay could misread as a (corrupt) record; drop them.
            let _ = self.file.set_len(record_start);
            return Err(err.into());
        }
        self.tail = record_start + record_len(payload_len);
        match op {
            OP_PUT => {
                // Cannot truncate: payload_len <= MAX_RECORD_BYTES < u32::MAX.
                let len = (payload_len - MIN_PAYLOAD) as u32;
                let _ = self.index.insert(
                    id,
                    BodyLoc {
                        offset: body_offset(record_start),
                        len,
                    },
                );
            }
            OP_DELETE => {
                let _ = self.index.remove(&id);
            }
            _ => {}
        }
        Ok(())
    }

    /// Syncs the file when the policy is [`SyncPolicy::Always`].
    fn sync_if_required(&self) -> Result<()> {
        if self.sync == SyncPolicy::Always {
            self.file.sync_all()?;
        }
        Ok(())
    }

    /// Writes the file header at offset 0 and syncs it.
    fn write_header(&mut self) -> Result<()> {
        write_header_to(&self.file)?;
        self.file.sync_all()?;
        Ok(())
    }

    /// Checks that the file is long enough to hold a header, starts with
    /// [`HEADER_MAGIC`], and carries a version no newer than [`FORMAT_VERSION`].
    fn verify_header(&self, file_len: u64) -> Result<()> {
        if file_len < HEADER_LEN {
            return Err(Error::BadMagic);
        }
        let mut header = [0u8; HEADER_LEN as usize];
        read_exact_at(&self.file, &mut header, 0)?;
        if header[0..8] != HEADER_MAGIC {
            return Err(Error::BadMagic);
        }
        let version = u16::from_le_bytes([header[8], header[9]]);
        if version > FORMAT_VERSION {
            return Err(Error::UnsupportedVersion(version));
        }
        Ok(())
    }

    /// Rebuilds the primary index, id counter and tail by scanning every
    /// record after the header.
    ///
    /// Scanning stops at the first frame whose length is out of range or that
    /// runs past end-of-file, or at a final record with a bad CRC. Everything
    /// from that point on is treated as a torn tail and truncated away. A CRC
    /// mismatch on any earlier record, or an unknown op, is reported as
    /// [`Error::Corrupt`].
    fn replay(&mut self, file_len: u64) -> Result<()> {
        let mut offset = HEADER_LEN;
        let mut frame = [0u8; FRAME_LEN];
        // Reused for every record instead of allocating one buffer per record.
        let mut payload = Vec::new();
        while offset + FRAME_LEN as u64 <= file_len {
            read_exact_at(&self.file, &mut frame, offset)?;
            let payload_len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
            let expected_crc = u32::from_le_bytes([frame[4], frame[5], frame[6], frame[7]]);
            if !(MIN_PAYLOAD..=MAX_RECORD_BYTES).contains(&payload_len) {
                break;
            }
            let record_end = offset + record_len(payload_len);
            if record_end > file_len {
                break;
            }
            payload.clear();
            payload.resize(payload_len, 0);
            read_exact_at(&self.file, &mut payload, offset + FRAME_LEN as u64)?;
            if crc32c(&payload) != expected_crc {
                if record_end == file_len {
                    break;
                }
                return Err(Error::Corrupt("crc mismatch"));
            }
            let op = payload[0];
            let mut id_bytes = [0u8; 8];
            id_bytes.copy_from_slice(&payload[1..MIN_PAYLOAD]);
            let id = u64::from_le_bytes(id_bytes);
            match op {
                OP_PUT => {
                    // Cannot truncate: payload_len <= MAX_RECORD_BYTES.
                    let len = (payload_len - MIN_PAYLOAD) as u32;
                    let _ = self.index.insert(
                        id,
                        BodyLoc {
                            offset: body_offset(offset),
                            len,
                        },
                    );
                }
                OP_DELETE => {
                    let _ = self.index.remove(&id);
                }
                _ => return Err(Error::Corrupt("unknown record op")),
            }
            // Tombstones count too, so deleted ids are never handed out again.
            // `saturating_add` keeps a (corrupt but CRC-valid) id of u64::MAX
            // from overflowing.
            self.next_id = self.next_id.max(id.saturating_add(1));
            offset = record_end;
        }
        if offset < file_len {
            self.file.set_len(offset)?;
        }
        self.tail = offset;
        Ok(())
    }
}

/// Clears `buf` and starts a record in it: a zeroed frame header (filled in
/// later by [`seal_record`]), the op code, and the little-endian document id.
fn begin_record(buf: &mut Vec<u8>, op: u8, id: u64) {
    buf.clear();
    buf.extend_from_slice(&[0u8; FRAME_LEN]);
    buf.push(op);
    buf.extend_from_slice(&id.to_le_bytes());
}

/// Fills in the frame header of a record built with [`begin_record`]: the
/// payload length and the CRC of the payload. Returns the payload length.
///
/// # Errors
/// [`Error::ValueTooLarge`] if the payload exceeds [`MAX_RECORD_BYTES`].
fn seal_record(buf: &mut [u8]) -> Result<usize> {
    let payload_len = buf.len() - FRAME_LEN;
    if payload_len > MAX_RECORD_BYTES {
        return Err(Error::ValueTooLarge);
    }
    // Cannot truncate: MAX_RECORD_BYTES is far below u32::MAX.
    let len_field = payload_len as u32;
    let crc = crc32c(&buf[FRAME_LEN..]);
    buf[0..4].copy_from_slice(&len_field.to_le_bytes());
    buf[4..8].copy_from_slice(&crc.to_le_bytes());
    Ok(payload_len)
}

/// File offset of the document body inside a record starting at `record_start`.
fn body_offset(record_start: u64) -> u64 {
    record_start + FRAME_LEN as u64 + MIN_PAYLOAD as u64
}

/// Total on-disk size of a record whose payload is `payload_len` bytes.
fn record_len(payload_len: usize) -> u64 {
    (FRAME_LEN + payload_len) as u64
}
impl fmt::Debug for Db {
    /// Summarises the handle (path, document count, log size, sync policy)
    /// without dumping the in-memory indexes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let live_documents = self.index.len();
        let mut out = f.debug_struct("Db");
        out.field("path", &self.path);
        out.field("live_documents", &live_documents);
        out.field("file_bytes", &self.tail);
        out.field("sync", &self.sync);
        out.finish()
    }
}
// Compile-time guarantee that `Db` can be sent to and shared between threads:
// the build fails if a field that is not `Send + Sync` is ever added.
const _: () = {
    const fn require_send_sync<T: Send + Sync>() {}
    require_send_sync::<Db>();
};
impl Drop for Db {
    /// Under `SyncPolicy::Manual`, makes a best-effort sync on close. Under
    /// `SyncPolicy::Always`, writes are already synced as they happen.
    fn drop(&mut self) {
        match self.sync {
            SyncPolicy::Manual => {
                // `drop` cannot report failures; callers that need to know
                // should call `flush` explicitly before dropping.
                let _ = self.file.sync_all();
            }
            SyncPolicy::Always => {}
        }
    }
}
/// Writes the `HEADER_LEN`-byte file header at offset 0 of `file`: the magic
/// bytes, the little-endian format version, then zeros. Does not sync.
fn write_header_to(file: &File) -> Result<()> {
    let mut header = [0u8; HEADER_LEN as usize];
    let (magic, rest) = header.split_at_mut(HEADER_MAGIC.len());
    magic.copy_from_slice(&HEADER_MAGIC);
    rest[..2].copy_from_slice(&FORMAT_VERSION.to_le_bytes());
    write_all_at(file, &header, 0)?;
    Ok(())
}
/// Path of the temporary file that compaction builds next to the database:
/// the database path with `.compacting` appended (`db.bison` → `db.bison.compacting`).
fn compacting_path(path: &Path) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(".compacting");
    raw.into()
}
/// Syncs the directory that contains `path`, so a newly created or renamed
/// directory entry is itself durable. A bare file name syncs `.`.
#[cfg(unix)]
fn sync_parent_dir(path: &Path) -> Result<()> {
    let dir = match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };
    File::open(dir)?.sync_all()?;
    Ok(())
}
/// Windows counterpart of the directory sync: intentionally a no-op. A plain
/// `File::open` cannot open a directory there, so the durability of the
/// directory entry is left to the filesystem.
#[cfg(windows)]
fn sync_parent_dir(_file: &Path) -> Result<()> {
    Ok(())
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::value::Value;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Returns a path in the system temp directory that is unique per process
    /// and per call, removing any file left there by an earlier run.
    fn temp_path() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let path = std::env::temp_dir().join(format!("bison_db_test_{pid}_{n}.bison"));
        let _ = std::fs::remove_file(&path);
        path
    }

    /// Builds a document from `(field, integer)` pairs.
    fn doc(pairs: &[(&str, i64)]) -> Document {
        let mut d = Document::new();
        for (k, v) in pairs {
            d.set(*k, *v);
        }
        d
    }

    #[test]
    fn test_insert_get_roundtrip() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let id = db.insert(doc(&[("a", 1), ("b", 2)])).unwrap();
        let got = db.get(id).unwrap().unwrap();
        assert_eq!(got.get("a").and_then(Value::as_int), Some(1));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_get_missing_returns_none() {
        let path = temp_path();
        let db = Db::open(&path).unwrap();
        assert!(db.get(DocId::from(1)).unwrap().is_none());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_delete_removes_document() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let id = db.insert(doc(&[("x", 9)])).unwrap();
        assert!(db.delete(id).unwrap());
        assert!(db.get(id).unwrap().is_none());
        assert!(!db.delete(id).unwrap());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_update_changes_value() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let id = db.insert(doc(&[("v", 1)])).unwrap();
        assert!(db.update(id, doc(&[("v", 2)])).unwrap());
        assert_eq!(
            db.get(id)
                .unwrap()
                .unwrap()
                .get("v")
                .and_then(Value::as_int),
            Some(2)
        );
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_update_absent_id_is_false() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        assert!(!db.update(DocId::from(7), Document::new()).unwrap());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_reopen_recovers_state() {
        let path = temp_path();
        let (a, b);
        {
            let mut db = Db::open(&path).unwrap();
            a = db.insert(doc(&[("n", 10)])).unwrap();
            b = db.insert(doc(&[("n", 20)])).unwrap();
            db.delete(a).unwrap();
            db.flush().unwrap();
        }
        let db = Db::open(&path).unwrap();
        assert!(db.get(a).unwrap().is_none());
        assert_eq!(
            db.get(b).unwrap().unwrap().get("n").and_then(Value::as_int),
            Some(20)
        );
        assert_eq!(db.len(), 1);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_reopen_continues_id_sequence() {
        let path = temp_path();
        let first;
        {
            let mut db = Db::open(&path).unwrap();
            first = db.insert(Document::new()).unwrap();
        }
        let mut db = Db::open(&path).unwrap();
        let second = db.insert(Document::new()).unwrap();
        assert!(second.get() > first.get());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_open_rejects_foreign_file() {
        let path = temp_path();
        std::fs::write(&path, b"this is definitely not a bison-db file at all").unwrap();
        assert!(matches!(Db::open(&path), Err(Error::BadMagic)));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_torn_tail_is_truncated_on_open() {
        let path = temp_path();
        let keep;
        {
            let mut db = Db::open(&path).unwrap();
            keep = db.insert(doc(&[("ok", 1)])).unwrap();
            db.flush().unwrap();
        }
        // Simulate a crash mid-append: a frame that promises 999 payload bytes
        // but is followed by only a few.
        {
            use std::io::Write;
            let mut f = OpenOptions::new().append(true).open(&path).unwrap();
            let mut frame = Vec::new();
            frame.extend_from_slice(&999u32.to_le_bytes());
            frame.extend_from_slice(&0u32.to_le_bytes());
            frame.extend_from_slice(b"short");
            f.write_all(&frame).unwrap();
            f.flush().unwrap();
        }
        let db = Db::open(&path).unwrap();
        assert!(db.get(keep).unwrap().is_some());
        assert_eq!(db.len(), 1);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_stats_reflect_live_documents() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        db.insert(doc(&[("a", 1)])).unwrap();
        let id = db.insert(doc(&[("b", 2)])).unwrap();
        db.delete(id).unwrap();
        let stats = db.stats();
        assert_eq!(stats.live_documents, 1);
        assert!(stats.file_bytes > HEADER_LEN);
        let _ = std::fs::remove_file(&path);
    }

    /// Sorts ids and returns their raw values, for order-insensitive comparisons.
    fn sorted(mut ids: Vec<DocId>) -> Vec<u64> {
        ids.sort();
        ids.into_iter().map(DocId::get).collect()
    }

    #[test]
    fn test_create_index_then_find() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let a = db.insert(doc(&[("g", 1)])).unwrap();
        let b = db.insert(doc(&[("g", 2)])).unwrap();
        let c = db.insert(doc(&[("g", 1)])).unwrap();
        db.create_index("g").unwrap();
        assert_eq!(
            sorted(db.find("g", &Value::from(1_i64)).unwrap()),
            sorted(vec![a, c])
        );
        assert_eq!(
            sorted(db.find("g", &Value::from(2_i64)).unwrap()),
            vec![b.get()]
        );
        assert!(db.find("g", &Value::from(9_i64)).unwrap().is_empty());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_find_indexed_matches_scan() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        for n in [1, 2, 2, 3, 2] {
            db.insert(doc(&[("k", n)])).unwrap();
        }
        // Same query answered first by a full scan, then through the index.
        let scan = sorted(db.find("k", &Value::from(2_i64)).unwrap());
        db.create_index("k").unwrap();
        let indexed = sorted(db.find("k", &Value::from(2_i64)).unwrap());
        assert_eq!(scan, indexed);
        assert_eq!(scan.len(), 3);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_range_query_inclusive_and_exclusive() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        for n in [10, 20, 30, 40] {
            db.insert(doc(&[("age", n)])).unwrap();
        }
        db.create_index("age").unwrap();
        assert_eq!(
            db.range("age", Value::from(20_i64)..=Value::from(30_i64))
                .unwrap()
                .len(),
            2
        );
        assert_eq!(
            db.range("age", Value::from(20_i64)..Value::from(40_i64))
                .unwrap()
                .len(),
            2
        );
        assert_eq!(db.range("age", Value::from(25_i64)..).unwrap().len(), 2);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_index_maintained_on_update_and_delete() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let id = db.insert(doc(&[("status", 1)])).unwrap();
        db.create_index("status").unwrap();
        assert_eq!(db.find("status", &Value::from(1_i64)).unwrap(), vec![id]);
        db.update(id, doc(&[("status", 2)])).unwrap();
        assert!(db.find("status", &Value::from(1_i64)).unwrap().is_empty());
        assert_eq!(db.find("status", &Value::from(2_i64)).unwrap(), vec![id]);
        db.delete(id).unwrap();
        assert!(db.find("status", &Value::from(2_i64)).unwrap().is_empty());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_indexes_listed_and_dropped() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        db.create_index("a").unwrap();
        db.create_index("b").unwrap();
        // Creating an existing index again is a no-op.
        db.create_index("a").unwrap();
        let mut names: Vec<&str> = db.indexes().collect();
        names.sort_unstable();
        assert_eq!(names, ["a", "b"]);
        assert!(db.drop_index("a"));
        assert!(!db.drop_index("a"));
        assert_eq!(db.indexes().count(), 1);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_index_not_persisted_but_rebuildable_after_reopen() {
        let path = temp_path();
        let id;
        {
            let mut db = Db::open(&path).unwrap();
            id = db.insert(doc(&[("city", 7)])).unwrap();
            db.create_index("city").unwrap();
            db.flush().unwrap();
        }
        let mut db = Db::open(&path).unwrap();
        assert_eq!(db.indexes().count(), 0);
        db.create_index("city").unwrap();
        assert_eq!(db.find("city", &Value::from(7_i64)).unwrap(), vec![id]);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_default_sync_policy_is_manual() {
        let path = temp_path();
        let db = Db::open(&path).unwrap();
        assert_eq!(db.sync_policy(), SyncPolicy::Manual);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_options_set_always_sync_policy() {
        let path = temp_path();
        let mut db = Db::open_with(&path, DbOptions::new().sync(SyncPolicy::Always)).unwrap();
        assert_eq!(db.sync_policy(), SyncPolicy::Always);
        let id = db.insert(doc(&[("v", 1)])).unwrap();
        assert!(db.get(id).unwrap().is_some());
        drop(db);
        let db = Db::open(&path).unwrap();
        assert_eq!(db.len(), 1);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_always_sync_persists_without_explicit_flush() {
        let path = temp_path();
        let id;
        {
            let mut db = Db::open_with(&path, DbOptions::new().sync(SyncPolicy::Always)).unwrap();
            id = db.insert(doc(&[("durable", 1)])).unwrap();
        }
        let db = Db::open(&path).unwrap();
        assert!(db.get(id).unwrap().is_some());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_dboptions_open_matches_db_open() {
        let path = temp_path();
        let db = DbOptions::new().open(&path).unwrap();
        assert_eq!(db.sync_policy(), SyncPolicy::Manual);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_compact_reclaims_space_and_preserves_data() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let id = db.insert(doc(&[("v", 1)])).unwrap();
        for n in 2..500 {
            db.update(id, doc(&[("v", n)])).unwrap();
        }
        let before = db.stats().file_bytes;
        db.compact().unwrap();
        let after = db.stats().file_bytes;
        assert!(
            after < before,
            "compaction should shrink the file: {after} !< {before}"
        );
        assert_eq!(db.len(), 1);
        assert_eq!(
            db.get(id)
                .unwrap()
                .unwrap()
                .get("v")
                .and_then(Value::as_int),
            Some(499)
        );
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_compact_drops_deleted_records() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let keep = db.insert(doc(&[("k", 1)])).unwrap();
        let gone = db.insert(doc(&[("k", 2)])).unwrap();
        db.delete(gone).unwrap();
        db.compact().unwrap();
        assert_eq!(db.len(), 1);
        assert!(db.get(keep).unwrap().is_some());
        assert!(db.get(gone).unwrap().is_none());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_compact_preserves_ids_and_indexes() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let a = db.insert(doc(&[("city", 1)])).unwrap();
        let b = db.insert(doc(&[("city", 2)])).unwrap();
        db.create_index("city").unwrap();
        db.compact().unwrap();
        assert_eq!(db.find("city", &Value::from(1_i64)).unwrap(), vec![a]);
        assert_eq!(db.find("city", &Value::from(2_i64)).unwrap(), vec![b]);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_compact_then_reopen_recovers() {
        let path = temp_path();
        let id;
        {
            let mut db = Db::open(&path).unwrap();
            id = db.insert(doc(&[("v", 7)])).unwrap();
            db.update(id, doc(&[("v", 8)])).unwrap();
            db.compact().unwrap();
            db.flush().unwrap();
        }
        let db = Db::open(&path).unwrap();
        assert_eq!(db.len(), 1);
        assert_eq!(
            db.get(id)
                .unwrap()
                .unwrap()
                .get("v")
                .and_then(Value::as_int),
            Some(8)
        );
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_compact_empty_db_is_ok() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        db.compact().unwrap();
        assert!(db.is_empty());
        let id = db.insert(doc(&[("x", 1)])).unwrap();
        assert!(db.get(id).unwrap().is_some());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_compact_preserves_sync_policy_and_data() {
        let path = temp_path();
        let mut db = Db::open_with(&path, DbOptions::new().sync(SyncPolicy::Always)).unwrap();
        let id = db.insert(doc(&[("v", 1)])).unwrap();
        for n in 2..50 {
            db.update(id, doc(&[("v", n)])).unwrap();
        }
        db.compact().unwrap();
        assert_eq!(db.sync_policy(), SyncPolicy::Always);
        assert_eq!(
            db.get(id)
                .unwrap()
                .unwrap()
                .get("v")
                .and_then(Value::as_int),
            Some(49)
        );
        let id2 = db.insert(doc(&[("w", 7)])).unwrap();
        assert!(db.get(id2).unwrap().is_some());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_value_too_large_is_rejected_and_leaves_db_unchanged() {
        let path = temp_path();
        let mut db = Db::open(&path).unwrap();
        let mut big = Document::new();
        big.set("blob", Value::Bytes(vec![0u8; MAX_RECORD_BYTES + 1]));
        assert!(matches!(db.insert(big), Err(Error::ValueTooLarge)));
        assert_eq!(db.len(), 0);
        let id = db.insert(doc(&[("ok", 1)])).unwrap();
        assert_eq!(id.get(), 1, "a rejected insert must not consume an id");
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_open_removes_stale_compacting_temp() {
        let path = temp_path();
        {
            let mut db = Db::open(&path).unwrap();
            db.insert(doc(&[("v", 1)])).unwrap();
            db.flush().unwrap();
        }
        let stale = compacting_path(&path);
        std::fs::write(&stale, b"garbage from an interrupted compaction").unwrap();
        assert!(stale.exists());
        let db = Db::open(&path).unwrap();
        assert!(!stale.exists(), "open should remove the stale temp");
        assert_eq!(db.len(), 1);
        let _ = std::fs::remove_file(&path);
    }
}