rollblock 0.4.1

A super-fast, block-oriented and rollbackable key-value store.
Documentation
use std::fs;
use std::thread;
use std::time::{Duration, Instant};

use rollblock::metrics::HealthState;
use rollblock::types::{Operation, StoreKey as Key};
use rollblock::Value;
use rollblock::{DurabilityMode, StoreError, StoreFacade, StoreResult};

use super::e2e_support::{
    apply_block, init_tracing, wait_for_durable, StoreHarness, DEFAULT_TIMEOUT,
};

#[test]
fn e2e_metrics_health() -> StoreResult<()> {
    init_tracing();

    let harness = StoreHarness::builder("metrics-health")
        .durability_mode(DurabilityMode::Synchronous)
        .initial_capacity(32)
        .build();
    let store = harness.open()?;

    let key_a: Key = [0xA1u8; Key::BYTES].into();
    let key_b: Key = [0xB2u8; Key::BYTES].into();

    apply_block(
        &store,
        1,
        vec![
            Operation {
                key: key_a,
                value: 10.into(),
            },
            Operation {
                key: key_b,
                value: 20.into(),
            },
        ],
    )?;
    wait_for_durable(&store, 1, DEFAULT_TIMEOUT)?;

    apply_block(
        &store,
        2,
        vec![
            Operation {
                key: key_a,
                value: 15.into(),
            },
            Operation {
                key: key_b,
                value: Value::empty(),
            },
        ],
    )?;
    wait_for_durable(&store, 2, DEFAULT_TIMEOUT)?;

    let metrics = store.metrics().expect("metrics should be available");
    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.blocks_committed, 2);
    assert_eq!(snapshot.operations_applied, 4);
    assert_eq!(snapshot.set_operations_applied, 3);
    assert_eq!(snapshot.zero_value_deletes_applied, 1);
    assert_eq!(snapshot.current_block_height, 2);
    assert_eq!(snapshot.durable_block_height, 2);
    assert_eq!(snapshot.failed_operations, 0);

    let health = store.health().expect("health should be available");
    assert_eq!(health.state, HealthState::Healthy);

    let journal_dir = harness.data_dir().join("journal");
    if journal_dir.exists() {
        fs::remove_dir_all(&journal_dir)?;
    }

    let failure = apply_block(
        &store,
        3,
        vec![Operation {
            key: key_a,
            value: 99.into(),
        }],
    )
    .expect_err("set should fail when journal directory is missing");

    match failure {
        StoreError::Io(_) | StoreError::DurabilityFailure { .. } => {}
        other => panic!("unexpected error: {other:?}"),
    }

    let health = store
        .health()
        .expect("health should remain available after failure");
    assert_eq!(health.state, HealthState::Unhealthy);

    let failure_snapshot = store.metrics().unwrap().snapshot();
    assert!(
        failure_snapshot.failed_operations >= 1,
        "expected failed operations to be recorded"
    );

    drop(store);
    Ok(())
}

#[test]
fn e2e_error_propagation() -> StoreResult<()> {
    init_tracing();

    let harness = StoreHarness::builder("error-propagation")
        .durability_mode(DurabilityMode::Async {
            max_pending_blocks: 2,
        })
        .initial_capacity(32)
        .build();
    let store = harness.open()?;

    let key: Key = [0xC3u8; Key::BYTES].into();

    apply_block(
        &store,
        1,
        vec![Operation {
            key,
            value: 1.into(),
        }],
    )?;
    wait_for_durable(&store, 1, DEFAULT_TIMEOUT)?;

    let journal_dir = harness.data_dir().join("journal");
    let backup_dir = harness.data_dir().join("journal_backup");
    if backup_dir.exists() {
        fs::remove_dir_all(&backup_dir)?;
    }
    fs::rename(&journal_dir, &backup_dir)?;

    apply_block(
        &store,
        2,
        vec![Operation {
            key,
            value: 2.into(),
        }],
    )?;

    assert_eq!(store.get(key)?, 2);

    let deadline = Instant::now() + DEFAULT_TIMEOUT;
    let failure_err = loop {
        match store.ensure_healthy() {
            Ok(()) => {
                if Instant::now() >= deadline {
                    panic!("timed out waiting for durability failure to surface");
                }
                thread::sleep(Duration::from_millis(5));
            }
            Err(err) => break err,
        }
    };

    match failure_err {
        StoreError::DurabilityFailure { block, .. } => assert_eq!(block, 2),
        other => panic!("unexpected error: {other:?}"),
    }

    fs::rename(&backup_dir, &journal_dir)?;

    drop(store);

    let reopened = harness.reopen()?;
    assert_eq!(reopened.current_block()?, 1);
    assert_eq!(reopened.get(key)?, 1);
    reopened.ensure_healthy()?;
    reopened.close()?;
    Ok(())
}