ekvsb 0.0.4

Benchmark Tool for Embedded Key-Value Stores available in Rust
Documentation
extern crate byte_unit;
#[macro_use]
extern crate clap;
extern crate ekvsb;
extern crate indicatif;
extern crate rand;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
#[macro_use]
extern crate trackable;

use byte_unit::Byte;
use clap::{App, Arg, ArgMatches, SubCommand};
use ekvsb::kvs::KeyValueStore;
use ekvsb::task::{Key, Seconds, Task, TaskResult, ValueSpec};
use ekvsb::workload::{Workload, WorkloadExecutor};
use ekvsb::Result;
use indicatif::ProgressBar;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::collections::{BTreeMap, HashMap};
use std::io::{BufReader, BufWriter, Read, Write};
use trackable::error::{ErrorKindExt, Failed};

fn main() -> trackable::result::MainResult {
    let matches = app_from_crate!()
        .subcommand(
            SubCommand::with_name("run")
                .arg(
                    Arg::with_name("MEMORY_LOAD_SIZE")
                        .long("memory-load")
                        .takes_value(true)
                        .default_value("0GiB"),
                ).subcommand(
                    SubCommand::with_name("builtin::fs")
                        .arg(Arg::with_name("DIR").index(1).required(true)),
                ).subcommand(SubCommand::with_name("builtin::hashmap"))
                .subcommand(SubCommand::with_name("builtin::btreemap"))
                .subcommand(
                    SubCommand::with_name("rocksdb")
                        .arg(Arg::with_name("DIR").index(1).required(true)),
                ).subcommand(
                    SubCommand::with_name("sled")
                        .arg(Arg::with_name("DIR").index(1).required(true)),
                ),
        ).subcommand(
            SubCommand::with_name("workload")
                .subcommand(
                    workload_subcommand("PUT").arg(
                        Arg::with_name("VALUE_SIZE")
                            .long("value-size")
                            .takes_value(true)
                            .default_value("1KiB"),
                    ),
                ).subcommand(workload_subcommand("GET"))
                .subcommand(workload_subcommand("DELETE")),
        ).subcommand(SubCommand::with_name("summary"))
        .subcommand(
            SubCommand::with_name("plot-text")
                .arg(
                    Arg::with_name("SAMPLING_RATE")
                        .long("sampling-rate")
                        .takes_value(true)
                        .default_value("1.0"),
                ).arg(Arg::with_name("Y_MAX").long("y-max").takes_value(true)),
        ).subcommand(
            SubCommand::with_name("plot-svg")
                .arg(Arg::with_name("SVG_FILE").index(1).required(true))
                .arg(
                    Arg::with_name("SAMPLING_RATE")
                        .long("sampling-rate")
                        .takes_value(true)
                        .default_value("1.0"),
                ).arg(Arg::with_name("Y_MAX").long("y-max").takes_value(true)),
        ).get_matches();
    if let Some(matches) = matches.subcommand_matches("run") {
        track!(handle_run_subcommand(matches))?;
    } else if let Some(matches) = matches.subcommand_matches("workload") {
        track!(handle_workload_subcommand(matches))?;
    } else if let Some(matches) = matches.subcommand_matches("summary") {
        track!(handle_summary_subcommand(matches))?;
    } else if let Some(matches) = matches.subcommand_matches("plot-text") {
        track!(handle_plot_text_subcommand(matches))?;
    } else if let Some(matches) = matches.subcommand_matches("plot-svg") {
        track!(handle_plot_svg_subcommand(matches))?;
    } else {
        eprintln!("Usage: {}", matches.usage());
        std::process::exit(1);
    }
    Ok(())
}

fn workload_subcommand(name: &'static str) -> App<'static, 'static> {
    SubCommand::with_name(name)
        .arg(
            Arg::with_name("COUNT")
                .long("count")
                .takes_value(true)
                .default_value("1000"),
        ).arg(
            Arg::with_name("POPULATION_SIZE")
                .long("population-size")
                .takes_value(true),
        ).arg(
            Arg::with_name("KEY_SIZE")
                .long("key-size")
                .takes_value(true)
                .default_value("10"),
        ).arg(Arg::with_name("SEED").long("seed").takes_value(true))
        .arg(
            Arg::with_name("SHUFFLE")
                .long("shuffle")
                .takes_value(true)
                .value_name("SHUFFLE_SEED"),
        )
}

