use std::fs::File;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use clap::Parser;
use rand::rngs::StdRng;
use rand::{RngExt, SeedableRng};
use tempfile::TempDir;
use tokio::time;
use xet_core_structures::merklehash::MerkleHash;
use xet_core_structures::metadata_shard::shard_file_manager::ShardFileManager;
use xet_core_structures::metadata_shard::shard_format::MDBShardInfo;
use xet_core_structures::metadata_shard::shard_format::test_routines::rng_hash;
use xet_core_structures::metadata_shard::shard_in_memory::MDBInMemoryShard;
use xet_core_structures::metadata_shard::xorb_structs::{MDBXorbInfo, XorbChunkSequenceEntry, XorbChunkSequenceHeader};
/// Number of chunks in every synthetic XORB block produced by [`make_shard`].
const XORB_BLOCK_SIZE: usize = 512;
/// Number of concurrent query worker tasks spawned by the benchmark.
const PAR_TASK: usize = 1;

/// Builds an in-memory shard of synthetic XORB blocks until its `shard_file_size()` is at
/// least `size`.
///
/// Every block holds [`XORB_BLOCK_SIZE`] chunks. Chunk hashes are `rng_hash(s)` for consecutive
/// values `s` of `*seed`, which is advanced by one per chunk. After the call, every seed consumed
/// here identifies a chunk stored in the returned shard, and the benchmark relies on that to
/// regenerate "hit" hashes. Each block's XORB hash is `rng_hash(!seed)`, presumably to keep it
/// clear of the small range used for chunk seeds.
///
/// # Panics
///
/// Panics if the shard rejects a generated XORB block.
fn make_shard(size: u64, seed: &mut u64) -> MDBInMemoryShard {
    let mut shard = MDBInMemoryShard::default();
    while shard.shard_file_size() < size {
        let mut chunks = Vec::with_capacity(XORB_BLOCK_SIZE);
        let mut pos = 0u32;
        for _ in 0..XORB_BLOCK_SIZE {
            // Hash once and reuse it; `rng_hash` is deterministic in its seed.
            let h = rng_hash(*seed);
            // Pseudo-random chunk length in 1000..2000, derived from the hash itself.
            let r = (1000 + (&h as &[u64; 4])[0] % 1000) as u32;
            // NOTE(review): arguments presumably are (hash, length, offset within the XORB).
            chunks.push(XorbChunkSequenceEntry::new(h, r, pos));
            pos += r;
            *seed += 1;
        }
        shard
            .add_xorb_block(MDBXorbInfo {
                metadata: XorbChunkSequenceHeader::new(rng_hash(!(*seed)), XORB_BLOCK_SIZE, pos),
                chunks,
            })
            .expect("failed to add synthetic XORB block to in-memory shard");
    }
    shard
}
/// Writes the synthetic shards requested by `shard_sizes` into `dir`.
///
/// For each `(n_shards, target_size)` pair, builds `n_shards` shards with [`make_shard`],
/// writes each one to `dir`, reloads it from disk and prints its report.
///
/// Returns the final chunk seed. Chunk hashes `rng_hash(s)` for every `s` in `0..returned`
/// are stored in the written shards.
fn create_shards(
    shard_sizes: Vec<(u64, u64)>,
    dir: &Path,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
    let mut seed = 0u64;
    eprintln!("Creating shards.");
    for (n_shards, target_size) in shard_sizes {
        for i in 0..n_shards {
            let shard = make_shard(target_size, &mut seed);
            let path = shard.write_to_directory(dir, None)?;
            eprintln!(
                "-> Target size {target_size:?}: Created shard {:?} / {n_shards:?} with {} XORB blocks and {} chunks",
                i + 1,
                shard.num_xorb_entries(),
                shard.num_xorb_entries() * XORB_BLOCK_SIZE
            );
            // Reload from disk so the printed report reflects the serialized shard.
            MDBShardInfo::load_from_reader(&mut File::open(path)?)?.print_report();
        }
    }
    eprintln!("Shards created.");
    Ok(seed)
}

