rollblock 0.4.1

A super-fast, block-oriented and rollbackable key-value store.
Documentation
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};

use rollblock::types::{Operation, StoreKey as Key};
use rollblock::{DurabilityMode, SimpleStoreFacade, StoreError, StoreFacade, StoreResult};

use super::e2e_support::{
    apply_block, init_tracing, wait_for_durable, StoreHarness, DEFAULT_TIMEOUT,
};

const FINAL_BLOCK: u64 = 5;

#[test]
fn e2e_multi_reader() -> StoreResult<()> {
    init_tracing();

    let harness = StoreHarness::builder("multi-reader")
        .initial_capacity(32)
        .durability_mode(DurabilityMode::Synchronous)
        .build();
    let store = harness.open()?;
    let key: Key = [0xE1u8; Key::BYTES].into();

    apply_block(
        &store,
        1,
        vec![Operation {
            key,
            value: 1.into(),
        }],
    )?;
    wait_for_durable(&store, 1, DEFAULT_TIMEOUT)?;

    let barrier = Arc::new(Barrier::new(3));
    let done = Arc::new(AtomicBool::new(false));

    let writer_store = store.clone();
    let writer_barrier = Arc::clone(&barrier);
    let writer_done = Arc::clone(&done);
    let writer_key = key;
    let writer_handle = thread::spawn(move || -> StoreResult<()> {
        writer_barrier.wait();

        for block in 2..=FINAL_BLOCK {
            apply_block(
                &writer_store,
                block,
                vec![Operation {
                    key: writer_key,
                    value: block.into(),
                }],
            )?;
            wait_for_durable(&writer_store, block, DEFAULT_TIMEOUT)?;
        }

        writer_store.ensure_healthy()?;
        writer_done.store(true, Ordering::SeqCst);
        Ok(())
    });

    let reader_loop = |reader_store: SimpleStoreFacade| {
        let reader_barrier = Arc::clone(&barrier);
        let reader_done = Arc::clone(&done);
        thread::spawn(move || -> StoreResult<Vec<u64>> {
            reader_barrier.wait();

            let mut observed: Vec<u64> = Vec::new();
            let mut last_seen = 1;
            let deadline = Instant::now() + DEFAULT_TIMEOUT;

            loop {
                let value = reader_store.get(key)?;
                if let Some(observed_value) = value.to_u64() {
                    if observed_value > last_seen {
                        last_seen = observed_value;
                        observed.push(observed_value);
                        if observed_value == FINAL_BLOCK {
                            break;
                        }
                    }
                }

                if reader_done.load(Ordering::SeqCst) && Instant::now() >= deadline {
                    panic!("reader timed out waiting for final value");
                }

                thread::sleep(Duration::from_millis(2));
            }

            reader_store.ensure_healthy()?;
            Ok(observed)
        })
    };

    let reader_one_handle = reader_loop(store.clone());
    let reader_two_handle = reader_loop(store.clone());

    writer_handle.join().expect("writer thread panicked")?;

    let reader_one_observed = reader_one_handle.join().expect("reader one panicked")?;
    let reader_two_observed = reader_two_handle.join().expect("reader two panicked")?;

    assert!(
        reader_one_observed.contains(&FINAL_BLOCK),
        "reader one did not observe final value"
    );
    assert!(
        reader_two_observed.contains(&FINAL_BLOCK),
        "reader two did not observe final value"
    );

    assert!(
        reader_one_observed
            .iter()
            .all(|value| *value >= 2 && *value <= FINAL_BLOCK),
        "reader one observed unexpected values: {:?}",
        reader_one_observed
    );
    assert!(
        reader_two_observed
            .iter()
            .all(|value| *value >= 2 && *value <= FINAL_BLOCK),
        "reader two observed unexpected values: {:?}",
        reader_two_observed
    );

    store.ensure_healthy()?;
    store.close()?;
    Ok(())
}

#[test]
fn e2e_lock_contention() -> StoreResult<()> {
    init_tracing();

    let harness = StoreHarness::builder("lock-contention")
        .initial_capacity(16)
        .build();
    let store = harness.open()?;
    let config_locked = harness.config();

    let err = match SimpleStoreFacade::new(config_locked.clone()) {
        Ok(_) => panic!("second writer should fail"),
        Err(err) => err,
    };
    match err {
        StoreError::DataDirLocked { requested, .. } => {
            assert_eq!(requested, "exclusive");
        }
        other => panic!("unexpected error: {other:?}"),
    }

    store.ensure_healthy()?;
    store.close()?;
    drop(store);

    let reopened = SimpleStoreFacade::new(config_locked)?;
    reopened.ensure_healthy()?;
    reopened.close()?;
    Ok(())
}

#[test]
fn e2e_parallel_execution() -> StoreResult<()> {
    init_tracing();

    const BATCH_SIZE: usize = 48;
    const BLOCK_COUNT: u64 = 4;

    let harness = StoreHarness::builder("parallel-execution")
        .thread_count(4)
        .initial_capacity(256)
        .build();
    let store = harness.open()?;

    let keys: Vec<Key> = (0..BATCH_SIZE)
        .map(|i| Key::from_u64_le(i as u64))
        .collect();

    let initial_operations: Vec<Operation> = keys
        .iter()
        .enumerate()
        .map(|(i, &key)| Operation {
            key,
            value: (100 + i as u64).into(),
        })
        .collect();
    apply_block(&store, 1, initial_operations)?;
    wait_for_durable(&store, 1, DEFAULT_TIMEOUT)?;

    for block in 2..=BLOCK_COUNT {
        let block_operations: Vec<Operation> = keys
            .iter()
            .enumerate()
            .map(|(i, &key)| Operation {
                key,
                value: (block * 1_000 + i as u64).into(),
            })
            .collect();
        apply_block(&store, block, block_operations)?;
        wait_for_durable(&store, block, DEFAULT_TIMEOUT)?;
    }

    store.ensure_healthy()?;

    assert_eq!(store.current_block()?, BLOCK_COUNT);
    assert_eq!(store.applied_block()?, BLOCK_COUNT);
    assert_eq!(store.durable_block()?, BLOCK_COUNT);

    for (i, &key) in keys.iter().enumerate() {
        let expected = BLOCK_COUNT * 1_000 + i as u64;
        assert_eq!(store.get(key)?, expected);
    }

    if let Some(metrics) = store.metrics() {
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.blocks_committed, BLOCK_COUNT);
        assert_eq!(
            snapshot.operations_applied,
            (BATCH_SIZE as u64) * BLOCK_COUNT
        );
        assert_eq!(snapshot.current_block_height, BLOCK_COUNT);
        assert_eq!(snapshot.durable_block_height, BLOCK_COUNT);
    }

    store.close()?;
    Ok(())
}