use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use memmap2::Mmap;
use rayon::prelude::*;
use tracing::{debug, info};
use super::minimizer_tuples::MinimizerTuple;
/// Serialized size of one [`MinimizerTupleExternal`]: two `u64` fields plus two `u8` fields.
pub const TUPLE_SIZE_BYTES: usize = 8 + 8 + 1 + 1;
/// One gibibyte (2^30 bytes).
pub const GIB: usize = 1 << 30;
/// Capacity of the `BufReader`s used when streaming tuple files (4 MiB).
const READER_BUF_SIZE: usize = 4 << 20;
#[repr(C, packed(2))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MinimizerTupleExternal {
pub minimizer: u64,
pub pos_in_seq: u64,
pub pos_in_kmer: u8,
pub num_kmers_in_super_kmer: u8,
}
impl MinimizerTupleExternal {
    /// Converts an in-memory [`MinimizerTuple`] into its on-disk form.
    pub fn from_internal(t: &MinimizerTuple) -> Self {
        Self {
            minimizer: t.minimizer,
            pos_in_seq: t.pos_in_seq,
            pos_in_kmer: t.pos_in_kmer,
            num_kmers_in_super_kmer: t.num_kmers_in_super_kmer,
        }
    }

    /// Converts back into an in-memory [`MinimizerTuple`].
    pub fn to_internal(&self) -> MinimizerTuple {
        MinimizerTuple {
            minimizer: self.minimizer,
            pos_in_seq: self.pos_in_seq,
            pos_in_kmer: self.pos_in_kmer,
            num_kmers_in_super_kmer: self.num_kmers_in_super_kmer,
        }
    }

    /// Decodes a tuple from raw bytes in the layout produced by [`Self::to_bytes`].
    ///
    /// # Safety
    /// `bytes` must be valid for reads of `TUPLE_SIZE_BYTES` bytes. No alignment
    /// is required.
    #[inline]
    pub unsafe fn from_bytes(bytes: *const u8) -> Self {
        // SAFETY: the caller guarantees `bytes` is readable for TUPLE_SIZE_BYTES
        // (== size_of::<Self>()) bytes; every bit pattern is a valid `Self`
        // because all fields are plain integers, and `read_unaligned` has no
        // alignment requirement.
        unsafe { std::ptr::read_unaligned(bytes as *const Self) }
    }

    /// Encodes the tuple as its raw `TUPLE_SIZE_BYTES`-byte representation.
    pub fn to_bytes(&self) -> [u8; TUPLE_SIZE_BYTES] {
        let mut buf = [0u8; TUPLE_SIZE_BYTES];
        // SAFETY: `self` is a packed struct of TUPLE_SIZE_BYTES bytes with no
        // padding (all bytes initialized), `buf` has the same length, and the two
        // regions cannot overlap because `buf` is a fresh local.
        unsafe {
            std::ptr::copy_nonoverlapping(
                self as *const Self as *const u8,
                buf.as_mut_ptr(),
                TUPLE_SIZE_BYTES,
            );
        }
        buf
    }

