tempest-kv 0.0.2

Key-Value storage layer for TempestDB
Documentation
use std::pin::pin;

use futures::future::join_all;
use tempest_core::test_utils::setup_tracing;
use tempest_io::VirtualIo;
use tempest_rt::{Runtime, block_on, yield_now};

use super::*;

async fn tick_until<I: Io, F: Future>(wal: &mut Wal<I>, fut: F, max_ticks: usize) -> F::Output {
    let mut fut = pin!(fut);
    let mut n = 0;
    while n < max_ticks {
        trace!(n, "TICK");
        wal.tick().await.unwrap();
        yield_now().await;
        if let Poll::Ready(val) = poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx))).await {
            return val;
        }
        n += 1;
    }
    panic!("timed out: operation exceeded {} ticks", max_ticks);
}

#[test]
fn test_wal_append_acked() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let mut wal = Wal::<VirtualIo>::init(PathBuf::from("/wal"), WalConfig::default(), |_| {})
            .await
            .unwrap();

        let ack = wal.append(b"hello").recv();
        let result = tick_until(&mut wal, ack, 32).await;
        result.expect("channel not closed").unwrap();
        wal.close().await.unwrap();
    });
}

#[test]
fn test_wal_multi_append() {
    setup_tracing();

    block_on(VirtualIo::default(), async {
        let mut wal = Wal::<VirtualIo>::init(PathBuf::from("/wal"), WalConfig::default(), |_| {})
            .await
            .unwrap();

        let acks = [
            wal.append(b"hello"),
            wal.append(b"there"),
            wal.append(b"world"),
        ]
        .map(|ack| ack.recv());
        let acks_future = join_all(acks);

        let results = tick_until(&mut wal, acks_future, 32).await;
        for res in results {
            res.expect("channel not closed").unwrap()
        }
        wal.close().await.unwrap();
    });
}

#[test]
fn test_wal_rotation() {
    setup_tracing();

    // the first appended record triggers rotation
    let config = WalConfig {
        rotate_file_size_threshold: WAL_HEADER_SIZE as u64 + 1,
        ..WalConfig::default()
    };

    let mut rt = Runtime::new(VirtualIo::default());
    rt.block_on(async {
        let mut wal = Wal::<VirtualIo>::init(PathBuf::from("/wal"), config, |_| {})
            .await
            .unwrap();

        let ack1 = wal.append(b"hello");
        let ack2 = wal.append(b"world");

        let results = tick_until(&mut wal, join_all([ack1, ack2].map(|a| a.recv())), 32).await;
        for res in results {
            res.expect("channel not closed").unwrap();
        }

        // acks arrive when the old file is fsynced; keep ticking until rotation settles
        while wal.is_idle() {
            wal.tick().await.unwrap();
            yield_now().await;
        }
        wal.close().await.unwrap();
    });

    let io = rt.inspect_io();
    let files = io.list_dir(Path::new("/wal/files")).unwrap();
    assert!(
        files
            .iter()
            .any(|e| e.path == PathBuf::from("/wal/files/1.wal")),
        "expected rotation to produce /wal/files/1.wal, got: {:?}",
        files.iter().map(|e| &e.path).collect::<Vec<_>>()
    );
}

#[test]
fn test_wal_recovery() {
    setup_tracing();

    let records: &[&[u8]] = &[b"hello", b"world", b"foo"];

    block_on(VirtualIo::default(), async {
        // instance 1: write records and ensure they are durable
        {
            let mut wal =
                Wal::<VirtualIo>::init(PathBuf::from("/wal"), WalConfig::default(), |_| {})
                    .await
                    .unwrap();

            let acks: Vec<_> = records.iter().map(|r| wal.append(r).recv()).collect();
            let results = tick_until(&mut wal, join_all(acks), 32).await;
            for res in results {
                res.expect("channel not closed").unwrap();
            }
            wal.close().await.unwrap();
        }

        // instance 2: re-init on same VirtualIo, records should be replayed
        {
            let mut recovered: Vec<Vec<u8>> = Vec::new();
            let wal = Wal::<VirtualIo>::init(PathBuf::from("/wal"), WalConfig::default(), |rec| {
                recovered.push(rec.to_vec())
            })
            .await
            .unwrap();

            assert_eq!(
                recovered.len(),
                records.len(),
                "recovered record count mismatch"
            );
            for (got, expected) in recovered.iter().zip(records.iter()) {
                assert_eq!(got.as_slice(), *expected, "recovered record data mismatch");
            }
            wal.close().await.unwrap();
        }
    });
}