rsdb 0.12.1

a flash-sympathetic persistent lock-free B+ tree, pagecache, and log
Documentation
#[macro_use]
extern crate serde_derive;
extern crate docopt;
extern crate chan_signal;
extern crate rand;
extern crate rsdb;

use std::mem;
use std::thread;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use chan_signal::Signal;
use docopt::Docopt;
use rand::{Rng, thread_rng};

const USAGE: &'static str = "
Usage: stress2 [--threads=<#>] [--burn-in] [--duration=<s>]

Options:
    --burn-in          Don't halt until we receive a signal.
    --duration=<s>     Seconds to run for [default: 10].
";

#[derive(Deserialize)]
struct Args {
    flag_burn_in: bool,
    flag_duration: u64,
}

fn report(shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    let mut last = 0;
    while !shutdown.load(Ordering::Relaxed) {
        thread::sleep(std::time::Duration::from_secs(1));
        let total = total.load(Ordering::Acquire);

        println!("did {} ops", total - last);

        last = total;
    }
}

fn byte() -> Vec<u8> {
    vec![thread_rng().gen::<u8>()]
}

fn do_set(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    while !shutdown.load(Ordering::Relaxed) {
        total.fetch_add(1, Ordering::Release);
        tree.set(byte(), byte());
    }
}

fn do_get(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    let mut k = 0;
    while !shutdown.load(Ordering::Relaxed) {
        total.fetch_add(1, Ordering::Release);
        tree.get(&*vec![k]);
        k = if k == 255 { 0 } else { k + 1 };
    }
}

fn do_del(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    while !shutdown.load(Ordering::Relaxed) {
        total.fetch_add(1, Ordering::Release);
        tree.del(&*byte());
    }
}

fn do_cas(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    while !shutdown.load(Ordering::Relaxed) {
        total.fetch_add(1, Ordering::Release);
        if let Err(_) = tree.cas(byte(), Some(byte()), Some(byte())) {};
    }
}

fn do_scan(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    while !shutdown.load(Ordering::Relaxed) {
        total.fetch_add(1, Ordering::Release);
        tree.scan(&*byte())
            .take(thread_rng().gen_range(1, 16))
            .collect::<Vec<_>>();
    }
}

fn prepopulate(tree: Arc<rsdb::Tree>, keys: usize) {
    for i in 0..keys {
        let bytes: [u8; 8] = unsafe { mem::transmute(i) };
        let k = bytes.to_vec();
        let v = vec![];
        tree.set(k, v);
    }
}

fn main() {
    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

    let args: Args = Docopt::new(USAGE)
        .and_then(|d| d.argv(std::env::args().into_iter()).deserialize())
        .unwrap_or_else(|e| e.exit());

    let total = Arc::new(AtomicUsize::new(0));
    let shutdown = Arc::new(AtomicBool::new(false));

    let config = rsdb::Config::default()
        .io_bufs(2)
        .io_buf_size(8_000_000)
        .blink_fanout(15)
        .page_consolidation_threshold(10)
        .cache_fixup_threshold(2)
        .cache_bits(6)
        .cache_capacity(128_000_000)
        .flush_every_ms(Some(100))
        .snapshot_after_ops(100000000);

    let tree = Arc::new(config.tree());

    macro_rules! cloned {
        ($f:expr) => {{
            let tree = tree.clone();
            let shutdown = shutdown.clone();
            let total = total.clone();
            thread::spawn(|| $f(tree, shutdown, total))
        }};
    }

    prepopulate(tree.clone(), 256);

    let threads = vec![
        cloned!(|_, shutdown, total| report(shutdown, total)),
        cloned!(|tree, shutdown, total| do_get(tree, shutdown, total)),
        cloned!(|tree, shutdown, total| do_set(tree, shutdown, total)),
        cloned!(|tree, shutdown, total| do_del(tree, shutdown, total)),
        cloned!(|tree, shutdown, total| do_cas(tree, shutdown, total)),
        cloned!(|tree, shutdown, total| do_scan(tree, shutdown, total)),
    ];

    let now = std::time::Instant::now();

    if args.flag_burn_in {
        signal.recv();
        println!("got shutdown signal, cleaning up...");
    } else {
        thread::sleep(std::time::Duration::from_secs(args.flag_duration));
    }

    shutdown.store(true, Ordering::SeqCst);

    for t in threads.into_iter() {
        t.join().unwrap();
    }

    let ops = total.load(Ordering::SeqCst);
    let time = now.elapsed().as_secs() as usize;

    rsdb::M.print_profile();

    println!(
        "did {} total ops in {} seconds. {} ops/s",
        ops,
        time,
        (ops * 1_000) / (time * 1_000)
    );
}