/// Maps the `offset`-th chunk of a synthetic file to the seed its hash is derived from.
///
/// For a hit, the seed is folded into `0..top`, which is the range of seeds whose hashes were
/// stored in the shards. Consecutive offsets therefore map to consecutively generated chunks,
/// modulo wrap-around. For a miss, or when `top == 0` (no stored chunks), the raw value is used.
/// Because `base` is a random u64, the raw value lands in the stored range only with negligible
/// probability.
fn chunk_seed(base: u64, offset: usize, hit: bool, top: u64) -> u64 {
    // Wrapping: `base` is uniformly random, so a plain `+` could overflow (and panic in debug).
    let raw = base.wrapping_add(offset as u64);
    if hit && top > 0 { raw % top } else { raw }
}

/// Runs an open-ended dedup-query throughput benchmark against synthetic shards.
///
/// * `shard_sizes`: `(n_shards, target_size)` pairs. For each pair, `n_shards` shards are
///   written to `dir`, each grown until its `shard_file_size()` reaches `target_size`.
/// * `file_contiguity`: number of chunk hashes in each synthetic file a worker queries.
/// * `contiguity`: maximum number of consecutive chunk hashes passed to one
///   `chunk_hash_dedup_query` call.
/// * `block_hit_proportion`: probability that a synthetic file is built from chunks stored in
///   the shards. It must lie in `[0, 1]`, because rand's `random_bool` panics otherwise.
///
/// [`PAR_TASK`] workers query in an endless loop while a reporter prints throughput every
/// second. The returned future therefore only completes if shard creation or shard-manager
/// setup fails, or if a spawned task panics, which surfaces as a `JoinError`.
async fn run_shard_benchmark(
    shard_sizes: Vec<(u64, u64)>,
    file_contiguity: usize,
    contiguity: usize,
    block_hit_proportion: f64,
    dir: &Path,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let top = create_shards(shard_sizes, dir)?;

    let counter = Arc::new(AtomicUsize::new(0));
    let mdb = ShardFileManager::new_in_session_directory(dir, false).await?;
    let start_time = Instant::now();

    let mut tasks = Vec::with_capacity(PAR_TASK);
    for t in 0..PAR_TASK {
        let worker_counter = counter.clone();
        let worker_mdb = mdb.clone();
        tasks.push(tokio::spawn(async move {
            // Seed each worker's RNG with its index so runs are reproducible.
            let mut rng = StdRng::seed_from_u64(t as u64);
            eprintln!("Worker {t:?} running.");
            // Reused across iterations to avoid one allocation per synthetic file.
            let mut file_info = Vec::<MerkleHash>::with_capacity(file_contiguity);
            loop {
                let base_hash_val: u64 = rng.random();
                let hit = rng.random_bool(block_hit_proportion);
                file_info.clear();
                file_info.extend(
                    (0..file_contiguity).map(|i| rng_hash(chunk_seed(base_hash_val, i, hit, top))),
                );

                let mut query_loc = 0;
                while query_loc < file_contiguity {
                    let end = query_loc.saturating_add(contiguity).min(file_info.len());
                    let res = worker_mdb
                        .chunk_hash_dedup_query(&file_info[query_loc..end])
                        .await
                        .expect("chunk_hash_dedup_query failed");
                    // Skip past the matched run. Always advance at least one chunk so a
                    // zero-length match cannot stall the worker forever.
                    query_loc += match res {
                        Some((n_matched, _)) => n_matched.max(1),
                        None => 1,
                    };
                }
                // NOTE(review): this counts chunk positions advanced, not query calls, yet the
                // reporter labels it "queries". The two agree only when each query covers one chunk.
                worker_counter.fetch_add(query_loc, Ordering::Relaxed);
            }
        }));
    }

    // Report cumulative throughput measured from when the shard manager finished loading.
    let print_task = tokio::spawn(async move {
        loop {
            time::sleep(Duration::from_secs(1)).await;
            let elapsed_time = start_time.elapsed().as_secs_f64();
            let count = counter.load(Ordering::Relaxed);
            println!("{count} queries, queries per second: {}", count as f64 / elapsed_time);
        }
    });

    // Workers never finish on their own, so these awaits only return if a task panics.
    // clippy's `never_loop` fires here, presumably because the awaited futures diverge.
    #[allow(clippy::never_loop)]
    for task in tasks {
        task.await?;
    }
    print_task.await?;
    Ok(())
}
/// Parses a `N_SHARDS:TARGET_SIZE` command-line value into a pair of `u64`s.
///
/// Exactly one `:` must separate the two numbers. Zero or several separators are reported as an
/// unparsable argument. Each side must then parse as a `u64`, and the first failing side is
/// reported.
fn parse_arg(arg: &str) -> Result<(u64, u64), String> {
    let (left, right) = arg
        .split_once(':')
        .filter(|(_, tail)| !tail.contains(':'))
        .ok_or_else(|| format!("Failed to parse argument: {arg}"))?;
    let first = left.parse::<u64>().map_err(|e| format!("Failed to parse size1: {e:?}"))?;
    let second = right.parse::<u64>().map_err(|e| format!("Failed to parse size2: {e:?}"))?;
    Ok((first, second))
}
/// Benchmark of `ShardFileManager::chunk_hash_dedup_query` throughput over synthetic shards.
///
/// Runs until interrupted, printing the cumulative query rate once per second.
#[derive(Debug, Parser)]
struct ShardBenchmarkArgs {
    /// Shards to create, as N_SHARDS:TARGET_SIZE (may be repeated); each shard grows until its file size reaches TARGET_SIZE
    #[clap(id = "SIZE", value_parser = parse_arg)]
    shard_sizes: Vec<(u64, u64)>,
    /// Maximum number of consecutive chunk hashes sent in a single dedup query
    #[clap(long, default_value = "1")]
    contiguity: usize,
    /// Percentage of synthetic files whose chunks exist in the shards (clamped to 0-100)
    #[clap(long, default_value = "50")]
    hit_percent: f64,
    /// Number of chunk hashes in each synthetic file that a worker queries
    #[clap(long, default_value = "16")]
    file_contiguity: usize,
    /// Directory to write shards into; a temporary directory is used when omitted
    #[clap(long)]
    dir: Option<PathBuf>,
}

