// emdb 0.7.0
//
// A lightweight, high-performance embedded database for Rust.
// Documentation
use std::collections::BTreeSet;
use std::sync::Arc;
use std::thread;

use emdb::{Emdb, Error, Result};

/// Builds a scratch database path inside the system temp directory.
///
/// The file name has the form `emdb-concurrency-{name}-{nanos}.emdb`, where `nanos` is the
/// number of nanoseconds since the Unix epoch, or `0` if the system clock reads earlier than
/// the epoch.
fn tmp_path(name: &str) -> std::path::PathBuf {
    let stamp = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0_u128,
    };
    std::env::temp_dir().join(format!("emdb-concurrency-{name}-{stamp}.emdb"))
}

/// Reads the eight-byte little-endian `last_tx_id` field straight from the file at `path`.
///
/// Files whose first eight bytes are `EMDBPAGE` are read at byte offset 28; every other
/// file is read at byte offset 32.
///
/// # Errors
///
/// Returns an error when the file cannot be read, or when it is too short to contain the
/// eight-byte field at the selected offset (reported as an `UnexpectedEof` I/O error rather
/// than a slice-index panic).
fn read_header_last_tx_id(path: &std::path::Path) -> Result<u64> {
    let bytes = std::fs::read(path)?;
    let offset = if bytes.starts_with(b"EMDBPAGE") {
        28
    } else {
        32
    };
    // Checked slicing: a truncated file becomes an error instead of a panic.
    let field = bytes.get(offset..offset + 8).ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "database file is too short to contain the last_tx_id header field",
        )
    })?;
    let mut arr = [0_u8; 8];
    arr.copy_from_slice(field);
    Ok(u64::from_le_bytes(arr))
}

/// Runs one writer (1,000 inserts) alongside ten readers (10,000 lookups each over the 100
/// seeded keys) and checks that every thread finishes and the store ends with 1,100 entries.
#[test]
fn many_readers_one_writer_no_deadlock_and_correct_state() -> Result<()> {
    let db = Arc::new(Emdb::open_in_memory());

    // Seed the keys the readers will probe.
    for seed in 0_u32..100 {
        db.insert(format!("seed:{seed}"), format!("v{seed}"))?;
    }

    let writer = {
        let shared = Arc::clone(&db);
        thread::spawn(move || -> Result<()> {
            for n in 0_u32..1_000 {
                shared.insert(format!("w:{n}"), format!("{n}"))?;
            }
            Ok(())
        })
    };

    let readers: Vec<_> = (0_u32..10)
        .map(|_| {
            let shared = Arc::clone(&db);
            thread::spawn(move || -> Result<()> {
                let mut found = 0_usize;
                for n in 0_u32..10_000 {
                    if shared.get(format!("seed:{}", n % 100))?.is_some() {
                        found += 1;
                    }
                }
                if found == 0 {
                    return Err(Error::TransactionAborted("reader observed no visible keys"));
                }
                Ok(())
            })
        })
        .collect();

    // The first `?` surfaces a thread panic, the second the thread's own result.
    writer
        .join()
        .map_err(|_panic| Error::TransactionAborted("writer thread panicked"))??;

    for reader in readers {
        reader
            .join()
            .map_err(|_panic| Error::TransactionAborted("reader thread panicked"))??;
    }

    assert_eq!(db.len()?, 1_100);
    Ok(())
}

/// Two threads each commit 100 single-insert transactions against a file-backed database;
/// afterwards the store must hold 200 entries and the on-disk header must report 200 as the
/// last transaction id.
#[test]
fn concurrent_transactions_are_serialized_with_monotonic_tx_ids() -> Result<()> {
    let path = tmp_path("tx-serialized");
    let db = Arc::new(Emdb::open(&path)?);

    let mut workers = Vec::new();
    for t in 0_u32..2 {
        let db = Arc::clone(&db);
        workers.push(thread::spawn(move || -> Result<()> {
            for i in 0_u32..100 {
                db.transaction(|tx| {
                    tx.insert(format!("t{t}:{i}"), format!("v{i}"))?;
                    Ok(())
                })?;
            }
            Ok(())
        }));
    }

    for worker in workers {
        let join_result = worker.join();
        match join_result {
            Ok(inner) => inner?,
            Err(_panic) => return Err(Error::TransactionAborted("transaction worker panicked")),
        }
    }

    assert_eq!(db.len()?, 200);
    assert_eq!(read_header_last_tx_id(path.as_path())?, 200);

    // Close the handle before deleting its files; removing a file that is still open is
    // not portable. All worker clones were released when the threads were joined.
    drop(db);

    // Same `<path>.lock` sidecar convention the other file-backed tests clean up.
    let mut lock_path = path.as_os_str().to_owned();
    lock_path.push(".lock");
    let lock_path = std::path::PathBuf::from(lock_path);

    assert!(std::fs::remove_file(path).is_ok());
    // Best-effort: the sidecar may already be gone.
    let _removed_lock = std::fs::remove_file(lock_path);
    Ok(())
}