    /// Reads the next tuple from `reader`.
    ///
    /// Returns `Ok(None)` only on a clean end of input (zero bytes available).
    ///
    /// # Errors
    /// Returns `UnexpectedEof` if the input ends part-way through a tuple (a
    /// truncated file), and propagates any other I/O error. `Interrupted` reads
    /// are retried.
    fn read_from<R: Read>(reader: &mut R) -> std::io::Result<Option<Self>> {
        let mut buf = [0u8; TUPLE_SIZE_BYTES];
        let mut filled = 0;
        while filled < TUPLE_SIZE_BYTES {
            match reader.read(&mut buf[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        match filled {
            0 => Ok(None),
            // SAFETY: `buf` holds exactly TUPLE_SIZE_BYTES initialized bytes.
            TUPLE_SIZE_BYTES => Ok(Some(unsafe { Self::from_bytes(buf.as_ptr()) })),
            partial => Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                format!(
                    "truncated tuple: got {} of {} bytes",
                    partial, TUPLE_SIZE_BYTES
                ),
            )),
        }
    }
}
impl PartialOrd for MinimizerTupleExternal {
    /// Delegates to [`Ord::cmp`], so every pair of tuples is comparable.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Option::Some(ordering)
    }
}
impl Ord for MinimizerTupleExternal {
    /// Orders tuples by `minimizer`, then by `pos_in_seq`.
    ///
    /// `pos_in_kmer` and `num_kmers_in_super_kmer` act only as final
    /// tie-breakers, so that `cmp` returns `Equal` exactly when the derived
    /// `PartialEq` reports equality, as the `Ord` contract requires.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Fields are copied out by value: references into a packed struct are
        // not allowed.
        let key = |t: &Self| {
            (
                t.minimizer,
                t.pos_in_seq,
                t.pos_in_kmer,
                t.num_kmers_in_super_kmer,
            )
        };
        key(self).cmp(&key(other))
    }
}
/// Spills sorted batches of tuples to numbered temp files and merges them.
///
/// All files live in `tmp_dir` and carry `run_id` in their names.
pub struct ExternalSorter {
    /// Directory holding the temp runs and the merged file.
    tmp_dir: PathBuf,
    /// Per-sorter id embedded in every file name.
    run_id: u64,
    /// Number of temp runs written so far; also the next run's id.
    num_files: AtomicU64,
    /// RAM budget in GiB, used to size per-thread buffers.
    ram_limit_gib: usize,
    /// Thread count the RAM budget is divided among.
    num_threads: usize,
    /// Enables extra progress logging.
    verbose: bool,
    /// Set once the merged file has been handed to a `FileTuples`, so the
    /// sorter's own cleanup leaves it alone.
    merged_file_handed_off: std::sync::atomic::AtomicBool,
}
impl ExternalSorter {
    /// Creates a sorter that spills runs into `tmp_dir`, creating the directory
    /// if needed.
    ///
    /// `ram_limit_gib` and `num_threads` feed
    /// [`buffer_size_per_thread`](Self::buffer_size_per_thread); `verbose`
    /// enables extra progress logging.
    ///
    /// # Errors
    /// Propagates any error from creating `tmp_dir`.
    pub fn new(
        tmp_dir: impl AsRef<Path>,
        ram_limit_gib: usize,
        num_threads: usize,
        verbose: bool,
    ) -> std::io::Result<Self> {
        let tmp_dir = tmp_dir.as_ref().to_path_buf();
        fs::create_dir_all(&tmp_dir)?;
        // The run id only has to keep file names apart. A clock set before the
        // Unix epoch should not abort the build, so fall back to 0 instead of
        // panicking.
        let run_id = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_nanos() as u64)
            .unwrap_or(0);
        Ok(Self {
            tmp_dir,
            run_id,
            num_files: AtomicU64::new(0),
            ram_limit_gib,
            num_threads,
            verbose,
            merged_file_handed_off: std::sync::atomic::AtomicBool::new(false),
        })
    }

    /// Per-thread buffer capacity, in tuples.
    ///
    /// The RAM budget is divided by `2 * num_threads` (at least 2), then
    /// converted from bytes to tuples.
    pub fn buffer_size_per_thread(&self) -> usize {
        // Saturate rather than overflow (panic in debug, wrap in release) on
        // targets where `usize` is 32 bits.
        let total_bytes = self.ram_limit_gib.saturating_mul(GIB);
        let bytes_per_thread = total_bytes / self.num_threads.max(1).saturating_mul(2);
        bytes_per_thread / TUPLE_SIZE_BYTES
    }

    fn temp_file_path(&self, id: u64) -> PathBuf {
        self.tmp_dir.join(format!(
            "sshash.tmp.run_{}.minimizers.{}.bin",
            self.run_id, id
        ))
    }

    fn merged_file_path(&self) -> PathBuf {
        self.tmp_dir.join(format!(
            "sshash.tmp.run_{}.minimizers.bin",
            self.run_id
        ))
    }

    /// Sorts `buffer` in parallel, writes it to a new numbered temp run, and
    /// clears it. Returns the id of the run written.
    ///
    /// # Errors
    /// Propagates file creation/write errors. On error `buffer` is left sorted
    /// but not cleared.
    pub fn sort_and_flush(&self, buffer: &mut Vec<MinimizerTupleExternal>) -> std::io::Result<u64> {
        buffer.par_sort_unstable();
        let file_id = self.num_files.fetch_add(1, Ordering::SeqCst);
        let path = self.temp_file_path(file_id);
        if self.verbose {
            debug!("Flushing {} tuples to {:?}", buffer.len(), path);
        }
        let file = File::create(&path)?;
        let mut writer = BufWriter::with_capacity(1024 * 1024, file);
        for tuple in buffer.iter() {
            writer.write_all(&tuple.to_bytes())?;
        }
        writer.flush()?;
        buffer.clear();
        Ok(file_id)
    }

    /// Number of temp runs written so far.
    pub fn num_files(&self) -> u64 {
        self.num_files.load(Ordering::SeqCst)
    }

    /// Merges all temp runs into the single merged file and returns its counts.
    ///
    /// * no runs: an empty merged file is created so later opens see 0 tuples;
    /// * one run: it is renamed into place and scanned;
    /// * otherwise: runs are k-way merged, then deleted.
    ///
    /// # Errors
    /// Propagates I/O errors. Returns `InvalidData` if a run's size is not a
    /// whole number of tuples, or if the merge emitted a different number of
    /// tuples than the runs contain.
    pub fn merge(&self) -> std::io::Result<MergeResult> {
        let num_files = self.num_files();
        if num_files == 0 {
            File::create(self.merged_file_path())?;
            return Ok(MergeResult::default());
        }
        if num_files == 1 {
            let src = self.temp_file_path(0);
            Self::count_tuples_in_file(&src)?;
            fs::rename(&src, self.merged_file_path())?;
            return self.scan_merged_file();
        }
        info!("Merging {} temp files...", num_files);
        let run_paths: Vec<PathBuf> = (0..num_files).map(|id| self.temp_file_path(id)).collect();
        let mut expected_tuples = 0u64;
        for path in &run_paths {
            expected_tuples += Self::count_tuples_in_file(path)?;
        }
        let mut merger = FileMergingIterator::new(run_paths)?;
        let merged_path = self.merged_file_path();
        let file = File::create(&merged_path)?;
        let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, file);
        let mut result = MergeResult::default();
        let mut prev = None;
        while merger.has_next() {
            let tuple = merger.current();
            Self::tally(&mut result, &mut prev, &tuple);
            writer.write_all(&tuple.to_bytes())?;
            merger.next();
            if self.verbose && result.num_super_kmers % 100_000_000 == 0 {
                info!("Merged {} tuples...", result.num_super_kmers);
            }
        }
        writer.flush()?;
        drop(merger);
        // The merging iterator does not surface read errors; a failing run just
        // ends early. Compare against the on-disk sizes so such a loss is
        // reported rather than silently producing a short merged file.
        Self::ensure_tuple_count(result.num_super_kmers, expected_tuples)?;
        for id in 0..num_files {
            let _ = fs::remove_file(self.temp_file_path(id));
        }
        info!(
            "Merge complete: {} minimizers, {} positions, {} super-kmers",
            result.num_minimizers, result.num_positions, result.num_super_kmers
        );
        Ok(result)
    }

    /// Recomputes the merge counts by streaming the merged file.
    fn scan_merged_file(&self) -> std::io::Result<MergeResult> {
        let file = File::open(self.merged_file_path())?;
        let mut reader = BufReader::with_capacity(READER_BUF_SIZE, file);
        let mut result = MergeResult::default();
        let mut prev = None;
        while let Some(tuple) = MinimizerTupleExternal::read_from(&mut reader)? {
            Self::tally(&mut result, &mut prev, &tuple);
        }
        Ok(result)
    }

    /// Folds one tuple, seen in sorted order, into `stats`.
    ///
    /// `prev` holds the `(minimizer, pos_in_seq)` of the previous tuple, or
    /// `None` before the first. Using `Option` instead of a sentinel means a
    /// first tuple whose minimizer is `u64::MAX` is still counted.
    fn tally(
        stats: &mut MergeResult,
        prev: &mut Option<(u64, u64)>,
        tuple: &MinimizerTupleExternal,
    ) {
        let (minimizer, pos_in_seq) = (tuple.minimizer, tuple.pos_in_seq);
        match *prev {
            Some((prev_min, prev_pos)) if prev_min == minimizer => {
                if prev_pos != pos_in_seq {
                    stats.num_positions += 1;
                }
            }
            _ => {
                stats.num_minimizers += 1;
                stats.num_positions += 1;
            }
        }
        *prev = Some((minimizer, pos_in_seq));
        stats.num_super_kmers += 1;
    }

    /// Returns how many whole tuples `path` holds.
    ///
    /// # Errors
    /// Propagates metadata errors. Returns `InvalidData` if the size is not a
    /// multiple of `TUPLE_SIZE_BYTES`, which indicates a truncated file.
    fn count_tuples_in_file(path: &Path) -> std::io::Result<u64> {
        let len = fs::metadata(path)?.len();
        let tuple_len = TUPLE_SIZE_BYTES as u64;
        if len % tuple_len != 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "{} is {} bytes long, not a multiple of the {}-byte tuple size",
                    path.display(),
                    len,
                    tuple_len
                ),
            ));
        }
        Ok(len / tuple_len)
    }

    /// Fails with `InvalidData` unless `merged == expected`.
    fn ensure_tuple_count(merged: u64, expected: u64) -> std::io::Result<()> {
        if merged == expected {
            Ok(())
        } else {
            Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "merged {} tuples but the sorted runs contain {}",
                    merged, expected
                ),
            ))
        }
    }

    /// Loads every tuple of the merged file into memory, in file order.
    ///
    /// Trailing bytes that do not form a whole tuple are ignored.
    pub fn read_merged_tuples(&self) -> std::io::Result<Vec<MinimizerTuple>> {
        let file = File::open(self.merged_file_path())?;
        // SAFETY: the merged file's name embeds this sorter's run id; it is
        // assumed not to be modified by anyone else while mapped.
        let mmap = unsafe { Mmap::map(&file)? };
        Ok(mmap
            .chunks_exact(TUPLE_SIZE_BYTES)
            .map(|chunk| {
                // SAFETY: `chunk` is exactly TUPLE_SIZE_BYTES long, and
                // `from_bytes` performs an unaligned read.
                unsafe { MinimizerTupleExternal::from_bytes(chunk.as_ptr()) }.to_internal()
            })
            .collect())
    }

    /// Hands the merged file over to a [`FileTuples`] handle.
    ///
    /// After this call the sorter no longer deletes the merged file when
    /// dropped. The returned handle deletes it on drop instead, so calling this
    /// twice yields two handles that both delete the same file.
    pub fn open_merged_file(&self) -> std::io::Result<FileTuples> {
        let path = self.merged_file_path();
        let file_len = fs::metadata(&path)?.len() as usize;
        let num_tuples = file_len / TUPLE_SIZE_BYTES;
        self.merged_file_handed_off.store(true, Ordering::SeqCst);
        info!("Opened merged file: {} tuples ({:.2} GB on disk)",
            num_tuples, file_len as f64 / GIB as f64);
        Ok(FileTuples { path, num_tuples })
    }

    /// Deletes the merged file; succeeds if it does not exist.
    pub fn remove_merged_file(&self) -> std::io::Result<()> {
        // Attempt the removal directly instead of `exists()` + remove, which
        // races with concurrent deletion.
        match fs::remove_file(self.merged_file_path()) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e),
        }
    }
}
impl Drop for ExternalSorter {
    /// Best-effort cleanup: removes every temp run and, unless it was handed
    /// off via `open_merged_file`, the merged file. Removal errors are ignored.
    fn drop(&mut self) {
        (0..self.num_files())
            .map(|id| self.temp_file_path(id))
            .for_each(|run| {
                let _ = fs::remove_file(run);
            });
        let handed_off = self.merged_file_handed_off.load(Ordering::SeqCst);
        if handed_off {
            return;
        }
        let _ = fs::remove_file(self.merged_file_path());
    }
}
/// Owning handle to a tuple file on disk; the file is deleted when the handle
/// is dropped.
pub struct FileTuples {
    /// Location of the tuple file.
    path: PathBuf,
    /// Number of tuples the file holds.
    num_tuples: usize,
}
impl FileTuples {
    /// Number of tuples this handle was created with.
    #[inline]
    pub fn num_tuples(&self) -> usize {
        self.num_tuples
    }