fn handle_run_subcommand(matches: &ArgMatches) -> Result<()> {
    let memory_load_size = track!(parse_size(
        matches.value_of("MEMORY_LOAD_SIZE").expect("never fails")
    ))?;
    let _reserved_memory: Vec<u8> = vec![1; memory_load_size];

    let workload: Workload = track_any_err!(
        serde_json::from_reader(stdin()),
        "Malformed input workload JSON"
    )?;

    if let Some(matches) = matches.subcommand_matches("builtin::fs") {
        let dir = matches.value_of("DIR").expect("never fails");
        let kvs = track!(ekvsb::kvs::FileSystemKvs::new(dir))?;
        track!(execute(kvs, workload))?;
    } else if let Some(_matches) = matches.subcommand_matches("builtin::hashmap") {
        let kvs = HashMap::new();
        track!(execute(kvs, workload))?;
    } else if let Some(_matches) = matches.subcommand_matches("builtin::btreemap") {
        let kvs = BTreeMap::new();
        track!(execute(kvs, workload))?;
    } else if let Some(matches) = matches.subcommand_matches("rocksdb") {
        let dir = matches.value_of("DIR").expect("never fails");
        let kvs = track!(ekvsb::kvs::RocksDb::new(dir))?;
        track!(execute(kvs, workload))?;
    } else if let Some(matches) = matches.subcommand_matches("sled") {
        let dir = matches.value_of("DIR").expect("never fails");
        let kvs = track!(ekvsb::kvs::SledTree::new(dir))?;
        track!(execute(kvs, workload))?;
    } else {
        eprintln!("Usage: {}", matches.usage());
        std::process::exit(1);
    }
    Ok(())
}

fn execute<T: KeyValueStore>(kvs: T, workload: Workload) -> Result<()> {
    let pb = ProgressBar::new(workload.len() as u64);
    let executor = WorkloadExecutor::new(kvs, workload);

    println!("[");
    for (i, result) in executor.enumerate() {
        if i != 0 {
            print!(",\n  ");
        } else {
            print!("  ");
        }
        pb.inc(1);
        track_any_err!(serde_json::to_writer(stdout(), &result))?;
    }
    println!("\n]");
    Ok(())
}

fn handle_workload_subcommand(matches: &ArgMatches) -> Result<()> {
    let tasks = if let Some(matches) = matches.subcommand_matches("PUT") {
        let value_size = track!(parse_size(
            matches.value_of("VALUE_SIZE").expect("never fails")
        ))?;
        track!(generate_tasks(matches, |key| Task::Put {
            key,
            value: ValueSpec::Random { size: value_size },
        }))?
    } else if let Some(matches) = matches.subcommand_matches("GET") {
        track!(generate_tasks(matches, |key| Task::Get { key }))?
    } else if let Some(matches) = matches.subcommand_matches("DELETE") {
        track!(generate_tasks(matches, |key| Task::Delete { key }))?
    } else {
        unreachable!();
    };
    track_any_err!(serde_json::to_writer(stdout(), &tasks))?;
    Ok(())
}

fn generate_tasks<F>(matches: &ArgMatches, f: F) -> Result<Vec<Task>>
where
    F: Fn(Key) -> Task,
{
    let count: usize = track_any_err!(matches.value_of("COUNT").expect("never fails").parse())?;
    let key_size = track!(parse_size(
        matches.value_of("KEY_SIZE").expect("never fails")
    ))?;
    let seed = matches.value_of("SEED");
    let mut population_size = count;
    if let Some(size) = matches.value_of("POPULATION_SIZE") {
        population_size = track_any_err!(size.parse())?;
        track_assert!(count <= population_size, Failed; count, population_size);
    }

    let mut rng = if let Some(seed) = seed {
        track_assert!(seed.len() <= 32, Failed; seed.len());
        let mut seed_bytes = [0; 32];
        for (i, b) in seed.bytes().enumerate() {
            seed_bytes[i] = b;
        }
        StdRng::from_seed(seed_bytes)
    } else {
        StdRng::from_seed(rand::thread_rng().gen())
    };

    const CHARS: &[u8; 62] = b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    let mut tasks = Vec::new();
    let mut key = vec![0u8; key_size];
    for _ in 0..population_size {
        for b in &mut key {
            *b = *rng.choose(&CHARS[..]).expect("never fails");
        }
        tasks.push(f(track!(Key::from_utf8(key.clone()))?));
    }

    if let Some(seed) = matches.value_of("SHUFFLE") {
        track_assert!(seed.len() <= 32, Failed; seed.len());
        let mut seed_bytes = [0; 32];
        for (i, b) in seed.bytes().enumerate() {
            seed_bytes[i] = b;
        }

        let mut shuffle_rng = StdRng::from_seed(seed_bytes);
        shuffle_rng.shuffle(&mut tasks);
    }

    tasks.truncate(count);

    Ok(tasks)
}

