use std::fs::File;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::path::Path;
/// Splits `v` into `n` contiguous parts, preserving element order.
///
/// `n` is clamped to at least 1. Each of the first `n - 1` parts receives
/// `v.len() / n` elements; the final part receives everything that is left,
/// so it also absorbs the remainder of the division.
pub fn partition<T>(v: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let parts = n.max(1);
    let base = v.len() / parts;
    let mut remaining = v.into_iter();
    let mut out = Vec::with_capacity(parts);

    // Every part except the last takes exactly `base` elements.
    for _ in 1..parts {
        out.push(remaining.by_ref().take(base).collect());
    }
    // The last part drains whatever has not been handed out yet.
    out.push(remaining.collect());
    out
}
/// Opens `path` and returns a buffered reader positioned at the first line
/// start inside the byte range owned by worker `index` out of `peers`,
/// together with the number of bytes left between that position and the end
/// of the range.
///
/// The file is cut into `peers` ranges of `file_size / peers` bytes; the last
/// range additionally takes the remainder. For every range but the first, the
/// byte just before the range start is inspected: if it is not `\n`, the range
/// begins in the middle of a line, so the rest of that line is skipped and its
/// length is subtracted from the returned byte budget (saturating at zero).
///
/// Returns `None` (after logging to stderr) when:
/// - `index >= peers` (which includes `peers == 0`),
/// - the file cannot be opened, or
/// - its metadata cannot be read.
///
/// Seek or read failures while aligning to a line boundary are logged and
/// reported as `Some((reader, 0))`, i.e. an empty range.
pub fn byte_range_reader(
    path: &Path,
    index: usize,
    peers: usize,
) -> Option<(BufReader<File>, u64)> {
    // Reject impossible shard coordinates up front. `peers == 0` would divide
    // by zero below, and `index >= peers` would compute a range that can
    // overlap the last worker's range.
    if index >= peers {
        eprintln!(
            "[flowlog-runtime::io] invalid shard {index}/{peers} for {}",
            path.display()
        );
        return None;
    }

    let mut file = match File::open(path) {
        Ok(f) => f,
        Err(e) => {
            eprintln!(
                "[flowlog-runtime::io] failed to open {}: {e}",
                path.display()
            );
            return None;
        }
    };
    let file_size = match file.metadata() {
        Ok(m) => m.len(),
        Err(e) => {
            eprintln!(
                "[flowlog-runtime::io] failed to stat {}: {e}",
                path.display()
            );
            return None;
        }
    };

    let chunk = file_size / peers as u64;
    let start = chunk * index as u64;
    // The last worker also owns the remainder of the division.
    let end = if index == peers - 1 {
        file_size
    } else {
        chunk * (index + 1) as u64
    };

    // Empty range (e.g. more workers than bytes): nothing to read.
    if start >= end {
        return Some((BufReader::new(file), 0));
    }
    // The first range always starts on a line boundary.
    if start == 0 {
        return Some((BufReader::new(file), end));
    }

    // Look at the byte immediately before `start` to learn whether the range
    // begins exactly at the start of a line.
    if let Err(e) = file.seek(SeekFrom::Start(start - 1)) {
        eprintln!(
            "[flowlog-runtime::io] failed to seek {}: {e}",
            path.display()
        );
        return Some((BufReader::new(file), 0));
    }
    let mut reader = BufReader::new(file);
    let mut peek = [0u8; 1];
    if let Err(e) = reader.read_exact(&mut peek) {
        eprintln!(
            "[flowlog-runtime::io] failed to read {}: {e}",
            path.display()
        );
        return Some((reader, 0));
    }

    let span = end - start;
    if peek[0] == b'\n' {
        // Previous byte ends a line, so the reader already sits on a line start.
        return Some((reader, span));
    }

    // The range starts mid-line: skip the tail of that line and charge the
    // skipped bytes against this range's budget.
    let mut discard = Vec::new();
    match reader.read_until(b'\n', &mut discard) {
        Ok(skipped) => Some((reader, span.saturating_sub(skipped as u64))),
        Err(e) => {
            // The reader position is unknown after a failed skip; handing out
            // a non-zero budget would let the caller parse a partial line.
            eprintln!(
                "[flowlog-runtime::io] failed to read {}: {e}",
                path.display()
            );
            Some((reader, 0))
        }
    }
}
/// Returns `true` when the integer key `first` is assigned to worker `index`.
///
/// The owning worker is the Euclidean remainder of `first` divided by `peers`,
/// so negative keys still map to a non-negative worker number.
///
/// # Panics
///
/// Panics if `peers` is 0 (remainder by zero).
#[inline]
pub fn shard_int(first: i64, peers: usize, index: usize) -> bool {
    let modulus = peers as i64;
    let owner = first.rem_euclid(modulus) as usize;
    owner == index
}
/// Returns `true` when the string key `first` is assigned to worker `index`.
///
/// The key's UTF-8 bytes are hashed with 32-bit FNV-1a (XOR each byte into the
/// state, then multiply by the FNV prime with wrapping arithmetic); the owning
/// worker is that hash modulo `peers`.
///
/// # Panics
///
/// Panics if `peers` is 0 (remainder by zero).
#[inline]
pub fn shard_str(first: &str, peers: usize, index: usize) -> bool {
    const FNV_OFFSET_BASIS: u32 = 0x811c9dc5;
    const FNV_PRIME: u32 = 0x01000193;

    let digest = first.bytes().fold(FNV_OFFSET_BASIS, |state, byte| {
        (state ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });
    (digest as usize) % peers == index
}
/// Returns `true` when the interned key `first` is assigned to worker `index`.
///
/// The owning worker is the key's raw integer value (obtained via
/// `into_inner().get()`) modulo `peers`.
///
/// # Panics
///
/// Panics if `peers` is 0 (remainder by zero).
#[inline]
pub fn shard_spur(first: lasso::Spur, peers: usize, index: usize) -> bool {
    let raw = first.into_inner().get() as usize;
    raw % peers == index
}