    /// Path of the backing file.
    #[inline]
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }

    /// Returns an iterator that groups consecutive tuples by minimizer,
    /// starting from the beginning of the file.
    pub fn bucket_iter(&self) -> std::io::Result<FileBucketIter> {
        let reader = self.open_buffered()?;
        Ok(FileBucketIter {
            reader,
            pos: 0,
            num_tuples: self.num_tuples,
            lookahead: None,
        })
    }

    /// Returns a reader yielding tuples one by one from the start of the file.
    pub fn sequential_reader(&self) -> std::io::Result<SequentialTupleReader> {
        self.open_buffered()
            .map(|reader| SequentialTupleReader { reader })
    }

    /// Opens a fresh buffered reader over the backing file.
    fn open_buffered(&self) -> std::io::Result<BufReader<File>> {
        File::open(&self.path).map(|file| BufReader::with_capacity(READER_BUF_SIZE, file))
    }
}
impl Drop for FileTuples {
    /// Deletes the backing file. This is best-effort: `Drop` cannot report
    /// errors, so a failed removal (e.g. file already gone) is ignored.
    fn drop(&mut self) {
        let _ = fs::remove_file(self.path.as_path());
    }
}
/// Streams tuples from a tuple file one at a time.
pub struct SequentialTupleReader {
    /// Buffered reader positioned after the last tuple returned.
    reader: BufReader<File>,
}
impl SequentialTupleReader {
    /// Returns the next tuple, or `Ok(None)` at the end of the file.
    ///
    /// # Errors
    /// Propagates I/O errors from the underlying reader.
    #[inline]
    pub fn read_next(&mut self) -> std::io::Result<Option<MinimizerTupleExternal>> {
        let source = &mut self.reader;
        MinimizerTupleExternal::read_from(source)
    }
}
/// Iterator over a tuple file that yields one [`BucketScan`] per run of
/// consecutive tuples sharing a minimizer.
pub struct FileBucketIter {
    /// Buffered reader positioned after the last tuple read (including any
    /// tuple held in `lookahead`).
    reader: BufReader<File>,
    /// Index of the next tuple to be assigned to a bucket.
    pos: usize,
    /// Total number of tuples in the file.
    num_tuples: usize,
    /// First tuple of the next bucket, read while detecting the end of the
    /// previous one.
    lookahead: Option<MinimizerTupleExternal>,
}
impl Iterator for FileBucketIter {
    type Item = BucketScan;

