rsdb 0.12.1

a flash-sympathetic persistent lock-free B+ tree, pagecache, and log
Documentation
#[macro_use]
extern crate serde_derive;
extern crate docopt;
extern crate chan_signal;
extern crate rand;
extern crate rsdb;

use std::thread;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use chan_signal::Signal;
use docopt::Docopt;
use rand::{Rng, thread_rng};

const USAGE: &'static str = "
Usage: stress [--threads=<#>] [--burn-in] [--duration=<s>]

Options:
    --threads=<#>      Number of threads [default: 4].
    --burn-in          Don't halt until we receive a signal.
    --duration=<s>     Seconds to run for [default: 10].
";

#[derive(Deserialize)]
struct Args {
    flag_threads: usize,
    flag_burn_in: bool,
    flag_duration: u64,
}

fn report(shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    let mut last = 0;
    while !shutdown.load(Ordering::Relaxed) {
        thread::sleep(std::time::Duration::from_secs(1));
        let total = total.load(Ordering::Acquire);

        println!("did {} ops", total - last);

        last = total;
    }
}

fn run(tree: Arc<rsdb::Tree>, shutdown: Arc<AtomicBool>, total: Arc<AtomicUsize>) {
    let mut rng = thread_rng();
    let mut byte = || vec![rng.gen::<u8>()];
    let mut rng = thread_rng();

    while !shutdown.load(Ordering::Relaxed) {
        total.fetch_add(1, Ordering::Release);
        let choice = rng.gen_range(0, 5);

        match choice {
            0 => {
                tree.set(byte(), byte());
            }
            1 => {
                tree.get(&*byte());
            }
            2 => {
                tree.del(&*byte());
            }
            3 => {
                if let Err(_) = tree.cas(byte(), Some(byte()), Some(byte())) {};
            }
            4 => {
                tree.scan(&*byte())
                    .take(rng.gen_range(0, 15))
                    .collect::<Vec<_>>();
            }
            _ => panic!("impossible choice"),
        }

    }
}

fn main() {
    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

    let args: Args = Docopt::new(USAGE)
        .and_then(|d| d.argv(std::env::args().into_iter()).deserialize())
        .unwrap_or_else(|e| e.exit());

    let total = Arc::new(AtomicUsize::new(0));
    let shutdown = Arc::new(AtomicBool::new(false));

    let nonce: String = thread_rng().gen_ascii_chars().take(10).collect();
    let path = format!("rsdb_stress_{}", nonce);
    let config = rsdb::Config::default()
        .io_bufs(2)
        .io_buf_size(8_000_000)
        .blink_fanout(32)
        .page_consolidation_threshold(10)
        .cache_bits(6)
        .cache_capacity(1_000_000)
        .flush_every_ms(Some(100))
        .snapshot_after_ops(1000000)
        .path(Some(path));
    let tree = Arc::new(config.tree());

    let mut threads = vec![];

    let now = std::time::Instant::now();

    let n_threads = args.flag_threads;

    for i in 0..n_threads + 1 {
        let tree = tree.clone();
        let shutdown = shutdown.clone();
        let total = total.clone();

        let t = if i == 0 {
            thread::spawn(move || report(shutdown, total))
        } else {
            thread::spawn(move || run(tree, shutdown, total))
        };

        threads.push(t);
    }

    if args.flag_burn_in {
        println!("waiting on signal");
        signal.recv();
        println!("got shutdown signal, cleaning up...");
    } else {
        thread::sleep(std::time::Duration::from_secs(args.flag_duration));
    }

    shutdown.store(true, Ordering::SeqCst);

    for t in threads.into_iter() {
        t.join().unwrap();
    }

    let ops = total.load(Ordering::SeqCst);
    let time = now.elapsed().as_secs() as usize;

    println!(
        "did {} total ops in {} seconds. {} ops/s",
        ops,
        time,
        (ops * 1_000) / (time * 1_000)
    );
}