use std::env;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use base64::Engine;
use datawal::DataWal;
use serde::Serialize;
/// Operations with `seq % DELETE_EVERY == DELETE_EVERY - 1` are deletes; every
/// other operation is a put.
const DELETE_EVERY: u64 = 23;
/// The WAL is fsynced after each operation whose `seq` is a multiple of this
/// value; 1 means every operation is fsynced before it reaches the oracle.
const FSYNC_BATCH: u64 = 1;

/// Default for `--key-len`, in bytes.
const DEFAULT_KEY_LEN: usize = 16;
/// Default for `--value-len`, in bytes.
const DEFAULT_VALUE_LEN: usize = 96;
/// Validated command-line configuration produced by `parse_args`.
struct Cli {
    // Where the workload writes.
    /// Directory handed to `DataWal::open`; created if it does not exist.
    work_dir: PathBuf,
    /// JSON-lines oracle file; truncated when the run starts.
    oracle: PathBuf,

    // What the workload writes.
    /// Seed mixed into every generated key and value.
    seed: u64,
    /// Key length in bytes (validated to `1..=512`).
    key_len: usize,
    /// Value length in bytes (validated to `1..=65536`).
    value_len: usize,

    // How long the workload runs.
    /// Operation budget; 0 means "no operation limit".
    ops: u64,
    /// Wall-clock budget; zero means "no time limit".
    duration: Duration,
}
/// Parses the command line (excluding the program name) into a [`Cli`].
///
/// Every flag takes exactly one following value: `--work-dir`, `--oracle`,
/// `--seed` (default 1), `--ops` (default 0), `--duration` in whole seconds
/// (default 0), `--key-len` (default `DEFAULT_KEY_LEN`) and `--value-len`
/// (default `DEFAULT_VALUE_LEN`). A repeated flag keeps its last value.
///
/// # Errors
///
/// Fails on an unknown flag, a flag missing its value, a value that does not
/// parse, a missing `--work-dir` or `--oracle`, when `--ops` and `--duration`
/// are both zero, or when `--key-len` / `--value-len` fall outside
/// `1..=512` / `1..=65536`.
fn parse_args() -> Result<Cli> {
    let mut work_dir: Option<PathBuf> = None;
    let mut oracle: Option<PathBuf> = None;
    let mut seed = 1_u64;
    let mut ops = 0_u64;
    let mut duration_s = 0_u64;
    let mut key_len = DEFAULT_KEY_LEN;
    let mut value_len = DEFAULT_VALUE_LEN;

    // Decode every argument up front, then walk them as an owning iterator.
    let argv: Vec<String> = env::args().skip(1).collect();
    let mut argv = argv.into_iter();
    while let Some(flag) = argv.next() {
        // Consumes the value that must follow `flag`.
        let mut value_of_flag = || {
            argv.next()
                .with_context(|| format!("{} requires a value", flag))
        };
        match flag.as_str() {
            "--work-dir" => work_dir = Some(PathBuf::from(value_of_flag()?)),
            "--oracle" => oracle = Some(PathBuf::from(value_of_flag()?)),
            "--seed" => seed = value_of_flag()?.parse().context("parse --seed")?,
            "--ops" => ops = value_of_flag()?.parse().context("parse --ops")?,
            "--duration" => {
                duration_s = value_of_flag()?.parse().context("parse --duration")?;
            }
            "--key-len" => {
                key_len = value_of_flag()?.parse().context("parse --key-len")?;
            }
            "--value-len" => {
                value_len = value_of_flag()?.parse().context("parse --value-len")?;
            }
            other => anyhow::bail!("unknown arg `{}`", other),
        }
    }

    // Validation order matters: the first failing check is the one reported.
    let work_dir = work_dir.context("--work-dir is required")?;
    let oracle = oracle.context("--oracle is required")?;
    anyhow::ensure!(
        ops != 0 || duration_s != 0,
        "at least one of --ops or --duration must be non-zero"
    );
    anyhow::ensure!(
        (1..=512).contains(&key_len),
        "--key-len must be in 1..=512 (got {})",
        key_len
    );
    anyhow::ensure!(
        (1..=65_536).contains(&value_len),
        "--value-len must be in 1..=65536 (got {})",
        value_len
    );

    Ok(Cli {
        work_dir,
        oracle,
        seed,
        ops,
        duration: Duration::from_secs(duration_s),
        key_len,
        value_len,
    })
}
/// Small deterministic SplitMix64 pseudo-random generator (64-bit state).
///
/// The mixing function matches reference SplitMix64, but `new` pre-advances the
/// state by one step. As a result, the first output for `seed` equals the
/// reference generator's *second* output for the same seed.
#[derive(Clone, Copy)]
struct SplitMix64(u64);

impl SplitMix64 {
    /// Weyl-sequence increment added to the state on every step.
    const GAMMA: u64 = 0x9E37_79B9_7F4A_7C15;

