#![allow(clippy::needless_lifetimes)]
extern crate pearl;
#[macro_use]
extern crate log;
mod generator;
mod statistics;
mod writer;
// Single import hub re-exported into every sibling module (`use prelude::*;`),
// so the benchmark's modules share one consistent set of names.
mod prelude {
    // Local building blocks of the benchmark.
    pub(crate) use super::{
        generator::Generator,
        statistics::{Report, Statistics},
        writer::Writer,
        Key128,
    };
    // CLI parsing (clap v3-style builder API).
    pub(crate) use clap::{App, Arg, ArgMatches};
    pub(crate) use env_logger::fmt::Color;
    // Bounded mpsc channel + unordered future pool drive the write pipeline.
    pub(crate) use futures::{
        channel::mpsc::{channel, Sender},
        stream::{FuturesUnordered, StreamExt},
    };
    pub(crate) use log::{Level, LevelFilter};
    // Storage engine under benchmark.
    pub(crate) use pearl::{Builder, Key, Storage};
    pub(crate) use rand::{rngs::ThreadRng, RngCore};
    pub(crate) use std::{
        io::Write,
        ops::Add,
        path::{Path, PathBuf},
        sync::Arc,
        time::{Duration, Instant},
    };
}
use prelude::*;
/// Entry point: prints the benchmark banner, installs the logger,
/// then runs the async benchmark driver to completion.
#[tokio::main]
async fn main() {
    // Center the title inside a 41-column rule of underscores.
    let banner = format!("{:_^41}", "PEARL_BENCHMARK");
    println!("{}", banner);
    init_logger();
    start_app().await;
}
/// Benchmark driver: parses CLI options, streams generated records through
/// the `Writer` while keeping at most `futures_limit` writes in flight,
/// prints live progress, then aggregates the per-write reports.
async fn start_app() {
    info!("Hello Async World");
    info!("Prepare app matches");
    let matches = prepare_matches();
    info!("Create new generator");
    // All `.unwrap()`s on `value_of(..)` are safe in practice: every option
    // has a default value in `prepare_matches`; the `.parse().unwrap()`s
    // will still panic on malformed user input.
    // `limit` is the total data volume in MB (see CLI help).
    let limit = matches.value_of("limit").unwrap().parse().unwrap();
    let value_size_kb: u64 = matches.value_of("value_size").unwrap().parse().unwrap();
    // Record payload size in bytes — note the decimal factor (1000, not 1024).
    let mut generator = Generator::new(value_size_kb as usize * 1000, limit);
    info!("Create new writer");
    let mut writer: Writer<Key128> = Writer::new(
        &matches
            .value_of("dst_dir")
            .unwrap()
            .parse::<PathBuf>()
            .unwrap(),
        matches.value_of("max_size").unwrap().parse().unwrap(),
        matches.value_of("max_data").unwrap().parse().unwrap(),
        matches.is_present("allow_duplicates"),
    );
    info!("Init writer");
    writer.init().await;
    info!("Create new statistics");
    let mut statistics = Statistics::new(matches.value_of("max_reports").unwrap().parse().unwrap());
    info!("Start write cycle");
    // Writers report per-write results through this bounded channel;
    // reports are drained after the write loop finishes.
    let (tx, rx) = channel::<statistics::Report>(1024);
    // Shared so each in-flight write future can borrow the writer.
    let writer = Arc::new(writer);
    let mut counter = 0;
    let futures_limit: usize = matches.value_of("futures_limit").unwrap().parse().unwrap();
    // Pre-fill the pool with `futures_limit` writes.
    // NOTE(review): `generator.next().unwrap()` panics if the generator is
    // exhausted before `futures_limit` items — confirm limit/value_size
    // always allow at least `futures_limit` records.
    let prepared = (0..futures_limit).map(|_| generator.next().unwrap());
    let mut futures_pool: FuturesUnordered<_> = prepared
        .into_iter()
        .map(|(key, data)| {
            let ltx = tx.clone();
            // `counter` is mutated lazily as `collect` drives the map;
            // after this it equals the number of submitted writes.
            counter += 1;
            writer.write(key, data, ltx)
        })
        .collect();
    println!(
        "{:<10}{:<10}{:<10}{:<10}{:<10}",
        "Completed", "Active", "Limit", "Total", "%"
    );
    // Expected total number of records: limit MB -> KB, divided by record size.
    let write_limit = limit * 1000 / value_size_kb;
    let mut prev_p = 0;
    // Completion loop: each finished write frees a slot that is refilled
    // from the generator, so at most `futures_limit` writes stay in flight.
    while futures_pool.next().await.is_some() {
        debug!("#{}/{} future ready", counter, futures_pool.len());
        // Progress in per-mille, so the display updates on 0.1% steps.
        let percent = counter * 1000 / write_limit;
        if prev_p != percent {
            print!(
                "\r{:<10}{:<10}{:<10}{:<10}{:<10}",
                counter,
                futures_pool.len(),
                futures_limit,
                write_limit,
                // Shown as whole percent.
                percent / 10
            );
            // Start a fresh line every 5% so history stays visible.
            if percent % 50 == 0 {
                println!();
            }
        }
        prev_p = percent;
        // Refill the pool while the generator still has records.
        if futures_pool.len() < futures_limit {
            if let Some((key, data)) = generator.next() {
                let ltx = tx.clone();
                counter += 1;
                // NOTE(review): this call uses `key.into()` while the
                // pre-fill loop above passes `key` directly — confirm both
                // produce the same key type for `Writer::write`.
                futures_pool.push(writer.write(key.into(), data, ltx));
            }
        }
        debug!("#{}/{} next await", counter, futures_pool.len());
    }
    info!("start await ");
    // Drain exactly `counter` reports (one per submitted write) into the
    // statistics accumulator; the collected Vec of units is discarded.
    let _ = rx
        .take(counter as usize)
        .map(|r| statistics.add(r))
        .collect::<Vec<_>>()
        .await;
    info!("end await ");
    statistics.display();
    writer.close().await;
}
/// Builds the benchmark CLI and parses `std::env::args`.
///
/// Every option carries a default value, so downstream `value_of(..).unwrap()`
/// calls never fail for options (only flags like `allow_duplicates` may be
/// absent and are queried with `is_present`).
fn prepare_matches() -> ArgMatches {
    // Declare all arguments up front; registration order is preserved
    // by the fold below, so help output is unchanged.
    let args = vec![
        Arg::new("value_size")
            .short('v')
            .default_value("90")
            .help("KB, by default 90"),
        Arg::new("limit")
            .short('l')
            .default_value("100")
            .help("MB, by default 100"),
        Arg::new("max_reports")
            .short('m')
            .default_value("0")
            .help("0 - unlimited"),
        Arg::new("dst_dir").short('d').default_value("/tmp"),
        Arg::new("max_size")
            .short('s')
            .default_value("1000")
            .help("MB, limit of the blob file size"),
        Arg::new("max_data")
            .short('x')
            .default_value("1000")
            .help("MB, limit of the records number in blob"),
        Arg::new("futures_limit")
            .long("futures")
            .default_value("10"),
        Arg::new("allow_duplicates")
            .short('a')
            .help("Disable existence checking on write"),
    ];
    args.into_iter()
        .fold(App::new("benchmark"), |app, arg| app.arg(arg))
        .get_matches()
}
fn init_logger() {
let _ = env_logger::Builder::new()
.format(|buf, record: &log::Record| {
let mut style = buf.style();
let color = match record.level() {
Level::Error => Color::Red,
Level::Warn => Color::Yellow,
Level::Info => Color::Green,
Level::Debug => Color::Cyan,
Level::Trace => Color::White,
};
style.set_color(color);
writeln!(
buf,
"[{}:{:^4} {:^5}] - {}",
record.module_path().unwrap_or(""),
record.line().unwrap_or(0),
style.value(record.level()),
record.args(),
)
})
.filter_module("benchmark", LevelFilter::Info)
.filter_module("pearl", LevelFilter::Info)
.try_init();
}
/// Record key type used by the benchmark, backed by a raw byte vector.
// NOTE(review): the name suggests 128 bits but `LEN` is 8 bytes (64 bits) —
// confirm the intended key width.
#[derive(Debug, Default, PartialOrd, Ord, PartialEq, Eq, Clone)]
pub struct Key128(Vec<u8>);
impl Key for Key128 {
    // Fixed on-disk key length in bytes, required by `pearl::Key`.
    const LEN: u16 = 8;
}
impl AsRef<Key128> for Key128 {
    /// Identity borrow, letting APIs accept `impl AsRef<Key128>`
    /// with either an owned key or a reference.
    fn as_ref(&self) -> &Self {
        self
    }
}
impl From<Vec<u8>> for Key128 {
fn from(v: Vec<u8>) -> Self {
assert_eq!(Self::LEN as usize, v.len());
Self(v)
}
}
impl AsRef<[u8]> for Key128 {
    /// Exposes the raw key bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        // Deref coercion turns `&Vec<u8>` into `&[u8]`.
        &self.0
    }
}