/// While one handle is open a second open of the same path must fail; once the first handle
/// is dropped the path must be openable again.
#[test]
fn lockfile_contention_and_release_work() -> Result<()> {
    let path = tmp_path("lockfile-contention");

    // Sidecar path is `<db path>.lock`; computed up front for the cleanup at the end.
    let mut sidecar = path.as_os_str().to_owned();
    sidecar.push(".lock");
    let lock_path = std::path::PathBuf::from(sidecar);

    {
        let _holder = Emdb::open(&path)?;
        let contender = Emdb::open(&path);
        if contender.is_ok() {
            return Err(Error::TransactionAborted(
                "second open unexpectedly succeeded",
            ));
        }
        // `_holder` goes out of scope here, closing the first handle.
    }

    // With the first handle gone the path must open again.
    drop(Emdb::open(&path)?);

    // Best-effort cleanup; results are intentionally ignored.
    let _removed_db = std::fs::remove_file(path);
    let _removed_lock = std::fs::remove_file(lock_path);
    Ok(())
}

/// Opens a database inside a closure that then panics, and checks that the same path can be
/// opened again once the panic has been caught.
#[test]
fn lockfile_released_on_panic() -> Result<()> {
    let path = tmp_path("lockfile-panic");

    let unwind = std::panic::catch_unwind({
        let path = path.clone();
        move || {
            // Bind the handle in the closure's outer scope so it is still alive when the
            // panic fires and is dropped by unwinding. Binding it inside the `Ok` match arm
            // would drop it at the end of that arm, before the panic, and the unwind path
            // would never be exercised.
            let _keep_alive = match Emdb::open(&path) {
                Ok(db) => db,
                Err(err) => panic!("unexpected open error before panic path: {err}"),
            };
            panic!("intentional panic to test lockfile drop");
        }
    });
    assert!(unwind.is_err());

    let reopened = Emdb::open(&path)?;
    drop(reopened);

    let mut lock_path = path.as_os_str().to_owned();
    lock_path.push(".lock");
    let lock_path = std::path::PathBuf::from(lock_path);

    // Best-effort cleanup; results are intentionally ignored.
    let _removed_db = std::fs::remove_file(path);
    let _removed_lock = std::fs::remove_file(lock_path);
    Ok(())
}

/// Four threads, each holding its own `clone_handle()` of one in-memory database, insert 250
/// distinct keys apiece; the original handle must then report and iterate 1,000 unique keys.
#[test]
fn clone_handle_across_threads_preserves_correctness() -> Result<()> {
    let db = Emdb::open_in_memory();

    let workers: Vec<_> = (0_u32..4)
        .map(|worker_id| {
            let handle = db.clone_handle();
            thread::spawn(move || -> Result<()> {
                for n in 0_u32..250 {
                    handle.insert(format!("k{worker_id}:{n}"), format!("v{n}"))?;
                }
                Ok(())
            })
        })
        .collect();

    for worker in workers {
        // The first `?` surfaces a thread panic, the second the thread's own result.
        worker
            .join()
            .map_err(|_panic| Error::TransactionAborted("clone worker panicked"))??;
    }

    assert_eq!(db.len()?, 1_000);

    // Collect keys into a set to confirm that iteration yields 1,000 distinct keys.
    let mut distinct = BTreeSet::new();
    for (key, _value) in db.iter()? {
        let _was_new = distinct.insert(key);
    }
    assert_eq!(distinct.len(), 1_000);
    Ok(())
}