marble 16.0.2

garbage-collecting on-disk object store
Documentation
use std::env::{self, VarError};
use std::iter::once;
use std::process::{exit, Child, Command, ExitStatus};
use std::thread;
use std::time::Duration;

use rand::Rng;

use marble::{Config, Marble};

mod common;

// test names, also used as dir names
const BATCHES_DIR: &str = "crash_batches";

const TESTS: &[(&str, fn())] = &[(BATCHES_DIR, crash_batches)];

const TEST_ENV_VAR: &str = "SLED_CRASH_TEST";
const N_TESTS: usize = 64;
const BATCH_SIZE: u32 = 13;
const CRASH_CHANCE: u32 = 250;

fn handle_child_wait_err(dir: &str, e: std::io::Error) {
    let _ = std::fs::remove_dir_all(dir);

    panic!("error waiting for {} test child: {}", dir, e);
}

fn handle_child_exit_status(dir: &str, status: ExitStatus) {
    let code = status.code();

    if code.is_none() || code.unwrap() != 9 {
        let _ = std::fs::remove_dir_all(dir);
        panic!("{} test child exited abnormally", dir);
    }
}

fn spawn_killah() {
    thread::spawn(|| {
        let runtime = rand::thread_rng().gen_range(0..600);
        thread::sleep(Duration::from_millis(runtime));
        exit(9);
    });
}

fn run_child_process(test_name: &str) -> Child {
    let bin = env::current_exe().expect("could not get test binary path");

    unsafe {
        env::set_var(TEST_ENV_VAR, test_name);
    }

    Command::new(bin)
        .env(TEST_ENV_VAR, test_name)
        .env("SLED_CRASH_CHANCE", CRASH_CHANCE.to_string())
        .spawn()
        .unwrap_or_else(|_| panic!("could not spawn child process for {} test", test_name))
}

fn crash_batches() {
    let dir = BATCHES_DIR;
    let _ = std::fs::remove_dir_all(dir);

    for _ in 0..N_TESTS {
        let mut child = run_child_process(dir);

        child
            .wait()
            .map(|status| handle_child_exit_status(dir, status))
            .map_err(|e| handle_child_wait_err(dir, e))
            .unwrap();
    }

    let _ = std::fs::remove_dir_all(dir);
}

fn run_crash_batches() {
    let crash_during_initialization = rand::thread_rng().gen_ratio(1, 10);

    if crash_during_initialization {
        spawn_killah();
    }

    let config = Config {
        path: BATCHES_DIR.into(),
        fsync_each_batch: false,
        ..Default::default()
    };

    let m = config.open().unwrap();

    verify_batches(&m);

    let concurrency = 4;
    let mut threads = vec![];
    for i in 0..concurrency {
        let m = m.clone();
        let thread = thread::spawn(move || write_batches_inner(1000 * i as u32, m));
        threads.push(thread);
    }

    if !crash_during_initialization {
        spawn_killah();
    }

    for thread in threads.into_iter() {
        if let Err(e) = thread.join() {
            log::error!("worker thread failed: {:?}", e);
            std::process::exit(15);
        }
    }
}

fn write_batches_inner(start: u32, m: Marble) {
    for i in start.. {
        let mut rng = rand::thread_rng();
        let value = if rng.gen_bool(0.1) {
            None
        } else {
            Some(i.to_le_bytes().to_vec())
        };

        let mut batch = vec![];
        for key in (0..BATCH_SIZE as u64).chain(once(u64::MAX)) {
            batch.push((key, value.clone()));
        }
        m.write_batch(batch).unwrap();
    }
}

/// Verifies that the keys in the tree are correctly
/// recovered (i.e., equal). Panics if they don't match up.
fn verify_batches(m: &Marble) {
    let values: Vec<Option<Box<[u8]>>> = (0..BATCH_SIZE as u64)
        .chain(once(u64::MAX))
        .map(|i| {
            let object_id = i;
            m.read(object_id).unwrap()
        })
        .collect();
    let equal = values.windows(2).all(|w| w[0] == w[1]);
    log::debug!("values: {:?}", values);

    assert!(equal, "values not equal: {:?}", values);
}

fn main() {
    common::setup_logger();

    match env::var(TEST_ENV_VAR) {
        Err(VarError::NotPresent) => {
            let filtered: Vec<(&'static str, fn())> = if let Some(filter) = std::env::args().nth(1)
            {
                TESTS
                    .iter()
                    .filter(|(name, _)| name.contains(&filter))
                    .cloned()
                    .collect()
            } else {
                TESTS.to_vec()
            };

            let filtered_len = filtered.len();

            log::info!(
                "running {} test{}",
                filtered.len(),
                if filtered.len() == 1 { "" } else { "s" },
            );

            let mut tests = vec![];
            for (test_name, test_fn) in filtered.into_iter() {
                let test = thread::spawn(move || {
                    let res = std::panic::catch_unwind(test_fn);
                    log::info!(
                        "test {} ... {}",
                        test_name,
                        if res.is_ok() { "ok" } else { "panicked" }
                    );
                    res.unwrap();
                });
                tests.push(test);
            }

            for test in tests.into_iter() {
                test.join().unwrap();
            }

            log::info!(
                "test result: ok. {} passed; {} filtered out",
                filtered_len,
                TESTS.len() - filtered_len,
            );
        }

        Ok(ref s) if s == BATCHES_DIR => run_crash_batches(),
        Ok(_) | Err(_) => panic!("invalid crash test case"),
    }
}