tempest-kv 0.0.2

Key-Value storage layer for TempestDB
Documentation
use std::{ops::Bound, path::PathBuf};

use bytes::Bytes;
use futures::TryStreamExt;
use tempest_core::test_utils::setup_tracing;
use tempest_io::VirtualIo;
use tempest_rt::{JoinHandle, block_on};

use crate::{DefaultStrategy, Storage, StorageHandle, batch::WriteBatch, config::StorageConfig};

fn storage() -> (StorageHandle, JoinHandle<()>) {
    Storage::<VirtualIo, DefaultStrategy>::init(
        PathBuf::from("/storage"),
        StorageConfig::for_testing(),
    )
}

/// Drop the handle (closes all channels) then await the worker so it can flush
/// and close its file descriptors before VirtualIo is dropped.
async fn shutdown(storage: StorageHandle, join: JoinHandle<()>) {
    drop(storage);
    join.await;
}

fn b(s: &'static str) -> Bytes {
    Bytes::from_static(s.as_bytes())
}

// -- write tests --

#[test]
fn test_storage_write_empty_batch() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();
        storage.write(WriteBatch::new()).await.unwrap();
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_write_single_put() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();
        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"red");
        storage.write(batch).await.unwrap();
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_write_multiple_puts() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();
        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"red");
        batch.put(b"banana", b"yellow");
        batch.put(b"cherry", b"red");
        storage.write(batch).await.unwrap();
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_write_sequential_batches() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();
        let mut batch = WriteBatch::new();
        batch.put(b"a", b"1");
        storage.write(batch).await.unwrap();

        let mut batch = WriteBatch::new();
        batch.put(b"b", b"2");
        storage.write(batch).await.unwrap();
        shutdown(storage, join).await;
    });
}

// -- get tests --

#[test]
fn test_storage_get_missing_key() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();
        let result = storage.get(b("apple")).await.unwrap();
        assert_eq!(result, None);
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_get_existing_key() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();
        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"red");
        storage.write(batch).await.unwrap();

        let result = storage.get(b("apple")).await.unwrap();
        assert_eq!(result, Some(b("red")));
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_get_overwritten_key() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"old");
        storage.write(batch).await.unwrap();

        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"new");
        storage.write(batch).await.unwrap();

        let result = storage.get(b("apple")).await.unwrap();
        assert_eq!(result, Some(b("new")));
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_get_deleted_key() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"red");
        storage.write(batch).await.unwrap();

        let mut batch = WriteBatch::new();
        batch.delete(b"apple");
        storage.write(batch).await.unwrap();

        let result = storage.get(b("apple")).await.unwrap();
        assert_eq!(result, None);
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_get_unrelated_key_after_delete() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"red");
        batch.put(b"banana", b"yellow");
        storage.write(batch).await.unwrap();

        let mut batch = WriteBatch::new();
        batch.delete(b"apple");
        storage.write(batch).await.unwrap();

        assert_eq!(storage.get(b("apple")).await.unwrap(), None);
        assert_eq!(storage.get(b("banana")).await.unwrap(), Some(b("yellow")));
        shutdown(storage, join).await;
    });
}

// -- scan tests --

