// tempest-kv 0.0.2
//
// Key-Value storage layer for TempestDB
// Documentation
use std::path::PathBuf;

use tempest_core::test_utils::setup_tracing;
use tempest_io::VirtualIo;
use tempest_rt::block_on;

use std::rc::Rc;

use crate::{
    base::{DefaultComparer, InternalKey, SeqNum},
    config::SstWriteConfig,
    iterator::{StorageIterator, mock::MockIterator},
};

use super::*;

/// Mock source iterator specialised to `DefaultComparer`; the tests below
/// build one with `Mock::new().add_items(..)` and hand it to `write`.
type Mock = MockIterator<DefaultComparer>;

/// Returns the fixed SST file path (`/test.sst`) shared by most tests here.
fn path() -> PathBuf {
    "/test.sst".into()
}

/// Writes three entries with the default config and checks the returned stats.
#[test]
fn test_write_single_block() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let entries = [(0, 10, "a"), (1, 11, "b"), (2, 12, "c")];

        let written = write::<VirtualIo, DefaultComparer, _>(
            path(),
            3,
            SstWriteConfig::default(),
            Mock::new().add_items(entries),
        )
        .await
        .unwrap();

        // the stats must mirror exactly what was fed in
        assert_eq!(written.entry_count, 3);
        assert!(written.file_size > 0);
        assert_eq!(written.min_seqnum, SeqNum::new(10).unwrap());
        assert_eq!(written.max_seqnum, SeqNum::new(12).unwrap());
    });
}

/// Checks that the reported min/max seqnums cover every written entry.
#[test]
fn test_write_seqnum_range() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        // seqnums are not sorted by key order, so min/max must be tracked across all entries
        let entries = [(0, 20, "a"), (1, 10, "b"), (2, 30, "c"), (3, 15, "d")];

        let written = write::<VirtualIo, DefaultComparer, _>(
            path(),
            4,
            SstWriteConfig::default(),
            Mock::new().add_items(entries),
        )
        .await
        .unwrap();

        assert_eq!(written.entry_count, 4);
        assert_eq!(written.min_seqnum, SeqNum::new(10).unwrap());
        assert_eq!(written.max_seqnum, SeqNum::new(30).unwrap());
    });
}

/// Writes six entries with a 32-byte block target and checks the returned stats.
#[test]
fn test_write_multi_block() {
    setup_tracing();

    // small block size forces multiple block flushes
    let config = SstWriteConfig {
        block_target_size: 32,
        ..SstWriteConfig::default()
    };

    block_on(VirtualIo::default(), async {
        let entries = [
            (0, 10, "value-a"),
            (1, 11, "value-b"),
            (2, 12, "value-c"),
            (3, 13, "value-d"),
            (4, 14, "value-e"),
            (5, 15, "value-f"),
        ];

        let written = write::<VirtualIo, DefaultComparer, _>(
            path(),
            6,
            config,
            Mock::new().add_items(entries),
        )
        .await
        .unwrap();

        assert_eq!(written.entry_count, 6);
        assert!(written.file_size > 0);
        assert_eq!(written.min_seqnum, SeqNum::new(10).unwrap());
        assert_eq!(written.max_seqnum, SeqNum::new(15).unwrap());
    });
}

/// Writes a 2-entry and an 8-entry table and checks that the larger one
/// reports a strictly larger file size.
#[test]
fn test_write_file_size_grows_with_entries() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        // two entries -> /small.sst
        let few = [(0, 10, "a"), (1, 11, "b")];
        let stats_small = write::<VirtualIo, DefaultComparer, _>(
            PathBuf::from("/small.sst"),
            2,
            SstWriteConfig::default(),
            Mock::new().add_items(few),
        )
        .await
        .unwrap();

        // eight entries -> /large.sst
        let many = [
            (0, 10, "a"),
            (1, 11, "b"),
            (2, 12, "c"),
            (3, 13, "d"),
            (4, 14, "e"),
            (5, 15, "f"),
            (6, 16, "g"),
            (7, 17, "h"),
        ];
        let stats_large = write::<VirtualIo, DefaultComparer, _>(
            PathBuf::from("/large.sst"),
            8,
            SstWriteConfig::default(),
            Mock::new().add_items(many),
        )
        .await
        .unwrap();

        assert!(stats_large.file_size > stats_small.file_size);
    });
}

