#![cfg(feature = "persistent-artrie")]
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};
use libdictenstein::persistent_artrie_char::SharedCharARTrie;
use libdictenstein::persistent_artrie_core::shared_access::SharedTrieAccess;
use libdictenstein::{ARTrie, Dictionary};
const KEY_COUNT: usize = 5_000;
const OPS_PER_READER: usize = 20_000;
const READER_COUNTS: &[usize] = &[4, 8];
const SAMPLES_PER_ARM: usize = 60; const WARMUP: usize = 3;
const CHECKPOINT_THROTTLE: Duration = Duration::from_millis(5);
const MAX_CHECKPOINT_ROUNDS: usize = 1_000;
const SCRATCH_CEILING_BYTES: u64 = 2 * 1024 * 1024 * 1024;
#[derive(Clone, Copy)]
enum Arm {
Control,
Treatment,
}
impl Arm {
fn label(self) -> &'static str {
match self {
Arm::Control => "control",
Arm::Treatment => "treatment",
}
}
fn checkpoint_round(self, trie: &SharedCharARTrie<i64>, round: usize) {
ARTrie::insert_with_value(trie, &key_for(round % KEY_COUNT), round as i64);
match self {
Arm::Control => {
let mut g = trie.write();
g.checkpoint().expect("control checkpoint");
}
Arm::Treatment => {
ARTrie::checkpoint(trie).expect("treatment checkpoint");
}
}
}
}
fn key_for(i: usize) -> String {
format!("term-{:08}-キー", i)
}
fn scratch_dir() -> std::path::PathBuf {
let d = Path::new(env!("CARGO_MANIFEST_DIR")).join("target/bench-scratch");
std::fs::create_dir_all(&d).expect("create real-disk scratch dir");
d
}
fn dir_size(path: &Path) -> u64 {
let mut total = 0;
if let Ok(entries) = std::fs::read_dir(path) {
for e in entries.flatten() {
let p = e.path();
if p.is_dir() {
total += dir_size(&p);
} else if let Ok(m) = e.metadata() {
total += m.len();
}
}
}
total
}
fn build_trie(dir: &Path) -> SharedCharARTrie<i64> {
let path = dir.join("exp_checkpoint.part");
let trie: SharedCharARTrie<i64> = ARTrie::create(&path).expect("create shared char trie");
for i in 0..KEY_COUNT {
ARTrie::insert_with_value(&trie, &key_for(i), i as i64);
}
ARTrie::checkpoint(&trie).expect("initial checkpoint");
trie
}
fn sample_throughput(trie: &SharedCharARTrie<i64>, n_readers: usize, arm: Arm) -> f64 {
let stop = Arc::new(AtomicBool::new(false));
let barrier = Arc::new(Barrier::new(n_readers + 2));
let mut readers = Vec::with_capacity(n_readers);
for t in 0..n_readers {
let trie = Arc::clone(trie);
let barrier = Arc::clone(&barrier);
readers.push(thread::spawn(move || {
barrier.wait();
let mut hits = 0usize;
for op in 0..OPS_PER_READER {
let idx = op.wrapping_mul(2_654_435_761).wrapping_add(t * 7) % KEY_COUNT;
if Dictionary::contains(&trie, &key_for(idx)) {
hits += 1;
}
}
std::hint::black_box(hits);
}));
}
let checkpointer = {
let trie = Arc::clone(trie);
let barrier = Arc::clone(&barrier);
let stop = Arc::clone(&stop);
thread::spawn(move || {
barrier.wait();
let mut rounds = 0usize;
while !stop.load(Ordering::Relaxed) && rounds < MAX_CHECKPOINT_ROUNDS {
arm.checkpoint_round(&trie, rounds);
rounds += 1;
thread::sleep(CHECKPOINT_THROTTLE);
}
})
};
barrier.wait();
let start = Instant::now();
for r in readers {
r.join().expect("reader thread");
}
let elapsed = start.elapsed();
stop.store(true, Ordering::Relaxed);
checkpointer.join().expect("checkpointer thread");
let ops = (n_readers * OPS_PER_READER) as f64;
ops / elapsed.as_secs_f64() / 1.0e6
}
fn main() {
let dir = scratch_dir();
eprintln!(
"# exp_checkpoint_throughput: KEY_COUNT={KEY_COUNT} OPS_PER_READER={OPS_PER_READER} \
samples/arm={SAMPLES_PER_ARM} warmup={WARMUP} scratch={}",
dir.display()
);
println!("arm reader_count sample throughput_melem_per_s is_warmup");
for &n in READER_COUNTS {
let ctl_dir = tempfile::Builder::new()
.prefix(&format!("exp_control_{n}"))
.tempdir_in(&dir)
.expect("control tempdir");
let trt_dir = tempfile::Builder::new()
.prefix(&format!("exp_treatment_{n}"))
.tempdir_in(&dir)
.expect("treatment tempdir");
let ctl = build_trie(ctl_dir.path());
let trt = build_trie(trt_dir.path());
for s in 0..SAMPLES_PER_ARM {
let used = dir_size(&dir);
if used > SCRATCH_CEILING_BYTES {
eprintln!(
"# ABORT: scratch dir {} exceeded ceiling ({} > {} bytes)",
dir.display(),
used,
SCRATCH_CEILING_BYTES
);
std::process::exit(2);
}
let warm = s < WARMUP;
let c = sample_throughput(&ctl, n, Arm::Control);
println!("control {} {} {:.6} {}", n, s, c, warm);
let t = sample_throughput(&trt, n, Arm::Treatment);
println!("treatment {} {} {:.6} {}", n, s, t, warm);
}
}
eprintln!("# done");
}