trine-kv 0.2.0

Embedded LSM MVCC key-value database.
Documentation
use std::{
    collections::BTreeMap,
    fs,
    path::PathBuf,
    time::{SystemTime, UNIX_EPOCH},
};

use trine_kv::{Bucket, BucketOptions, Db, DbOptions, Iter, KeyRange, Snapshot};

struct ModelSnapshot {
    snapshot: Snapshot,
    model: BTreeMap<Vec<u8>, Vec<u8>>,
    step: usize,
}

#[test]
fn persistent_snapshot_range_ignores_newer_point_delete() {
    let path = temp_db_path("snapshot-range-newer-delete");
    let options = DbOptions::persistent(&path);

    {
        let db = Db::open_sync(options).expect("persistent db opens");
        let bucket = db.default_bucket_sync().expect("bucket opens");
        bucket.put_sync(b"key-06", b"value-06").expect("put");
        db.flush_sync().expect("flush table");

        let snapshot = db.snapshot();
        bucket.delete_sync(b"key-06").expect("newer delete");

        assert_eq!(
            snapshot.get_sync(&bucket, b"key-06").expect("snapshot get"),
            Some(b"value-06".to_vec())
        );
        assert_eq!(
            collect_rows(
                snapshot
                    .range_sync(&bucket, &KeyRange::all())
                    .expect("snapshot range")
            ),
            vec![(b"key-06".to_vec(), b"value-06".to_vec())]
        );
    }

    fs::remove_dir_all(path).expect("cleanup test db");
}

#[test]
fn randomized_operations_match_mvcc_reference_across_reopen() {
    let path = temp_db_path("model-reference");
    let mut options = DbOptions::persistent(&path);
    options.max_l0_files = 2;
    options.target_table_bytes = 512;
    let bucket_options = BucketOptions {
        block_bytes: 128,
        ..BucketOptions::default()
    };
    options.default_bucket_options = bucket_options;
    let keys = model_keys();
    let mut rng = TestRng::new(0x6eed_5eed_d15e_a5e5);
    let mut model = BTreeMap::<Vec<u8>, Vec<u8>>::new();

    {
        let db = Db::open_sync(options.clone()).expect("persistent db opens");
        let bucket = db.default_bucket_sync().expect("bucket opens");
        let mut run = ModelRun::new(&db, &bucket, &keys, &mut rng, &mut model);
        run.run();
    }

    {
        let db = Db::open_sync(options).expect("persistent db reopens");
        let bucket = db.default_bucket_sync().expect("bucket reopens");
        assert_range(&bucket, &model);
    }

    fs::remove_dir_all(path).expect("cleanup test db");
}

struct ModelRun<'run> {
    db: &'run Db,
    bucket: &'run Bucket,
    keys: &'run [Vec<u8>],
    rng: &'run mut TestRng,
    model: &'run mut BTreeMap<Vec<u8>, Vec<u8>>,
    snapshots: Vec<ModelSnapshot>,
    history: Vec<String>,
}

impl<'run> ModelRun<'run> {
    fn new(
        db: &'run Db,
        bucket: &'run Bucket,
        keys: &'run [Vec<u8>],
        rng: &'run mut TestRng,
        model: &'run mut BTreeMap<Vec<u8>, Vec<u8>>,
    ) -> Self {
        Self {
            db,
            bucket,
            keys,
            rng,
            model,
            snapshots: Vec::new(),
            history: Vec::new(),
        }
    }

    fn run(&mut self) {
        for step in 0..320 {
            self.apply_random_operation(step);
            self.assert_periodic_checks(step);
        }
        self.assert_after_final_flush_and_compaction();
    }

    fn apply_random_operation(&mut self, step: usize) {
        match self.rng.usize(10) {
            0..=2 => self.put_random_key(step),
            3 => self.delete_random_key(step),
            4 => self.delete_random_range(step),
            5 => self.assert_random_get(step),
            6 => self.assert_full_range(step),
            7 => self.capture_snapshot(step),
            8 => self.assert_random_snapshot(step),
            _ => self.flush_and_maybe_compact(step),
        }
    }

    fn put_random_key(&mut self, step: usize) {
        let key = self.keys[self.rng.usize(self.keys.len())].clone();
        let value = format!("value-{step:03}-{}", String::from_utf8_lossy(&key)).into_bytes();
        self.bucket
            .put_sync(key.clone(), value.clone())
            .expect("put");
        self.history.push(format!(
            "{step}: put {} -> {}",
            String::from_utf8_lossy(&key),
            String::from_utf8_lossy(&value)
        ));
        self.model.insert(key, value);
    }

    fn delete_random_key(&mut self, step: usize) {
        let key = self.keys[self.rng.usize(self.keys.len())].clone();
        self.bucket.delete_sync(key.clone()).expect("point delete");
        self.history
            .push(format!("{step}: delete {}", String::from_utf8_lossy(&key)));
        self.model.remove(&key);
    }

    fn delete_random_range(&mut self, step: usize) {
        let (start, end) = random_key_span(self.rng, self.keys);
        self.bucket
            .delete_range_sync(KeyRange::half_open(
                self.keys[start].clone(),
                self.keys[end].clone(),
            ))
            .expect("range delete");
        self.history.push(format!(
            "{step}: delete_range {}..{}",
            String::from_utf8_lossy(&self.keys[start]),
            String::from_utf8_lossy(&self.keys[end])
        ));
        remove_model_range(self.model, &self.keys[start], &self.keys[end]);
    }

    fn assert_random_get(&mut self, step: usize) {
        self.history.push(format!("{step}: get"));
        assert_random_get(
            self.bucket,
            self.model,
            self.keys,
            self.rng,
            step,
            &self.history,
        );
    }