fn handle_summary_subcommand(_matches: &ArgMatches) -> Result<()> {
    let results: Vec<TaskResult> = track_any_err!(
        serde_json::from_reader(stdin()),
        "Malformed run result JSON"
    )?;

    let errors = results.iter().filter(|r| r.error.is_some()).count();
    let oks = results.len() - errors;
    let elapsed = results.iter().map(|r| r.elapsed.as_f64()).sum();
    let ops = results.len() as f64 / elapsed;
    let latency = Latency::new(&results);
    let summary = Summary {
        oks,
        errors,
        elapsed,
        ops,
        latency,
    };
    track_any_err!(serde_json::to_writer_pretty(stdout(), &summary))?;
    println!();

    Ok(())
}

#[derive(Serialize)]
struct Summary {
    oks: usize,
    errors: usize,
    elapsed: f64,
    ops: f64,
    latency: Latency,
}

#[derive(Serialize)]
struct Latency {
    min: Seconds,
    median: Seconds,
    p95: Seconds,
    p99: Seconds,
    max: Seconds,
}
impl Latency {
    fn new(results: &[TaskResult]) -> Self {
        let mut times = results.iter().map(|r| r.elapsed).collect::<Vec<_>>();
        times.sort();
        Latency {
            min: times.iter().min().cloned().unwrap_or_default(),
            median: times.get(times.len() / 2).cloned().unwrap_or_default(),
            p95: times
                .get(times.len() * 95 / 100)
                .cloned()
                .unwrap_or_default(),
            p99: times
                .get(times.len() * 99 / 100)
                .cloned()
                .unwrap_or_default(),
            max: times.iter().max().cloned().unwrap_or_default(),
        }
    }
}

fn handle_plot_text_subcommand(matches: &ArgMatches) -> Result<()> {
    let mut options = ekvsb::plot::PlotOptions::new();
    options.sampling_rate(track_any_err!(
        matches
            .value_of("SAMPLING_RATE")
            .expect("never fails")
            .parse()
    )?);
    if let Some(y_max) = matches.value_of("Y_MAX") {
        options.y_max(track_any_err!(y_max.parse())?);
    }
    let results: Vec<TaskResult> = track_any_err!(
        serde_json::from_reader(stdin()),
        "Malformed run result JSON"
    )?;
    let text = track!(options.plot_text(&results))?;
    println!("{}", text);
    Ok(())
}

fn handle_plot_svg_subcommand(matches: &ArgMatches) -> Result<()> {
    let svg_file = matches.value_of("SVG_FILE").expect("never fails");
    let mut options = ekvsb::plot::PlotOptions::new();
    options.sampling_rate(track_any_err!(
        matches
            .value_of("SAMPLING_RATE")
            .expect("never fails")
            .parse()
    )?);
    if let Some(y_max) = matches.value_of("Y_MAX") {
        options.y_max(track_any_err!(y_max.parse())?);
    }
    let results: Vec<TaskResult> = track_any_err!(
        serde_json::from_reader(stdin()),
        "Malformed run result JSON"
    )?;
    track!(options.plot_svg(&results, svg_file))?;
    println!("SVG FILE: {}", svg_file);
    Ok(())
}

fn parse_size(s: &str) -> Result<usize> {
    let size = Byte::from_string(s)
        .map_err(|e| track!(Failed.cause(format!("Parse Error: {:?} ({:?})", s, e))))?;
    Ok(size.get_bytes() as usize)
}

fn stdin() -> impl Read {
    BufReader::new(std::io::stdin())
}

fn stdout() -> impl Write {
    BufWriter::new(std::io::stdout())
}