    /// Yields the next run of consecutive tuples with the same minimizer.
    ///
    /// Returns `None` once `num_tuples` tuples have been consumed, or if the
    /// first tuple of a bucket cannot be read. A read failure or end of file
    /// inside a bucket ends that bucket early; the error is not reported.
    fn next(&mut self) -> Option<BucketScan> {
        if self.pos >= self.num_tuples {
            return None;
        }
        let head = match self.lookahead.take() {
            Some(pending) => pending,
            None => match MinimizerTupleExternal::read_from(&mut self.reader) {
                Ok(Some(tuple)) => tuple,
                _ => return None,
            },
        };

        let first_idx = self.pos;
        let minimizer = head.minimizer;
        let mut last_pos = head.pos_in_seq;
        let mut distinct_positions = 1usize;
        let mut kmer_total = u64::from(head.num_kmers_in_super_kmer);
        self.pos += 1;

        while self.pos < self.num_tuples {
            let tuple = match MinimizerTupleExternal::read_from(&mut self.reader) {
                Ok(Some(tuple)) => tuple,
                _ => break,
            };
            if tuple.minimizer != minimizer {
                // Belongs to the next bucket; keep it for the following call.
                self.lookahead = Some(tuple);
                break;
            }
            let pos_in_seq = tuple.pos_in_seq;
            if pos_in_seq != last_pos {
                distinct_positions += 1;
                last_pos = pos_in_seq;
            }
            kmer_total += u64::from(tuple.num_kmers_in_super_kmer);
            self.pos += 1;
        }

        Some(BucketScan {
            minimizer,
            cached_size: distinct_positions,
            start_tuple_idx: first_idx as u64,
            num_tuples: (self.pos - first_idx) as u32,
            num_kmers: kmer_total,
        })
    }
}
/// Summary of one bucket (a run of consecutive tuples sharing a minimizer).
#[derive(Debug, Clone, Copy)]
pub struct BucketScan {
    /// Minimizer shared by every tuple in the bucket.
    pub minimizer: u64,
    /// Number of changes of `pos_in_seq` between consecutive tuples, plus one.
    pub cached_size: usize,
    /// Index, within the file, of the bucket's first tuple.
    pub start_tuple_idx: u64,
    /// Number of tuples in the bucket.
    pub num_tuples: u32,
    /// Sum of `num_kmers_in_super_kmer` over the bucket's tuples.
    pub num_kmers: u64,
}
/// Counts gathered while merging or scanning a sorted tuple file.
#[derive(Debug, Default, Clone, Copy)]
pub struct MergeResult {
    /// Number of distinct minimizers.
    pub num_minimizers: u64,
    /// Number of `(minimizer, pos_in_seq)` changes between consecutive tuples,
    /// counting each minimizer's first tuple.
    pub num_positions: u64,
    /// Total number of tuples.
    pub num_super_kmers: u64,
}
/// K-way merger over sorted run files, yielding tuples in ascending order.
///
/// The pending head tuple of each run sits in a min-heap keyed by
/// `(tuple, run index)`, so each step costs O(log k) for k runs instead of a
/// linear rescan of every run. Among equal tuples, the run with the lowest
/// index is emitted first.
struct FileMergingIterator {
    /// One buffered reader per run, indexed by run number.
    readers: Vec<BufReader<File>>,
    /// Head tuple of every run that is not yet exhausted, tagged with its run
    /// index. `Reverse` turns std's max-heap into a min-heap.
    heads: std::collections::BinaryHeap<std::cmp::Reverse<(MinimizerTupleExternal, usize)>>,
}