/// Writes `items` to the shared test path with the default config, then loads
/// the file back and returns the resulting handle.
///
/// Panics (via `unwrap`) if either the write or the load fails.
///
/// NOTE(review): the second argument to `write` is `0` here, whereas the write
/// tests above pass the number of entries — confirm this is intentional.
async fn write_and_load(
    items: impl IntoIterator<Item = (u64, u64, &'static str)>,
) -> SstHandle<VirtualIo, DefaultComparer> {
    let entries = items.into_iter();

    // the returned write stats are not needed by callers of this helper
    write::<VirtualIo, DefaultComparer, _>(
        path(),
        0,
        SstWriteConfig::default(),
        Mock::new().add_items(entries),
    )
    .await
    .unwrap();

    let loaded = load::<VirtualIo, DefaultComparer>(path()).await;
    loaded.unwrap()
}

/// A file produced by `write` must load successfully and close cleanly.
#[test]
fn test_load_valid() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let entries = [(0, 10, "a"), (1, 11, "b"), (2, 12, "c")];
        write::<VirtualIo, DefaultComparer, _>(
            path(),
            3,
            SstWriteConfig::default(),
            Mock::new().add_items(entries),
        )
        .await
        .unwrap();

        // load the file just written, then release the handle
        load::<VirtualIo, DefaultComparer>(path())
            .await
            .unwrap()
            .close()
            .await
            .unwrap();
    });
}

/// Loading from a path that was never written must yield an error.
#[test]
fn test_load_nonexistent() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        // nothing has been written to a fresh `VirtualIo` at this point
        let loaded = load::<VirtualIo, DefaultComparer>(path()).await;
        assert!(loaded.is_err());
    });
}

/// Loading a file that contains only zero bytes must yield an error.
#[test]
fn test_load_invalid_magic() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        // 256 zero bytes: the file exists but carries no valid SST magic
        let junk = bytes::BytesMut::zeroed(256);

        let file = tempest_rt::open_file::<VirtualIo>(
            path(),
            tempest_io::OpenOptions::new().write(true).create(true),
        )
        .await
        .unwrap();

        // field 0 of the write outcome is the result that must be checked
        let write_outcome = tempest_rt::write_exact::<_, VirtualIo>(file, junk, 0).await;
        write_outcome.0.unwrap();
        tempest_rt::close_file::<VirtualIo>(file).await.unwrap();

        let loaded = load::<VirtualIo, DefaultComparer>(path()).await;
        assert!(loaded.is_err());
    });
}

/// Looking up key 1 at seqnum 11 must return the value `"b"` written for it.
#[test]
fn test_get_existing_key() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let handle = write_and_load([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;

        let probe = InternalKey::test_with_seqnum(1, 11);
        let (_, value) = handle
            .get(&probe)
            .await
            .unwrap()
            .expect("key should be found");
        assert_eq!(value.as_ref(), b"b");

        handle.close().await.unwrap();
    });
}

/// Looking up a key that was never written must return `None`.
#[test]
fn test_get_missing_key() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let handle = write_and_load([(0, 10, "a"), (1, 11, "b")]).await;

        // key 99 is not among the written keys (0 and 1)
        let probe = InternalKey::test_with_seqnum(99, 10);
        let lookup = handle.get(&probe).await.unwrap();
        assert!(lookup.is_none());

        handle.close().await.unwrap();
    });
}

/// A lookup at seqnum 10 must not see an entry written at seqnum 20.
#[test]
fn test_get_snapshot_visibility() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        // key 0 written at seqnum 20, searching at seqnum 10 should not see it
        let handle = write_and_load([(0, 20, "a"), (1, 10, "b")]).await;

        let probe = InternalKey::test_with_seqnum(0, 10);
        let lookup = handle.get(&probe).await.unwrap();
        assert!(
            lookup.is_none(),
            "should not see entry written after snapshot"
        );

        handle.close().await.unwrap();
    });
}

// -- iterator helpers --

/// SST iterator type used by the iterator tests, fixed to `VirtualIo` and
/// `DefaultComparer`.
type TestIter = SstIterator<VirtualIo, DefaultComparer>;

/// Advances `iter` through its `StorageIterator` implementation and returns
/// the yielded entry, or `None` when the iterator is exhausted.
///
/// Panics (via `unwrap`) if the underlying call reports an error.
async fn next(iter: &mut TestIter) -> Option<(InternalKey<DefaultComparer>, bytes::Bytes)> {
    let step = <TestIter as StorageIterator<VirtualIo, DefaultComparer>>::next(iter).await;
    step.unwrap()
}

/// Seeks `iter` to the test key built from `key_id`, going through its
/// `StorageIterator` implementation.
///
/// Panics (via `unwrap`) if the seek reports an error.
async fn seek(iter: &mut TestIter, key_id: u64) {
    let target = InternalKey::test(key_id);
    let outcome =
        <TestIter as StorageIterator<VirtualIo, DefaultComparer>>::seek(iter, target).await;
    outcome.unwrap();
}

