use std::collections::HashSet;
use std::future::Future;
use std::time::Duration;
use clap::Args;
use rand::Rng;
use rand::seq::SliceRandom;
use tokio::time::sleep;
use crate::common::sparse_fill_visualizer::render_sparse_fill_bar;
// Command-line options that control how a sparse materialization run is
// chunked, how much of the data is filled, and how progress is displayed.
// NOTE(review): plain `//` comments are used on purpose. clap's derive macros
// turn `///` doc comments into help text, so converting these would change the
// generated CLI output; confirm that is wanted before doing so.
#[derive(Args, Debug, Clone)]
pub struct SparseMaterializationOptions {
// Step between consecutive logical chunk offsets (`--chunk-size`).
// Defaults to 262144; `validate` rejects 0.
#[arg(long, default_value_t = 262144)]
pub chunk_size: usize,
// Percentage of the logical chunks to materialize (`--fill-percent`).
// Defaults to 100.0; `validate` requires a value in (0, 100].
#[arg(long, default_value_t = 100.0)]
pub fill_percent: f64,
// Milliseconds to sleep after each materialized chunk (`--sleep-ms`).
// Defaults to 0, which disables the pause.
#[arg(long, default_value_t = 0)]
pub sleep_ms: u64,
// Width handed to the sparse fill bar renderer (`--progress-width`).
// Defaults to 32; `validate` rejects 0.
#[arg(long, default_value_t = 32)]
pub progress_width: usize,
}
impl SparseMaterializationOptions {
    /// Checks that the options describe a runnable materialization.
    ///
    /// The checks run in a fixed order (chunk size, fill percent, progress
    /// width) and the first failing one determines the reported message.
    ///
    /// # Errors
    ///
    /// Returns an [`std::io::ErrorKind::InvalidInput`] error when
    /// `chunk_size` is zero, when `fill_percent` is not within `(0, 100]`
    /// (NaN is rejected as well), or when `progress_width` is zero.
    pub fn validate(&self) -> std::io::Result<()> {
        // Written as a positive range test so that NaN, which fails every
        // comparison, ends up on the rejecting side.
        let percent_in_range = self.fill_percent > 0.0 && self.fill_percent <= 100.0;

        let problem = if self.chunk_size == 0 {
            Some("--chunk-size must be greater than zero")
        } else if !percent_in_range {
            Some("--fill-percent must be in the range (0, 100]")
        } else if self.progress_width == 0 {
            Some("--progress-width must be greater than zero")
        } else {
            None
        };

        match problem {
            Some(message) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                message,
            )),
            None => Ok(()),
        }
    }
}
/// Inputs for a single sparse materialization run.
#[derive(Debug)]
pub struct SparseMaterializationConfig<'a> {
    /// Total length, in bytes, of the data being materialized; logical chunk
    /// offsets are generated over `0..len`.
    pub len: usize,
    /// Chunking, fill and display options borrowed for the duration of the run.
    pub options: &'a SparseMaterializationOptions,
}
/// Outcome of a sparse materialization run.
#[derive(Debug, Clone)]
pub struct SparseMaterializationResult {
    /// Every chunk-aligned offset of the data, in ascending order.
    pub logical_offsets: Vec<usize>,
    /// Chunk-aligned offsets of the chunks that were actually materialized.
    pub filled_offsets: HashSet<usize>,
}
/// Snapshot handed to the per-chunk callback after each chunk is materialized.
#[derive(Debug, Clone)]
pub struct SparseMaterializationStep<'a> {
    /// Zero-based position of this step within the run.
    pub index: usize,
    /// Number of steps the run will perform in total.
    pub total_steps: usize,
    /// Jittered offset that was passed to the chunk materializer.
    pub requested_offset: usize,
    /// `requested_offset` rounded down to a multiple of `chunk_size`.
    pub normalized_offset: usize,
    /// Byte count reported by the chunk materializer for this step.
    pub chunk_len: usize,
    /// Running total of `chunk_len` over all steps so far, this one included.
    pub materialized_bytes: usize,
    /// Share of steps completed so far, as a percentage of `total_steps`.
    pub progress_percent: f64,
    /// Total length, in bytes, of the data being materialized.
    pub len: usize,
    /// Step between consecutive logical chunk offsets.
    pub chunk_size: usize,
    /// Every chunk-aligned offset of the data, in ascending order.
    pub logical_offsets: &'a [usize],
    /// Chunk-aligned offsets filled so far, this step included.
    pub filled_offsets: &'a HashSet<usize>,
}
/// Chooses which offsets a run will read, in the order they will be read.
///
/// The logical offsets are shuffled, the leading `fill_percent` share of them
/// is kept (at least one whenever any offset exists), and each kept offset is
/// jittered within its own chunk.
///
/// This is deliberately a synchronous function: the thread-local RNG handle is
/// created and dropped here, so it never becomes part of the state of the
/// async caller's future.
fn plan_sparse_read_offsets(
    logical_offsets: &[usize],
    chunk_size: usize,
    len: usize,
    fill_percent: f64,
) -> Vec<usize> {
    let mut rng = rand::thread_rng();
    let mut randomized_offsets = logical_offsets.to_vec();
    shuffle_offsets(&mut randomized_offsets, &mut rng);

    let fill_chunks = ((randomized_offsets.len() as f64) * (fill_percent / 100.0)).ceil() as usize;
    // `max` before `min` keeps the count at zero when there are no offsets.
    let fill_chunks = fill_chunks.max(1).min(randomized_offsets.len());

    // Only the offsets that will actually be read are jittered.
    jittered_offsets(&randomized_offsets[..fill_chunks], chunk_size, len, &mut rng)
}

