1use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Write};
23use std::path::{Path, PathBuf};
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::time::{SystemTime, UNIX_EPOCH};
26
27use memmap2::Mmap;
28use rayon::prelude::*;
29use tracing::{debug, info};
30
31use super::minimizer_tuples::MinimizerTuple;
32
/// Serialized size of one [`MinimizerTupleExternal`]: two `u64` fields plus two `u8` fields.
pub const TUPLE_SIZE_BYTES: usize = 2 * 8 + 2;

/// Bytes in one gibibyte (2^30).
pub const GIB: usize = 1 << 30;

/// Capacity of the `BufReader`s used for sequential scans of tuple files (4 MiB).
const READER_BUF_SIZE: usize = 4 << 20;
41
/// Fixed-layout, on-disk form of a minimizer tuple.
///
/// `repr(C)` pins the field order and `packed(2)` caps the alignment at 2, so the struct
/// occupies exactly 8 + 8 + 1 + 1 = 18 bytes with no padding and can be copied to and from
/// raw bytes directly.
#[repr(C, packed(2))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MinimizerTupleExternal {
    /// Minimizer value; the primary sort key.
    pub minimizer: u64,
    /// Secondary sort key; mirrors `MinimizerTuple::pos_in_seq`.
    pub pos_in_seq: u64,
    /// Mirrors `MinimizerTuple::pos_in_kmer`.
    pub pos_in_kmer: u8,
    /// Mirrors `MinimizerTuple::num_kmers_in_super_kmer`.
    pub num_kmers_in_super_kmer: u8,
}
57
58impl MinimizerTupleExternal {
59 pub fn from_internal(t: &MinimizerTuple) -> Self {
61 Self {
62 minimizer: t.minimizer,
63 pos_in_seq: t.pos_in_seq,
64 pos_in_kmer: t.pos_in_kmer,
65 num_kmers_in_super_kmer: t.num_kmers_in_super_kmer,
66 }
67 }
68
69 pub fn to_internal(&self) -> MinimizerTuple {
71 MinimizerTuple {
72 minimizer: self.minimizer,
73 pos_in_seq: self.pos_in_seq,
74 pos_in_kmer: self.pos_in_kmer,
75 num_kmers_in_super_kmer: self.num_kmers_in_super_kmer,
76 }
77 }
78
79 #[inline]
84 pub unsafe fn from_bytes(bytes: *const u8) -> Self {
85 unsafe { std::ptr::read_unaligned(bytes as *const Self) }
87 }
88
89 pub fn to_bytes(&self) -> [u8; TUPLE_SIZE_BYTES] {
91 let mut buf = [0u8; TUPLE_SIZE_BYTES];
92 unsafe {
93 std::ptr::copy_nonoverlapping(
94 self as *const Self as *const u8,
95 buf.as_mut_ptr(),
96 TUPLE_SIZE_BYTES,
97 );
98 }
99 buf
100 }
101
102 fn read_from<R: Read>(reader: &mut R) -> std::io::Result<Option<Self>> {
104 let mut buf = [0u8; TUPLE_SIZE_BYTES];
105 match reader.read_exact(&mut buf) {
106 Ok(()) => Ok(Some(unsafe { Self::from_bytes(buf.as_ptr()) })),
107 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
108 Err(e) => Err(e),
109 }
110 }
111}
112
113impl PartialOrd for MinimizerTupleExternal {
114 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
115 Some(self.cmp(other))
116 }
117}
118
119impl Ord for MinimizerTupleExternal {
120 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
121 let self_min = self.minimizer;
123 let other_min = other.minimizer;
124 let self_pos = self.pos_in_seq;
125 let other_pos = other.pos_in_seq;
126
127 match self_min.cmp(&other_min) {
128 std::cmp::Ordering::Equal => self_pos.cmp(&other_pos),
129 ord => ord,
130 }
131 }
132}
133
/// Disk-backed sorter for [`MinimizerTupleExternal`] records.
///
/// Callers pass filled buffers to `sort_and_flush`, which writes each one as a sorted run
/// file. `merge` then combines the runs into a single sorted file under `tmp_dir`.
pub struct ExternalSorter {
    /// Directory holding the run files and the merged file.
    tmp_dir: PathBuf,
    /// Identifier embedded in every file name this sorter creates.
    run_id: u64,
    /// Number of run-file ids handed out so far; valid ids are `0..num_files`.
    num_files: AtomicU64,
    /// Memory budget, in GiB.
    ram_limit_gib: usize,
    /// Number of threads the memory budget is split across.
    num_threads: usize,
    /// Whether to emit extra progress logging.
    verbose: bool,
    /// Set once the merged file has been handed to a `FileTuples`. From then on the sorter
    /// no longer deletes that file when dropped.
    merged_file_handed_off: std::sync::atomic::AtomicBool,
}
153
154impl ExternalSorter {
155 pub fn new(tmp_dir: impl AsRef<Path>, ram_limit_gib: usize, num_threads: usize, verbose: bool) -> std::io::Result<Self> {
157 let tmp_dir = tmp_dir.as_ref().to_path_buf();
158
159 fs::create_dir_all(&tmp_dir)?;
161
162 let run_id = SystemTime::now()
164 .duration_since(UNIX_EPOCH)
165 .unwrap()
166 .as_nanos() as u64;
167
168 Ok(Self {
169 tmp_dir,
170 run_id,
171 num_files: AtomicU64::new(0),
172 ram_limit_gib,
173 num_threads,
174 verbose,
175 merged_file_handed_off: std::sync::atomic::AtomicBool::new(false),
176 })
177 }
178
179 pub fn buffer_size_per_thread(&self) -> usize {
184 let total_bytes = self.ram_limit_gib * GIB;
185 let bytes_per_thread = total_bytes / (2 * self.num_threads.max(1));
186 bytes_per_thread / TUPLE_SIZE_BYTES
187 }
188
189 fn temp_file_path(&self, id: u64) -> PathBuf {
191 self.tmp_dir.join(format!(
192 "sshash.tmp.run_{}.minimizers.{}.bin",
193 self.run_id, id
194 ))
195 }
196
197 fn merged_file_path(&self) -> PathBuf {
199 self.tmp_dir.join(format!(
200 "sshash.tmp.run_{}.minimizers.bin",
201 self.run_id
202 ))
203 }
204
205 pub fn sort_and_flush(&self, buffer: &mut Vec<MinimizerTupleExternal>) -> std::io::Result<u64> {
209 buffer.par_sort_unstable();
211
212 let file_id = self.num_files.fetch_add(1, Ordering::SeqCst);
214 let path = self.temp_file_path(file_id);
215
216 if self.verbose {
217 debug!("Flushing {} tuples to {:?}", buffer.len(), path);
218 }
219
220 let file = File::create(&path)?;
222 let mut writer = BufWriter::with_capacity(1024 * 1024, file);
223
224 for tuple in buffer.iter() {
225 writer.write_all(&tuple.to_bytes())?;
226 }
227
228 writer.flush()?;
229 buffer.clear();
230
231 Ok(file_id)
232 }
233
234 pub fn num_files(&self) -> u64 {
236 self.num_files.load(Ordering::SeqCst)
237 }
238
239 pub fn merge(&self) -> std::io::Result<MergeResult> {
243 let num_files = self.num_files();
244
245 if num_files == 0 {
246 return Ok(MergeResult::default());
247 }
248
249 if num_files == 1 {
250 let src = self.temp_file_path(0);
252 let dst = self.merged_file_path();
253 fs::rename(&src, &dst)?;
254 return self.scan_merged_file();
255 }
256
257 info!("Merging {} temp files...", num_files);
259
260 let mut merger = FileMergingIterator::new(
261 (0..num_files).map(|id| self.temp_file_path(id)).collect(),
262 )?;
263
264 let merged_path = self.merged_file_path();
265 let file = File::create(&merged_path)?;
266 let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, file);
267
268 let mut result = MergeResult::default();
269 let mut prev_minimizer = u64::MAX;
270 let mut prev_pos_in_seq = u64::MAX;
271
272 while merger.has_next() {
273 let tuple = merger.current();
274
275 if tuple.minimizer != prev_minimizer {
277 prev_minimizer = tuple.minimizer;
278 result.num_minimizers += 1;
279 result.num_positions += 1;
280 } else if tuple.pos_in_seq != prev_pos_in_seq {
281 result.num_positions += 1;
282 }
283 prev_pos_in_seq = tuple.pos_in_seq;
284 result.num_super_kmers += 1;
285
286 writer.write_all(&tuple.to_bytes())?;
287 merger.next();
288
289 if self.verbose && result.num_super_kmers % 100_000_000 == 0 {
290 info!("Merged {} tuples...", result.num_super_kmers);
291 }
292 }
293
294 writer.flush()?;
295 drop(merger);
296
297 for id in 0..num_files {
299 let _ = fs::remove_file(self.temp_file_path(id));
300 }
301
302 info!(
303 "Merge complete: {} minimizers, {} positions, {} super-kmers",
304 result.num_minimizers, result.num_positions, result.num_super_kmers
305 );
306
307 Ok(result)
308 }
309
310 fn scan_merged_file(&self) -> std::io::Result<MergeResult> {
312 let path = self.merged_file_path();
313 let file = File::open(&path)?;
314 let mut reader = BufReader::with_capacity(READER_BUF_SIZE, file);
315
316 let mut result = MergeResult::default();
317 let mut prev_minimizer = u64::MAX;
318 let mut prev_pos_in_seq = u64::MAX;
319
320 while let Some(tuple) = MinimizerTupleExternal::read_from(&mut reader)? {
321 if tuple.minimizer != prev_minimizer {
322 prev_minimizer = tuple.minimizer;
323 result.num_minimizers += 1;
324 result.num_positions += 1;
325 } else if tuple.pos_in_seq != prev_pos_in_seq {
326 result.num_positions += 1;
327 }
328 prev_pos_in_seq = tuple.pos_in_seq;
329 result.num_super_kmers += 1;
330 }
331
332 Ok(result)
333 }
334
335 pub fn read_merged_tuples(&self) -> std::io::Result<Vec<MinimizerTuple>> {
340 let path = self.merged_file_path();
341 let file = File::open(&path)?;
342 let mmap = unsafe { Mmap::map(&file)? };
343
344 let num_tuples = mmap.len() / TUPLE_SIZE_BYTES;
345 let mut tuples = Vec::with_capacity(num_tuples);
346
347 for i in 0..num_tuples {
348 let offset = i * TUPLE_SIZE_BYTES;
349 let ext = unsafe { MinimizerTupleExternal::from_bytes(mmap.as_ptr().add(offset)) };
350 tuples.push(ext.to_internal());
351 }
352
353 Ok(tuples)
354 }
355
356 pub fn open_merged_file(&self) -> std::io::Result<FileTuples> {
365 let path = self.merged_file_path();
366 let file_len = fs::metadata(&path)?.len() as usize;
367 let num_tuples = file_len / TUPLE_SIZE_BYTES;
368
369 self.merged_file_handed_off.store(true, Ordering::SeqCst);
371
372 info!("Opened merged file: {} tuples ({:.2} GB on disk)",
373 num_tuples, file_len as f64 / GIB as f64);
374
375 Ok(FileTuples {
376 path: path.clone(),
377 num_tuples,
378 })
379 }
380
381 pub fn remove_merged_file(&self) -> std::io::Result<()> {
383 let path = self.merged_file_path();
384 if path.exists() {
385 fs::remove_file(path)?;
386 }
387 Ok(())
388 }
389}
390
391impl Drop for ExternalSorter {
392 fn drop(&mut self) {
393 for id in 0..self.num_files() {
395 let _ = fs::remove_file(self.temp_file_path(id));
396 }
397 if !self.merged_file_handed_off.load(Ordering::SeqCst) {
399 let _ = fs::remove_file(self.merged_file_path());
400 }
401 }
402}
403
404pub struct FileTuples {
415 path: PathBuf,
417 num_tuples: usize,
419}
420
421impl FileTuples {
422 #[inline]
424 pub fn num_tuples(&self) -> usize {
425 self.num_tuples
426 }
427
428 #[inline]
430 pub fn path(&self) -> &Path {
431 &self.path
432 }
433
434 pub fn bucket_iter(&self) -> std::io::Result<FileBucketIter> {
440 let file = File::open(&self.path)?;
441 let reader = BufReader::with_capacity(READER_BUF_SIZE, file);
442 Ok(FileBucketIter {
443 reader,
444 pos: 0,
445 num_tuples: self.num_tuples,
446 lookahead: None,
447 })
448 }
449
450 pub fn sequential_reader(&self) -> std::io::Result<SequentialTupleReader> {
455 let file = File::open(&self.path)?;
456 let reader = BufReader::with_capacity(READER_BUF_SIZE, file);
457 Ok(SequentialTupleReader { reader })
458 }
459}
460
461impl Drop for FileTuples {
462 fn drop(&mut self) {
463 let _ = fs::remove_file(&self.path);
465 }
466}
467
468pub struct SequentialTupleReader {
473 reader: BufReader<File>,
474}
475
476impl SequentialTupleReader {
477 #[inline]
479 pub fn read_next(&mut self) -> std::io::Result<Option<MinimizerTupleExternal>> {
480 MinimizerTupleExternal::read_from(&mut self.reader)
481 }
482}
483
484pub struct FileBucketIter {
490 reader: BufReader<File>,
491 pos: usize,
493 num_tuples: usize,
495 lookahead: Option<MinimizerTupleExternal>,
498}
499
500impl Iterator for FileBucketIter {
501 type Item = BucketScan;
502
503 fn next(&mut self) -> Option<BucketScan> {
504 if self.pos >= self.num_tuples {
505 return None;
506 }
507
508 let first = if let Some(la) = self.lookahead.take() {
510 la
511 } else {
512 match MinimizerTupleExternal::read_from(&mut self.reader) {
513 Ok(Some(t)) => t,
514 _ => return None,
515 }
516 };
517
518 let start = self.pos;
519 let minimizer = first.minimizer;
520 let mut cached_size = 1usize;
521 let mut prev_pos_in_seq = first.pos_in_seq;
522 let mut num_kmers = first.num_kmers_in_super_kmer as u64;
523 self.pos += 1;
524
525 while self.pos < self.num_tuples {
527 match MinimizerTupleExternal::read_from(&mut self.reader) {
528 Ok(Some(t)) => {
529 if t.minimizer != minimizer {
530 self.lookahead = Some(t);
532 break;
533 }
534 if t.pos_in_seq != prev_pos_in_seq {
535 cached_size += 1;
536 prev_pos_in_seq = t.pos_in_seq;
537 }
538 num_kmers += t.num_kmers_in_super_kmer as u64;
539 self.pos += 1;
540 }
541 _ => break,
542 }
543 }
544
545 Some(BucketScan {
546 minimizer,
547 cached_size,
548 start_tuple_idx: start as u64,
549 num_tuples: (self.pos - start) as u32,
550 num_kmers,
551 })
552 }
553}
554
/// Summary of one bucket, i.e. a maximal run of consecutive tuples that share a minimizer.
#[derive(Debug, Clone, Copy)]
pub struct BucketScan {
    /// Minimizer shared by every tuple in the bucket.
    pub minimizer: u64,
    /// Number of times `pos_in_seq` takes a new value between consecutive tuples (counting
    /// the first tuple). In a sorted file this is the number of distinct positions.
    pub cached_size: usize,
    /// Index, within the file, of the bucket's first tuple.
    pub start_tuple_idx: u64,
    /// Number of tuples in the bucket.
    pub num_tuples: u32,
    /// Sum of `num_kmers_in_super_kmer` over the bucket's tuples.
    pub num_kmers: u64,
}
569
/// Counts gathered while producing or scanning the merged (sorted) tuple stream.
#[derive(Debug, Default, Clone, Copy)]
pub struct MergeResult {
    /// Incremented each time the minimizer differs from the previous tuple's.
    pub num_minimizers: u64,
    /// Incremented each time the minimizer or `pos_in_seq` differs from the previous tuple's.
    pub num_positions: u64,
    /// Total number of tuples.
    pub num_super_kmers: u64,
}
580
581struct FileMergingIterator {
588 readers: Vec<BufReader<File>>,
590 current_tuples: Vec<Option<MinimizerTupleExternal>>,
592 min_idx: usize,
594 num_active: usize,
596}
597
598impl FileMergingIterator {
599 fn new(paths: Vec<PathBuf>) -> std::io::Result<Self> {
600 let num_files = paths.len();
601 if num_files == 0 {
602 return Ok(Self {
603 readers: Vec::new(),
604 current_tuples: Vec::new(),
605 min_idx: 0,
606 num_active: 0,
607 });
608 }
609
610 let mut readers = Vec::with_capacity(num_files);
611 let mut current_tuples = Vec::with_capacity(num_files);
612
613 for path in &paths {
614 let file = File::open(path)?;
615 let mut reader = BufReader::with_capacity(1024 * 1024, file);
616 let tuple = MinimizerTupleExternal::read_from(&mut reader)?;
617 current_tuples.push(tuple);
618 readers.push(reader);
619 }
620
621 let num_active = current_tuples.iter().filter(|t| t.is_some()).count();
622 let mut merger = Self {
623 readers,
624 current_tuples,
625 min_idx: 0,
626 num_active,
627 };
628
629 if num_active > 0 {
630 merger.compute_min();
631 }
632
633 Ok(merger)
634 }
635
636 fn has_next(&self) -> bool {
637 self.num_active > 0
638 }
639
640 fn current(&self) -> MinimizerTupleExternal {
641 debug_assert!(self.num_active > 0);
642 self.current_tuples[self.min_idx].unwrap()
643 }
644
645 fn next(&mut self) {
646 if self.num_active == 0 {
647 return;
648 }
649
650 match MinimizerTupleExternal::read_from(&mut self.readers[self.min_idx]) {
652 Ok(tuple) => {
653 if tuple.is_none() {
654 self.num_active -= 1;
655 }
656 self.current_tuples[self.min_idx] = tuple;
657 }
658 Err(_) => {
659 self.current_tuples[self.min_idx] = None;
660 self.num_active -= 1;
661 }
662 }
663
664 if self.num_active > 0 {
665 self.compute_min();
666 }
667 }
668
669 fn compute_min(&mut self) {
670 self.min_idx = 0;
671 let mut min_tuple: Option<MinimizerTupleExternal> = None;
672
673 for (i, tuple_opt) in self.current_tuples.iter().enumerate() {
674 if let Some(tuple) = tuple_opt {
675 if min_tuple.is_none() || *tuple < min_tuple.unwrap() {
676 min_tuple = Some(*tuple);
677 self.min_idx = i;
678 }
679 }
680 }
681 }
682}
683
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Shorthand constructor for test tuples.
    fn ext(
        minimizer: u64,
        pos_in_seq: u64,
        pos_in_kmer: u8,
        num_kmers_in_super_kmer: u8,
    ) -> MinimizerTupleExternal {
        MinimizerTupleExternal {
            minimizer,
            pos_in_seq,
            pos_in_kmer,
            num_kmers_in_super_kmer,
        }
    }

    /// A sorter in its own temporary directory: 1 GiB budget, 2 threads, quiet.
    ///
    /// The directory guard is returned too, so that it outlives the sorter.
    fn fresh_sorter() -> (TempDir, ExternalSorter) {
        let dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(dir.path(), 1, 2, false).unwrap();
        (dir, sorter)
    }

    #[test]
    fn test_tuple_external_size() {
        assert_eq!(std::mem::size_of::<MinimizerTupleExternal>(), TUPLE_SIZE_BYTES);
    }

    #[test]
    fn test_tuple_roundtrip() {
        let original = ext(12345, 67890, 5, 3);
        let encoded = original.to_bytes();
        // SAFETY: `encoded` holds exactly TUPLE_SIZE_BYTES bytes.
        let decoded = unsafe { MinimizerTupleExternal::from_bytes(encoded.as_ptr()) };
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_external_sorter_basic() {
        let (_dir, sorter) = fresh_sorter();
        assert!(sorter.buffer_size_per_thread() > 0);

        let mut buffer = vec![ext(100, 10, 1, 2), ext(50, 20, 3, 1)];
        sorter.sort_and_flush(&mut buffer).unwrap();

        assert!(buffer.is_empty());
        assert_eq!(sorter.num_files(), 1);
    }

    #[test]
    fn test_external_sorter_merge() {
        let (_dir, sorter) = fresh_sorter();

        let runs = vec![
            vec![ext(10, 1, 0, 1), ext(30, 3, 0, 1)],
            vec![ext(20, 2, 0, 1), ext(40, 4, 0, 1)],
        ];
        for mut run in runs {
            sorter.sort_and_flush(&mut run).unwrap();
        }
        assert_eq!(sorter.num_files(), 2);

        let stats = sorter.merge().unwrap();
        assert_eq!(stats.num_super_kmers, 4);
        assert_eq!(stats.num_minimizers, 4);

        let merged = sorter.read_merged_tuples().unwrap();
        assert_eq!(merged.len(), 4);
        let minimizers: Vec<u64> = merged.iter().map(|t| t.minimizer).collect();
        assert_eq!(minimizers, vec![10, 20, 30, 40]);
    }

    #[test]
    fn test_tuple_ordering() {
        let low = ext(100, 50, 0, 1);
        let mid = ext(100, 60, 0, 1);
        let high = ext(200, 10, 0, 1);

        assert!(low < mid);
        assert!(low < high);
        assert!(mid < high);
    }

    #[test]
    fn test_file_tuples_bucket_iter() {
        let (_dir, sorter) = fresh_sorter();

        let mut buffer = vec![ext(10, 1, 0, 3), ext(10, 2, 0, 2), ext(20, 5, 1, 4)];
        sorter.sort_and_flush(&mut buffer).unwrap();
        sorter.merge().unwrap();
        let ft = sorter.open_merged_file().unwrap();

        let buckets: Vec<BucketScan> = ft.bucket_iter().unwrap().collect();
        assert_eq!(buckets.len(), 2);

        let summary: Vec<(u64, usize, u32, u64)> = buckets
            .iter()
            .map(|b| (b.minimizer, b.cached_size, b.num_tuples, b.num_kmers))
            .collect();
        assert_eq!(summary, vec![(10, 2, 2, 5), (20, 1, 1, 4)]);
    }

    #[test]
    fn test_sequential_reader() {
        let (_dir, sorter) = fresh_sorter();

        let mut buffer = vec![ext(5, 10, 0, 1), ext(15, 20, 0, 2)];
        sorter.sort_and_flush(&mut buffer).unwrap();
        sorter.merge().unwrap();
        let ft = sorter.open_merged_file().unwrap();

        let mut reader = ft.sequential_reader().unwrap();
        // Braces copy the packed field out; assert_eq! would otherwise borrow it.
        let first = reader.read_next().unwrap().unwrap();
        assert_eq!({ first.minimizer }, 5);
        let second = reader.read_next().unwrap().unwrap();
        assert_eq!({ second.minimizer }, 15);
        assert!(reader.read_next().unwrap().is_none());
    }
}