/// Writes `items` to the shared test path, loads the file back, and wraps the
/// handle in an `Rc` to build a fresh iterator over it.
async fn make_iter(items: impl IntoIterator<Item = (u64, u64, &'static str)>) -> TestIter {
    SstIterator::new(Rc::new(write_and_load(items).await))
}

/// Iterating without seeking must yield every entry in key order, then `None`.
#[test]
fn test_iter_full_scan() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let mut iter = make_iter([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;

        // each step must produce the next key together with its value
        for (want_key, want_value) in [(0, b"a"), (1, b"b"), (2, b"c")] {
            let (k, v) = next(&mut iter).await.unwrap();
            assert_eq!(k.test_key_as_u64(), want_key);
            assert_eq!(v.as_ref(), want_value);
        }

        assert!(next(&mut iter).await.is_none());

        // the iterator must hold the only reference so the handle can be closed
        let handle = match Rc::try_unwrap(iter.into_handle()) {
            Ok(handle) => handle,
            Err(_) => panic!("handle has extra refs"),
        };
        handle.close().await.unwrap();
    });
}

/// After seeking to key 1, iteration must resume at key 1 and run to the end.
#[test]
fn test_iter_seek_forward() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let mut iter = make_iter([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;

        seek(&mut iter, 1).await;

        // first entry after the seek: key 1 with its value
        let first = next(&mut iter).await.unwrap();
        assert_eq!(first.0.test_key_as_u64(), 1);
        assert_eq!(first.1.as_ref(), b"b");

        // then key 2, then exhaustion
        let second = next(&mut iter).await.unwrap();
        assert_eq!(second.0.test_key_as_u64(), 2);

        assert!(next(&mut iter).await.is_none());

        // the iterator must hold the only reference so the handle can be closed
        let handle = match Rc::try_unwrap(iter.into_handle()) {
            Ok(handle) => handle,
            Err(_) => panic!("handle has extra refs"),
        };
        handle.close().await.unwrap();
    });
}

/// A seek to a key behind the current position must not rewind the iterator.
///
/// The test seeks to key 1, consumes it, seeks back to key 0, and then expects
/// the next entry to be key 2 followed by exhaustion.
#[test]
fn test_iter_seek_backward_noop() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let mut iter = make_iter([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;

        seek(&mut iter, 1).await;

        // Consume key 1 and verify it: without this check the test could pass
        // even if the forward seek landed somewhere else or yielded nothing.
        let (consumed, _) = next(&mut iter).await.unwrap();
        assert_eq!(consumed.test_key_as_u64(), 1);

        // backward seek - no-op, next returns 2
        seek(&mut iter, 0).await;
        let (k, _) = next(&mut iter).await.unwrap();
        assert_eq!(k.test_key_as_u64(), 2);

        assert!(next(&mut iter).await.is_none());

        // the iterator must hold the only reference so the handle can be closed
        Rc::try_unwrap(iter.into_handle())
            .unwrap_or_else(|_| panic!("handle has extra refs"))
            .close()
            .await
            .unwrap();
    });
}

/// With a 32-byte block target, a full scan must still yield every key in order.
#[test]
fn test_iter_across_block_boundary() {
    setup_tracing();

    // small block size forces entries into separate blocks
    let config = SstWriteConfig {
        block_target_size: 32,
        ..SstWriteConfig::default()
    };

    block_on(VirtualIo::default(), async {
        let entries = [
            (0, 10, "a"),
            (1, 11, "b"),
            (2, 12, "c"),
            (3, 13, "d"),
            (4, 14, "e"),
        ];
        write::<VirtualIo, DefaultComparer, _>(path(), 0, config, Mock::new().add_items(entries))
            .await
            .unwrap();

        let loaded = load::<VirtualIo, DefaultComparer>(path()).await.unwrap();
        let mut iter = SstIterator::new(Rc::new(loaded));

        // drain the iterator, recording only the key ids
        let mut seen = Vec::new();
        while let Some((key, _)) = next(&mut iter).await {
            seen.push(key.test_key_as_u64());
        }
        assert_eq!(seen, vec![0, 1, 2, 3, 4]);

        // the iterator must hold the only reference so the handle can be closed
        let handle = match Rc::try_unwrap(iter.into_handle()) {
            Ok(handle) => handle,
            Err(_) => panic!("handle has extra refs"),
        };
        handle.close().await.unwrap();
    });
}