impl FileMergingIterator {
    /// Opens every run in `paths` and primes the heap with each run's first
    /// tuple. Empty runs contribute nothing.
    ///
    /// # Errors
    /// Propagates errors from opening a run or reading its first tuple.
    fn new(paths: Vec<PathBuf>) -> std::io::Result<Self> {
        let mut readers = Vec::with_capacity(paths.len());
        let mut heads = std::collections::BinaryHeap::with_capacity(paths.len());
        for (run, path) in paths.iter().enumerate() {
            let mut reader = BufReader::with_capacity(1024 * 1024, File::open(path)?);
            if let Some(tuple) = MinimizerTupleExternal::read_from(&mut reader)? {
                heads.push(std::cmp::Reverse((tuple, run)));
            }
            readers.push(reader);
        }
        Ok(Self { readers, heads })
    }

    /// Whether any tuple remains.
    fn has_next(&self) -> bool {
        !self.heads.is_empty()
    }

    /// The smallest remaining tuple.
    ///
    /// # Panics
    /// Panics if the merger is exhausted (`has_next()` is false).
    fn current(&self) -> MinimizerTupleExternal {
        match self.heads.peek() {
            Some(std::cmp::Reverse((tuple, _))) => *tuple,
            None => panic!("FileMergingIterator::current called after exhaustion"),
        }
    }