/// Parses CLI arguments, prepares the shard directory and runs the benchmark.
///
/// # Panics
///
/// Panics if `--hit-percent` is NaN, if the temporary directory cannot be created, if the shard
/// directory cannot be canonicalized (for example, because it does not exist), or if the
/// benchmark returns an error.
#[tokio::main]
async fn main() {
    let args = ShardBenchmarkArgs::parse();
    // `f64::clamp` passes NaN through, and rand's `random_bool` would then panic deep inside a
    // worker task, so reject it up front with a clear message.
    assert!(!args.hit_percent.is_nan(), "--hit-percent must be a number, got NaN");

    // Only create a temporary directory when `--dir` was not given. The `_temp_dir` binding (not
    // `_`) keeps it alive, so it is deleted only when `main` returns.
    let (dir, _temp_dir) = match args.dir {
        Some(dir) => (dir, None),
        None => {
            let temp = TempDir::with_prefix("git-xet-shard").expect("Failed to create temp dir");
            (temp.path().to_path_buf(), Some(temp))
        }
    };
    // `canonicalize` errors when the path does not exist, so no separate existence check is needed.
    let dir = std::fs::canonicalize(&dir)
        .unwrap_or_else(|e| panic!("Failed to canonicalize {dir:?}: {e}"));
    eprintln!("Using dir {dir:?}");

    // Argument order must match the signature: (file_contiguity, contiguity).
    run_shard_benchmark(
        args.shard_sizes,
        args.file_contiguity,
        args.contiguity,
        args.hit_percent.clamp(0.0, 100.0) / 100.0,
        &dir,
    )
    .await
    .expect("Shard benchmark failed");
}