/// Materializes a randomly ordered, possibly partial, selection of chunks and
/// reports progress after each one.
///
/// The data of length `config.len` is divided into logical chunks of
/// `chunk_size`. A `fill_percent` share of those chunks is selected in random
/// order, and `materialize_chunk` is awaited once per selected chunk with an
/// offset that lies somewhere inside that chunk. After every chunk a progress
/// report is printed to stdout and `after_chunk` is invoked with a
/// [`SparseMaterializationStep`] describing the state so far. When `sleep_ms`
/// is non-zero the function then sleeps for that long.
///
/// Returns the full list of logical offsets together with the set of
/// chunk-aligned offsets that were filled.
///
/// # Errors
///
/// Returns the error from [`SparseMaterializationOptions::validate`] when the
/// options are invalid, and stops at and returns the first error produced by
/// `materialize_chunk` or `after_chunk`.
pub async fn run_sparse_materialization<F, Fut, G>(
    config: SparseMaterializationConfig<'_>,
    mut materialize_chunk: F,
    mut after_chunk: G,
) -> std::io::Result<SparseMaterializationResult>
where
    F: FnMut(usize) -> Fut,
    Fut: Future<Output = std::io::Result<usize>>,
    G: FnMut(SparseMaterializationStep<'_>) -> std::io::Result<()>,
{
    config.options.validate()?;
    let len = config.len;
    let chunk_size = config.options.chunk_size;
    let logical_offsets: Vec<usize> = (0..len).step_by(chunk_size).collect();

    // Planning happens in a synchronous helper so the non-`Send` thread-local
    // RNG handle is gone before the first `.await` below.
    let offsets = plan_sparse_read_offsets(
        &logical_offsets,
        chunk_size,
        len,
        config.options.fill_percent,
    );

    println!(
        "materializing {:.2}% of {} bytes: {} / {} chunk(s), random order",
        config.options.fill_percent,
        len,
        offsets.len(),
        logical_offsets.len()
    );
    println!("randomized read points (selected): {:?}", offsets);
    println!(
        "sparse map width={} sleep={}ms between steps",
        config.options.progress_width, config.options.sleep_ms
    );

    let mut materialized_bytes = 0usize;
    let mut filled_offsets = HashSet::new();
    for (index, offset) in offsets.iter().copied().enumerate() {
        let chunk_len = materialize_chunk(offset).await?;
        // Map the jittered offset back to the start of its logical chunk.
        let normalized_offset = offset - (offset % chunk_size);
        filled_offsets.insert(normalized_offset);
        materialized_bytes += chunk_len;

        let progress_step = index + 1;
        let progress_percent = (progress_step as f64 * 100.0) / offsets.len() as f64;
        let sparse_fill_bar = render_sparse_fill_bar(
            &filled_offsets,
            &logical_offsets,
            config.options.progress_width,
        );
        println!(
            "filled chunk {} from requested offset {} -> normalized {} ({} bytes)",
            index, offset, normalized_offset, chunk_len
        );
        println!(
            "sparse fill map [{}] {:>6.2}% ({}/{})",
            sparse_fill_bar,
            progress_percent,
            filled_offsets.len(),
            logical_offsets.len()
        );
        println!("materialized payload: {} / {} bytes", materialized_bytes, len);

        after_chunk(SparseMaterializationStep {
            index,
            total_steps: offsets.len(),
            requested_offset: offset,
            normalized_offset,
            chunk_len,
            materialized_bytes,
            progress_percent,
            len,
            chunk_size,
            logical_offsets: &logical_offsets,
            filled_offsets: &filled_offsets,
        })?;

        // NOTE(review): this pause also runs after the final step, although the
        // banner above says "between steps" — confirm whether that is intended.
        if config.options.sleep_ms > 0 {
            sleep(Duration::from_millis(config.options.sleep_ms)).await;
        }
    }

    Ok(SparseMaterializationResult {
        logical_offsets,
        filled_offsets,
    })
}
/// Moves each offset forward by a random amount that keeps it inside its chunk.
///
/// For every input offset the shift is drawn from `0..=max`, where `max` is one
/// less than the smaller of `chunk_size` and the bytes remaining up to `len`.
/// When that upper bound is zero the offset is returned unchanged and the RNG
/// is not consulted.
fn jittered_offsets<R: Rng + ?Sized>(
    offsets: &[usize],
    chunk_size: usize,
    len: usize,
    rng: &mut R,
) -> Vec<usize> {
    let mut jittered = Vec::with_capacity(offsets.len());
    for &base in offsets {
        // Room available for the shift: bounded by both the chunk and the data end.
        let span = len.saturating_sub(base).min(chunk_size);
        let shift = match span.saturating_sub(1) {
            0 => 0,
            upper => rng.gen_range(0..=upper),
        };
        jittered.push(base + shift);
    }
    jittered
}
/// Shuffles `offsets` in place using `rng`.
///
/// Slices with fewer than two elements are left untouched and the RNG is not
/// handed to the shuffle at all.
fn shuffle_offsets<R: Rng + ?Sized>(offsets: &mut [usize], rng: &mut R) {
    let worth_shuffling = offsets.len() > 1;
    if worth_shuffling {
        offsets.shuffle(rng);
    }
}