    /// Advances past the current tuple, refilling from the run it came from.
    ///
    /// A read error on that run is treated like reaching its end: the run is
    /// dropped from the merge and the error is not surfaced.
    fn next(&mut self) {
        if let Some(std::cmp::Reverse((_, run))) = self.heads.pop() {
            if let Ok(Some(tuple)) = MinimizerTupleExternal::read_from(&mut self.readers[run]) {
                self.heads.push(std::cmp::Reverse((tuple, run)));
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Shorthand constructor for test tuples.
    fn tup(
        minimizer: u64,
        pos_in_seq: u64,
        pos_in_kmer: u8,
        num_kmers_in_super_kmer: u8,
    ) -> MinimizerTupleExternal {
        MinimizerTupleExternal {
            minimizer,
            pos_in_seq,
            pos_in_kmer,
            num_kmers_in_super_kmer,
        }
    }

    #[test]
    fn test_tuple_external_size() {
        assert_eq!(TUPLE_SIZE_BYTES, std::mem::size_of::<MinimizerTupleExternal>());
    }

    #[test]
    fn test_tuple_roundtrip() {
        let original = tup(12345, 67890, 5, 3);
        let encoded = original.to_bytes();
        let decoded = unsafe { MinimizerTupleExternal::from_bytes(encoded.as_ptr()) };
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_external_sorter_basic() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();
        assert!(sorter.buffer_size_per_thread() > 0);
        let mut buffer = vec![tup(100, 10, 1, 2), tup(50, 20, 3, 1)];
        sorter.sort_and_flush(&mut buffer).unwrap();
        assert!(buffer.is_empty());
        assert_eq!(sorter.num_files(), 1);
    }

    #[test]
    fn test_external_sorter_merge() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();
        let mut first_run = vec![tup(10, 1, 0, 1), tup(30, 3, 0, 1)];
        sorter.sort_and_flush(&mut first_run).unwrap();
        let mut second_run = vec![tup(20, 2, 0, 1), tup(40, 4, 0, 1)];
        sorter.sort_and_flush(&mut second_run).unwrap();
        assert_eq!(sorter.num_files(), 2);

        let result = sorter.merge().unwrap();
        assert_eq!(result.num_super_kmers, 4);
        assert_eq!(result.num_minimizers, 4);

        let minimizers: Vec<u64> = sorter
            .read_merged_tuples()
            .unwrap()
            .iter()
            .map(|t| t.minimizer)
            .collect();
        assert_eq!(minimizers, vec![10, 20, 30, 40]);
    }

    #[test]
    fn test_tuple_ordering() {
        let low = tup(100, 50, 0, 1);
        let mid = tup(100, 60, 0, 1);
        let high = tup(200, 10, 0, 1);
        assert!(low < mid);
        assert!(low < high);
        assert!(mid < high);
    }

    #[test]
    fn test_file_tuples_bucket_iter() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();
        let mut buffer = vec![tup(10, 1, 0, 3), tup(10, 2, 0, 2), tup(20, 5, 1, 4)];
        sorter.sort_and_flush(&mut buffer).unwrap();
        sorter.merge().unwrap();

        let ft = sorter.open_merged_file().unwrap();
        let buckets: Vec<BucketScan> = ft.bucket_iter().unwrap().collect();
        assert_eq!(buckets.len(), 2);

        let first = &buckets[0];
        assert_eq!(first.minimizer, 10);
        assert_eq!(first.cached_size, 2);
        assert_eq!(first.num_tuples, 2);
        assert_eq!(first.num_kmers, 5);

        let second = &buckets[1];
        assert_eq!(second.minimizer, 20);
        assert_eq!(second.cached_size, 1);
        assert_eq!(second.num_tuples, 1);
        assert_eq!(second.num_kmers, 4);
    }

    #[test]
    fn test_sequential_reader() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();
        let mut buffer = vec![tup(5, 10, 0, 1), tup(15, 20, 0, 2)];
        sorter.sort_and_flush(&mut buffer).unwrap();
        sorter.merge().unwrap();

        let ft = sorter.open_merged_file().unwrap();
        let mut reader = ft.sequential_reader().unwrap();
        // Packed fields are copied out by value before comparing.
        let first_min = reader.read_next().unwrap().unwrap().minimizer;
        assert_eq!(first_min, 5);
        let second_min = reader.read_next().unwrap().unwrap().minimizer;
        assert_eq!(second_min, 15);
        assert!(reader.read_next().unwrap().is_none());
    }
}