    /// Creates a generator whose state is `seed` advanced by one `GAMMA` step.
    fn new(seed: u64) -> Self {
        SplitMix64(seed.wrapping_add(Self::GAMMA))
    }

    /// Advances the state by `GAMMA` and returns the mixed 64-bit output.
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(Self::GAMMA);
        let mixed = self.0;
        let mixed = (mixed ^ (mixed >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let mixed = (mixed ^ (mixed >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        mixed ^ (mixed >> 31)
    }

    /// Fills `out` with the little-endian bytes of successive outputs.
    ///
    /// A trailing chunk shorter than 8 bytes takes the low bytes of one output
    /// (the rest of that output is discarded). An empty `out` draws nothing.
    fn fill(&mut self, out: &mut [u8]) {
        for chunk in out.chunks_mut(8) {
            let word = self.next_u64().to_le_bytes();
            chunk.copy_from_slice(&word[..chunk.len()]);
        }
    }
}
/// Derives the `key_len`-byte key used by operation `op`.
///
/// `op` is folded onto one of 4096 key slots. Multiplying by the odd 64-bit FNV
/// prime and keeping the low 12 bits permutes `op mod 4096`, so any 4096
/// consecutive operations touch each slot exactly once. Later operations
/// therefore revisit keys written earlier. The slot, xored with `seed`, seeds a
/// fresh `SplitMix64` that produces the key bytes.
fn key_for(seed: u64, op: u64, key_len: usize) -> Vec<u8> {
    const FNV64_PRIME: u64 = 0x100000001B3;
    const SLOT_MASK: u64 = 0xFFF;
    let slot = op.wrapping_mul(FNV64_PRIME) & SLOT_MASK;
    let mut key = vec![0u8; key_len];
    SplitMix64::new(seed ^ slot).fill(&mut key);
    key
}
/// Derives the `value_len`-byte payload written by operation `op`.
///
/// Each operation gets its own generator seed (`seed ^ op`, salted with a fixed
/// constant so value streams do not share seeds with key streams).
fn value_for(seed: u64, op: u64, value_len: usize) -> Vec<u8> {
    const VALUE_SALT: u64 = 0xDEADBEEFCAFEBABE;
    let mut generator = SplitMix64::new(VALUE_SALT ^ seed ^ op);
    let mut payload = vec![0u8; value_len];
    generator.fill(&mut payload);
    payload
}
/// One JSON-lines record in the oracle file.
///
/// Serialized as an internally tagged object: the `"op"` field is `"put"` or
/// `"del"`, followed by the variant's fields in declaration order.
#[derive(Serialize)]
#[serde(tag = "op", rename_all = "lowercase")]
enum OracleLine<'a> {
    /// A durable put; `key` and `payload` are text (base64 at the call sites).
    Put {
        seq: u64,
        key: &'a str,
        payload: &'a str,
    },
    /// A durable delete; `key` is text (base64 at the call sites).
    Del {
        seq: u64,
        key: &'a str,
    },
}
/// Encodes `bytes` with the `base64` crate's standard, padded engine.
fn b64(bytes: &[u8]) -> String {
    use base64::engine::general_purpose::STANDARD;
    STANDARD.encode(bytes)
}
/// Appends `line` to the oracle as one newline-terminated JSON record and
/// fdatasyncs the file before returning.
///
/// The whole record goes out in a single `write_all` call.
fn write_oracle_line(file: &mut fs::File, line: &OracleLine<'_>) -> Result<()> {
    let mut record = serde_json::to_vec(line).context("encode oracle line")?;
    record.extend_from_slice(b"\n");
    file.write_all(&record).context("write oracle line")?;
    // A plain `File` has no userspace buffer; this flush is kept for symmetry.
    file.flush().context("flush oracle line")?;
    file.sync_data().context("fdatasync oracle file")
}
fn ensure_parent_dir(p: &Path) -> Result<()> {
if let Some(parent) = p.parent() {
if !parent.as_os_str().is_empty() {
fs::create_dir_all(parent)
.with_context(|| format!("create parent of {}", p.display()))?;
}
}
Ok(())
}
/// Program entry point.
///
/// Exits with status 2 when argument parsing fails, 1 when the workload fails,
/// and 0 on a clean run. Errors are printed to stderr with their context chain.
fn main() -> ExitCode {
    let outcome = parse_args()
        .map_err(|err| {
            eprintln!("power_loss_workload: setup error: {:#}", err);
            ExitCode::from(2)
        })
        .and_then(|cli| {
            run(cli).map_err(|err| {
                eprintln!("power_loss_workload: failed: {:#}", err);
                ExitCode::from(1)
            })
        });
    match outcome {
        Ok(()) => ExitCode::SUCCESS,
        Err(code) => code,
    }
}
/// Runs the deterministic put/delete workload against a `DataWal` rooted at
/// `cli.work_dir`, appending each operation to the oracle file only after a WAL
/// fsync has covered it.
///
/// Operation `seq` works as follows:
/// - It is a delete when `seq % DELETE_EVERY == DELETE_EVERY - 1`, and a put of
///   `value_for(seed, seq, ..)` otherwise.
/// - Its key always comes from `key_for`.
///
/// The WAL is fsynced after each operation whose `seq` is a multiple of
/// `FSYNC_BATCH`. Every operation applied since the previous fsync is then
/// written to the oracle. Operations still pending when the loop ends are
/// recorded after the final fsync, so the oracle lists every operation the WAL
/// has acknowledged as durable.
///
/// The loop stops after `cli.ops` operations (when non-zero) or once
/// `cli.duration` has elapsed (when non-zero), whichever comes first.
///
/// # Errors
///
/// Returns the first filesystem, WAL, or oracle error, with context attached.
// `FSYNC_BATCH` is currently 1, which makes `seq % FSYNC_BATCH` constantly 0;
// the allow keeps clippy quiet while the batch size remains a tunable knob.
#[allow(clippy::modulo_one)]
fn run(cli: Cli) -> Result<()> {
    fs::create_dir_all(&cli.work_dir)
        .with_context(|| format!("create work dir {}", cli.work_dir.display()))?;
    ensure_parent_dir(&cli.oracle)?;
    let mut oracle = fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&cli.oracle)
        .with_context(|| format!("open oracle {}", cli.oracle.display()))?;
    eprintln!(
        "power_loss_workload: work_dir={} oracle={} seed={} ops={} duration={}s key_len={} value_len={}",
        cli.work_dir.display(),
        cli.oracle.display(),
        cli.seed,
        cli.ops,
        cli.duration.as_secs(),
        cli.key_len,
        cli.value_len,
    );
    let mut log = DataWal::open(&cli.work_dir).context("DataWal::open work_dir")?;
    let start = Instant::now();
    let ops_budget = if cli.ops == 0 { u64::MAX } else { cli.ops };
    let duration_budget = cli.duration;
    let mut seq: u64 = 0;
    let mut puts: u64 = 0;
    let mut dels: u64 = 0;
    // Operations applied to the WAL since the last fsync. They become oracle
    // lines only once an fsync has made them durable; previously, with a batch
    // size > 1, such operations were silently never recorded.
    let mut pending: Vec<PendingOp> = Vec::new();
    while seq < ops_budget {
        if !duration_budget.is_zero() && start.elapsed() >= duration_budget {
            break;
        }
        let do_delete = seq % DELETE_EVERY == DELETE_EVERY - 1;
        let key = key_for(cli.seed, seq, cli.key_len);
        let fsync_context = if do_delete {
            log.delete(&key).context("DataWal::delete")?;
            pending.push(PendingOp::Del { seq, key });
            "DataWal::fsync (delete)"
        } else {
            let value = value_for(cli.seed, seq, cli.value_len);
            log.put(&key, &value).context("DataWal::put")?;
            pending.push(PendingOp::Put { seq, key, value });
            "DataWal::fsync (put)"
        };
        if seq % FSYNC_BATCH == 0 {
            log.fsync().context(fsync_context)?;
            record_durable(&mut oracle, &mut pending, &mut puts, &mut dels)?;
        }
        // Cannot overflow: inside the loop `seq < ops_budget <= u64::MAX`.
        seq += 1;
    }
    log.fsync().context("final fsync (wal)")?;
    // Anything written after the last in-loop fsync is durable now; record it.
    record_durable(&mut oracle, &mut pending, &mut puts, &mut dels)?;
    oracle.sync_data().context("final fdatasync (oracle)")?;
    eprintln!(
        "power_loss_workload: clean exit ops={} puts={} dels={} elapsed={}s",
        seq,
        puts,
        dels,
        start.elapsed().as_secs()
    );
    Ok(())
}

/// A WAL operation whose durability has not yet been confirmed by an fsync,
/// and which is therefore not yet in the oracle.
enum PendingOp {
    Put { seq: u64, key: Vec<u8>, value: Vec<u8> },
    Del { seq: u64, key: Vec<u8> },
}

/// Writes every pending operation to the oracle in order and bumps the
/// matching counter. Leaves `pending` empty (capacity retained) on success.
fn record_durable(
    oracle: &mut fs::File,
    pending: &mut Vec<PendingOp>,
    puts: &mut u64,
    dels: &mut u64,
) -> Result<()> {
    for op in pending.drain(..) {
        match op {
            PendingOp::Put { seq, key, value } => {
                let key_b64 = b64(&key);
                let payload_b64 = b64(&value);
                write_oracle_line(
                    oracle,
                    &OracleLine::Put {
                        seq,
                        key: &key_b64,
                        payload: &payload_b64,
                    },
                )?;
                *puts += 1;
            }
            PendingOp::Del { seq, key } => {
                let key_b64 = b64(&key);
                write_oracle_line(oracle, &OracleLine::Del { seq, key: &key_b64 })?;
                *dels += 1;
            }
        }
    }
    Ok(())
}