    fn assert_full_range(&mut self, step: usize) {
        self.history.push(format!("{step}: range"));
        assert_range(self.bucket, self.model);
    }

    fn capture_snapshot(&mut self, step: usize) {
        self.history.push(format!("{step}: snapshot"));
        self.snapshots.push(ModelSnapshot {
            snapshot: self.db.snapshot(),
            model: (*self.model).clone(),
            step,
        });
        if self.snapshots.len() > 6 {
            self.snapshots.remove(0);
        }
    }

    fn assert_random_snapshot(&mut self, step: usize) {
        if self.snapshots.is_empty() {
            return;
        }
        let index = self.rng.usize(self.snapshots.len());
        self.history.push(format!("{step}: check snapshot {index}"));
        assert_snapshot(&self.snapshots[index], self.bucket, self.keys, step);
    }

    fn flush_and_maybe_compact(&mut self, step: usize) {
        self.history.push(format!("{step}: flush"));
        self.db.flush_sync().expect("flush succeeds");
        if self.rng.usize(2) == 0 {
            self.history.push(format!("{step}: compact_all"));
            self.db
                .compact_range_sync(KeyRange::all())
                .expect("compaction succeeds");
        }
    }

    fn assert_periodic_checks(&mut self, step: usize) {
        if step % 17 == 0 {
            assert_random_get(
                self.bucket,
                self.model,
                self.keys,
                self.rng,
                step,
                &self.history,
            );
        }
        if step % 41 == 0 {
            assert_range(self.bucket, self.model);
        }
    }

    fn assert_after_final_flush_and_compaction(&mut self) {
        assert_range(self.bucket, self.model);
        self.snapshots.clear();
        self.db.flush_sync().expect("final flush succeeds");
        self.db
            .compact_range_sync(KeyRange::all())
            .expect("final compaction succeeds");
        assert_range(self.bucket, self.model);
    }
}

fn assert_random_get(
    bucket: &Bucket,
    model: &BTreeMap<Vec<u8>, Vec<u8>>,
    keys: &[Vec<u8>],
    rng: &mut TestRng,
    step: usize,
    history: &[String],
) {
    let key = &keys[rng.usize(keys.len())];
    assert_eq!(
        bucket.get_sync(key).expect("point read"),
        model.get(key).cloned(),
        "point mismatch at step {step} for key {:?}\n{}",
        String::from_utf8_lossy(key),
        history_tail(history)
    );
}

fn assert_range(bucket: &Bucket, model: &BTreeMap<Vec<u8>, Vec<u8>>) {
    assert_eq!(
        collect_rows(bucket.range_sync(&KeyRange::all()).expect("range read")),
        model_rows(model)
    );
}

fn assert_snapshot(snapshot: &ModelSnapshot, bucket: &Bucket, keys: &[Vec<u8>], step: usize) {
    for key in keys {
        assert_eq!(
            snapshot
                .snapshot
                .get_sync(bucket, key)
                .expect("snapshot point read"),
            snapshot.model.get(key).cloned(),
            "snapshot point mismatch at step {step} for snapshot from step {} and key {:?}",
            snapshot.step,
            String::from_utf8_lossy(key)
        );
    }
    assert_eq!(
        collect_rows(
            snapshot
                .snapshot
                .range_sync(bucket, &KeyRange::all())
                .expect("snapshot range read")
        ),
        model_rows(&snapshot.model),
        "snapshot range mismatch at step {step} for snapshot from step {}",
        snapshot.step
    );
}

fn collect_rows(iter: Iter) -> Vec<(Vec<u8>, Vec<u8>)> {
    iter.map(|item| {
        let item = item.expect("iterator item reads");
        (item.key, item.value)
    })
    .collect()
}

fn model_rows(model: &BTreeMap<Vec<u8>, Vec<u8>>) -> Vec<(Vec<u8>, Vec<u8>)> {
    model
        .iter()
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}

fn remove_model_range(model: &mut BTreeMap<Vec<u8>, Vec<u8>>, start: &[u8], end: &[u8]) {
    let removed = model
        .range(start.to_vec()..end.to_vec())
        .map(|(key, _)| key.clone())
        .collect::<Vec<_>>();
    for key in removed {
        model.remove(&key);
    }
}

fn history_tail(history: &[String]) -> String {
    history
        .iter()
        .skip(history.len().saturating_sub(32))
        .cloned()
        .collect::<Vec<_>>()
        .join("\n")
}

fn random_key_span(rng: &mut TestRng, keys: &[Vec<u8>]) -> (usize, usize) {
    let start = rng.usize(keys.len() - 1);
    let end = start + 1 + rng.usize(keys.len() - start - 1);
    (start, end)
}

fn model_keys() -> Vec<Vec<u8>> {
    (0..12)
        .map(|index| format!("key-{index:02}").into_bytes())
        .collect()
}

fn temp_db_path(name: &str) -> PathBuf {
    let nonce = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time after epoch")
        .as_nanos();
    std::env::temp_dir().join(format!("trine-kv-{name}-{}-{nonce}", std::process::id()))
}

#[derive(Debug, Clone, Copy)]
struct TestRng {
    state: u64,
}

impl TestRng {
    const fn new(seed: u64) -> Self {
        Self { state: seed }
    }

    fn next(&mut self) -> u64 {
        self.state = self
            .state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        self.state
    }

    fn usize(&mut self, upper: usize) -> usize {
        let upper = u64::try_from(upper).expect("test upper bound fits u64");
        usize::try_from(self.next() % upper).expect("bounded random value fits usize")
    }
}