#[test]
fn test_storage_scan_unbounded() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"a", b"1");
        batch.put(b"b", b"2");
        batch.put(b"c", b"3");
        storage.write(batch).await.unwrap();

        let scan = storage
            .scan(Bound::Unbounded, Bound::Unbounded)
            .await
            .unwrap();
        let results: Vec<_> = scan.try_collect().await.unwrap();
        assert_eq!(
            results,
            vec![(b("a"), b("1")), (b("b"), b("2")), (b("c"), b("3"))]
        );
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_scan_included_start() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"a", b"1");
        batch.put(b"b", b"2");
        batch.put(b"c", b"3");
        storage.write(batch).await.unwrap();

        let scan = storage
            .scan(Bound::Included(b("b")), Bound::Unbounded)
            .await
            .unwrap();
        let results: Vec<_> = scan.try_collect().await.unwrap();
        assert_eq!(results, vec![(b("b"), b("2")), (b("c"), b("3"))]);
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_scan_excluded_end() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"a", b"1");
        batch.put(b"b", b"2");
        batch.put(b"c", b"3");
        storage.write(batch).await.unwrap();

        let scan = storage
            .scan(Bound::Unbounded, Bound::Excluded(b("c")))
            .await
            .unwrap();
        let results: Vec<_> = scan.try_collect().await.unwrap();
        assert_eq!(results, vec![(b("a"), b("1")), (b("b"), b("2"))]);
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_scan_both_bounds() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"red");
        batch.put(b"banana", b"yellow");
        batch.put(b"cherry", b"red");
        batch.put(b"date", b"brown");
        storage.write(batch).await.unwrap();

        let scan = storage
            .scan(Bound::Included(b("banana")), Bound::Excluded(b("date")))
            .await
            .unwrap();
        let results: Vec<_> = scan.try_collect().await.unwrap();
        assert_eq!(
            results,
            vec![(b("banana"), b("yellow")), (b("cherry"), b("red"))]
        );
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_scan_empty_range() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"a", b"1");
        batch.put(b"z", b"26");
        storage.write(batch).await.unwrap();

        let scan = storage
            .scan(Bound::Included(b("m")), Bound::Excluded(b("n")))
            .await
            .unwrap();
        let results: Vec<_> = scan.try_collect().await.unwrap();
        assert!(results.is_empty());
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_scan_snapshot_isolation() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        // batch A - the baseline visible to the scan
        let mut batch = WriteBatch::new();
        batch.put(b"banana", b"yellow");
        batch.put(b"kiwi", b"green");
        batch.put(b"lemon", b"yellow");
        storage.write(batch).await.unwrap();

        // start scan - snapshot is captured here
        let scan = storage
            .scan(Bound::Included(b("banana")), Bound::Excluded(b("orange")))
            .await
            .unwrap();

        // batch B - written after scan start, must be invisible to the scan
        let mut batch = WriteBatch::new();
        batch.put(b"banana", b"wrong color");
        batch.put(b"cherry", b"red");
        batch.delete(b"lemon");
        storage.write(batch).await.unwrap();

        let results: Vec<_> = scan.try_collect().await.unwrap();
        assert_eq!(
            results,
            vec![
                (b("banana"), b("yellow")),
                (b("kiwi"), b("green")),
                (b("lemon"), b("yellow")),
            ]
        );
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_scan_dedup_latest_version() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"old");
        storage.write(batch).await.unwrap();

        let mut batch = WriteBatch::new();
        batch.put(b"apple", b"new");
        storage.write(batch).await.unwrap();

        let scan = storage
            .scan(Bound::Unbounded, Bound::Unbounded)
            .await
            .unwrap();
        let results: Vec<_> = scan.try_collect().await.unwrap();
        // only one entry, with the latest value
        assert_eq!(results, vec![(b("apple"), b("new"))]);
        shutdown(storage, join).await;
    });
}

#[test]
fn test_storage_scan_deleted_key_absent() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let (storage, join) = storage();

        let mut batch = WriteBatch::new();
        batch.put(b"a", b"1");
        batch.put(b"b", b"2");
        batch.put(b"c", b"3");
        storage.write(batch).await.unwrap();

        let mut batch = WriteBatch::new();
        batch.delete(b"b");
        storage.write(batch).await.unwrap();

        let scan = storage
            .scan(Bound::Unbounded, Bound::Unbounded)
            .await
            .unwrap();
        let results: Vec<_> = scan.try_collect().await.unwrap();
        assert_eq!(results, vec![(b("a"), b("1")), (b("c"), b("3"))]);
        shutdown(storage, join).await;
    });
}