use ahash::AHashMap;
use clap::{Parser, ValueEnum};
use rand::Rng;
use rustc_hash::FxHashMap;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fs::{self, File};
use std::hint::black_box;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tikv_jemalloc_ctl::{epoch, stats};
use nibblerun::Encoder;
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[derive(Debug, Clone, ValueEnum)]
enum BenchmarkType {
Naive,
NibblerunHashmap,
NibblerunAhash,
NibblerunFxhash,
NibblerunBtreemap,
NibblerunFrozen,
}
#[derive(Parser)]
#[command(name = "nbl-bench")]
#[command(about = "Benchmark memory usage of different storage methods")]
struct Args {
dir: PathBuf,
#[arg(short, long, default_value = "0")]
max_files: usize,
#[arg(short, long, value_enum)]
benchmark: BenchmarkType,
}
fn get_allocated() -> usize {
epoch::advance().unwrap();
stats::allocated::read().unwrap()
}
fn load_readings(dir: &Path, max_files: usize) -> HashMap<u32, VecDeque<(u32, i8)>> {
let entries: Vec<_> = fs::read_dir(dir)
.expect("Failed to read directory")
.filter_map(Result::ok)
.filter(|e| e.path().extension().is_some_and(|ext| ext == "csv"))
.collect();
let mut result: HashMap<u32, VecDeque<(u32, i8)>> = HashMap::new();
for (idx, entry) in entries.into_iter().enumerate() {
if max_files > 0 && result.len() >= max_files {
break;
}
let path = entry.path();
let device_id = path
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| u32::from_str_radix(s.trim(), 16).ok())
.unwrap_or(idx as u32);
let file = match File::open(&path) {
Ok(f) => f,
Err(_) => continue,
};
let reader = BufReader::new(file);
let mut readings = VecDeque::new();
for line in reader.lines().map_while(Result::ok) {
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with("ts") || trimmed.starts_with('#') {
continue;
}
let parts: Vec<&str> = trimmed.split(',').collect();
if parts.len() < 2 {
continue;
}
let ts: u32 = match parts[0].trim().parse() {
Ok(v) => v,
Err(_) => continue,
};
let value: i32 = match parts[1].trim().parse() {
Ok(v) => v,
Err(_) => continue,
};
if value == -1000 {
continue;
}
let value_i8 = value.clamp(-128, 127) as i8;
readings.push_back((ts, value_i8));
}
if !readings.is_empty() {
result.insert(device_id, readings);
}
}
result
}
fn interleave_readings(mut pending: HashMap<u32, VecDeque<(u32, i8)>>) -> Vec<(u32, u32, i8)> {
let mut result = Vec::new();
let mut rng = rand::rng();
let total: usize = pending.values().map(|v| v.len()).sum();
result.reserve(total);
let mut device_ids: Vec<u32> = pending.keys().copied().collect();
while !device_ids.is_empty() {
let idx = rng.random_range(0..device_ids.len());
let device_id = device_ids[idx];
let readings = pending.get_mut(&device_id).unwrap();
let (ts, val) = readings.pop_front().unwrap();
result.push((device_id, ts, val));
if readings.is_empty() {
pending.remove(&device_id);
device_ids.swap_remove(idx);
}
}
result
}
fn run_naive(readings: &[(u32, u32, i8)]) -> HashMap<u32, Vec<(u32, i8)>> {
let mut storage: HashMap<u32, Vec<(u32, i8)>> = HashMap::new();
for &(device_id, ts, val) in readings {
storage.entry(device_id).or_default().push((ts, val));
}
println!(" Stored {} devices", storage.len());
storage
}
fn run_nibblerun_hashmap(readings: &[(u32, u32, i8)]) -> (HashMap<u32, Encoder<i8, 300>>, usize) {
let mut storage: HashMap<u32, Encoder<i8, 300>> = HashMap::new();
for &(device_id, ts, val) in readings {
let encoder = storage.entry(device_id).or_insert_with(Encoder::new);
let _ = encoder.append(ts, val);
}
let total_data: usize = storage.values().map(|e| e.size()).sum();
let avg_data = total_data / storage.len();
println!(" Stored {} devices (HashMap capacity: {})", storage.len(), storage.capacity());
println!(" Compressed data: {:.1} MB total, {} bytes avg per device", total_data as f64 / (1024.0 * 1024.0), avg_data);
(storage, total_data)
}
fn run_nibblerun_ahash(readings: &[(u32, u32, i8)]) -> (AHashMap<u32, Encoder<i8, 300>>, usize) {
let mut storage: AHashMap<u32, Encoder<i8, 300>> = AHashMap::new();
for &(device_id, ts, val) in readings {
let encoder = storage.entry(device_id).or_insert_with(Encoder::new);
let _ = encoder.append(ts, val);
}
let total_data: usize = storage.values().map(|e| e.size()).sum();
let avg_data = total_data / storage.len();
println!(" Stored {} devices (AHashMap capacity: {})", storage.len(), storage.capacity());
println!(" Compressed data: {:.1} MB total, {} bytes avg per device", total_data as f64 / (1024.0 * 1024.0), avg_data);
(storage, total_data)
}
fn run_nibblerun_fxhash(readings: &[(u32, u32, i8)]) -> (FxHashMap<u32, Encoder<i8, 300>>, usize) {
let mut storage: FxHashMap<u32, Encoder<i8, 300>> = FxHashMap::default();
for &(device_id, ts, val) in readings {
let encoder = storage.entry(device_id).or_insert_with(Encoder::new);
let _ = encoder.append(ts, val);
}
let total_data: usize = storage.values().map(|e| e.size()).sum();
let avg_data = total_data / storage.len();
println!(" Stored {} devices (FxHashMap capacity: {})", storage.len(), storage.capacity());
println!(" Compressed data: {:.1} MB total, {} bytes avg per device", total_data as f64 / (1024.0 * 1024.0), avg_data);
(storage, total_data)
}
fn run_nibblerun_btreemap(readings: &[(u32, u32, i8)]) -> (BTreeMap<u32, Encoder<i8, 300>>, usize) {
let mut storage: BTreeMap<u32, Encoder<i8, 300>> = BTreeMap::new();
for &(device_id, ts, val) in readings {
let encoder = storage.entry(device_id).or_insert_with(Encoder::new);
let _ = encoder.append(ts, val);
}
let total_data: usize = storage.values().map(|e| e.size()).sum();
let avg_data = total_data / storage.len();
println!(" Stored {} devices", storage.len());
println!(" Compressed data: {:.1} MB total, {} bytes avg per device", total_data as f64 / (1024.0 * 1024.0), avg_data);
(storage, total_data)
}
fn run_nibblerun_frozen(readings: &[(u32, u32, i8)]) -> (AHashMap<u32, Vec<u8>>, usize) {
let mut temp_storage: AHashMap<u32, Encoder<i8, 300>> = AHashMap::new();
for &(device_id, ts, val) in readings {
let encoder = temp_storage.entry(device_id).or_insert_with(Encoder::new);
let _ = encoder.append(ts, val);
}
let mut storage: AHashMap<u32, Vec<u8>> = AHashMap::new();
for (device_id, encoder) in temp_storage {
let frozen = encoder.freeze();
storage.insert(device_id, frozen);
}
let total_data: usize = storage.values().map(|v| v.len()).sum();
let total_capacity: usize = storage.values().map(|v| v.capacity()).sum();
let avg_data = total_data / storage.len();
let avg_capacity = total_capacity / storage.len();
println!(" Stored {} devices (AHashMap capacity: {})", storage.len(), storage.capacity());
println!(" Frozen data: {:.1} MB total, {} bytes avg per device", total_data as f64 / (1024.0 * 1024.0), avg_data);
println!(" Vec capacity: {:.1} MB total, {} bytes avg per device", total_capacity as f64 / (1024.0 * 1024.0), avg_capacity);
(storage, total_data)
}
fn main() {
let args = Args::parse();
let dir = if args.dir.starts_with("~") {
let home = std::env::var("HOME").expect("HOME not set");
PathBuf::from(home).join(args.dir.strip_prefix("~").unwrap())
} else {
args.dir.clone()
};
println!("Loading readings from {}...", dir.display());
let pending = load_readings(&dir, args.max_files);
let num_devices = pending.len();
let total_readings: usize = pending.values().map(|v| v.len()).sum();
println!("Loaded {} readings from {} devices", total_readings, num_devices);
println!("Interleaving readings...");
let interleaved = interleave_readings(pending);
println!("Interleaved {} readings", interleaved.len());
println!();
let expected_bytes = interleaved.len() * 8;
println!("Expected naive size: {:.1} MB ({} readings * 8 bytes)", expected_bytes as f64 / (1024.0 * 1024.0), interleaved.len());
println!();
let before = get_allocated();
let start = Instant::now();
match args.benchmark {
BenchmarkType::Naive => {
println!("Running naive benchmark...");
let storage = run_naive(&interleaved);
let encode_duration = start.elapsed();
let after = get_allocated();
let allocated = after.saturating_sub(before);
print_results("naive", expected_bytes, allocated, None, interleaved.len(), encode_duration);
black_box(storage);
}
BenchmarkType::NibblerunHashmap => {
println!("Running nibblerun-hashmap benchmark...");
let (storage, data_size) = run_nibblerun_hashmap(&interleaved);
let encode_duration = start.elapsed();
let after = get_allocated();
let allocated = after.saturating_sub(before);
print_results("nibblerun-hashmap", expected_bytes, allocated, Some(data_size), interleaved.len(), encode_duration);
black_box(storage);
}
BenchmarkType::NibblerunAhash => {
println!("Running nibblerun-ahash benchmark...");
let (storage, data_size) = run_nibblerun_ahash(&interleaved);
let encode_duration = start.elapsed();
let after = get_allocated();
let allocated = after.saturating_sub(before);
print_results("nibblerun-ahash", expected_bytes, allocated, Some(data_size), interleaved.len(), encode_duration);
black_box(storage);
}
BenchmarkType::NibblerunFxhash => {
println!("Running nibblerun-fxhash benchmark...");
let (storage, data_size) = run_nibblerun_fxhash(&interleaved);
let encode_duration = start.elapsed();
let after = get_allocated();
let allocated = after.saturating_sub(before);
print_results("nibblerun-fxhash", expected_bytes, allocated, Some(data_size), interleaved.len(), encode_duration);
black_box(storage);
}
BenchmarkType::NibblerunBtreemap => {
println!("Running nibblerun-btreemap benchmark...");
let (storage, data_size) = run_nibblerun_btreemap(&interleaved);
let encode_duration = start.elapsed();
let after = get_allocated();
let allocated = after.saturating_sub(before);
print_results("nibblerun-btreemap", expected_bytes, allocated, Some(data_size), interleaved.len(), encode_duration);
black_box(storage);
}
BenchmarkType::NibblerunFrozen => {
println!("Running nibblerun-frozen benchmark...");
let (storage, data_size) = run_nibblerun_frozen(&interleaved);
let encode_duration = start.elapsed();
let after = get_allocated();
let allocated = after.saturating_sub(before);
print_results("nibblerun-frozen", expected_bytes, allocated, Some(data_size), interleaved.len(), encode_duration);
black_box(storage);
}
};
}
fn print_results(name: &str, expected_bytes: usize, allocated: usize, compressed_data: Option<usize>, total_readings: usize, encode_duration: Duration) {
let ratio = expected_bytes as f64 / allocated as f64;
let encode_rate_m = total_readings as f64 / encode_duration.as_secs_f64() / 1_000_000.0;
let expected_mb = expected_bytes as f64 / (1024.0 * 1024.0);
let allocated_mb = allocated as f64 / (1024.0 * 1024.0);
println!();
if let Some(data_size) = compressed_data {
let overhead = allocated.saturating_sub(data_size);
let data_ratio = expected_bytes as f64 / data_size as f64;
let data_mb = data_size as f64 / (1024.0 * 1024.0);
let overhead_mb = overhead as f64 / (1024.0 * 1024.0);
println!("Benchmark: {} | Expected: {:.1} MB | Allocated: {:.1} MB | Ratio: {:.1}x",
name, expected_mb, allocated_mb, ratio);
println!("Compressed data: {:.1} MB | Overhead: {:.1} MB | Data ratio: {:.1}x",
data_mb, overhead_mb, data_ratio);
} else {
println!("Benchmark: {} | Expected: {:.1} MB | Allocated: {:.1} MB | Ratio: {:.1}x",
name, expected_mb, allocated_mb, ratio);
}
println!("Encode rate: {:.1}M readings/sec ({:.2}s for {} readings)",
encode_rate_m, encode_duration.as_secs_f64(), total_readings);
}