// sshash_lib/builder/external_sort.rs
//! External sorting for minimizer tuples
//!
//! This module implements RAM-bounded external sorting for large datasets,
//! following the C++ sshash implementation precisely:
//!
//! 1. Each thread has a RAM-bounded buffer
//! 2. When buffer fills → parallel sort + flush to temp binary file
//! 3. After all tuples processed → k-way merge of temp files
//! 4. Merge uses buffered file I/O (not mmap) to avoid resident page bloat
//!
//! ## Memory Management
//!
//! Buffer size per thread = `(ram_limit_gib * GiB) / (2 * sizeof(tuple) * num_threads)`
//!
//! The factor of 2 accounts for temporary memory during parallel sort.
//!
//! After merge, the sorted file is accessed via sequential buffered readers
//! (`FileTuples`) rather than mmap, keeping RSS proportional to the I/O buffer
//! size (~4 MB) instead of the file size (~8-10 GB).
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use memmap2::Mmap;
use rayon::prelude::*;
use tracing::{debug, info};

use super::minimizer_tuples::MinimizerTuple;
32
/// Size of `MinimizerTupleExternal` in bytes (packed, no padding).
///
/// Must equal `std::mem::size_of::<MinimizerTupleExternal>()`; this is
/// asserted by `test_tuple_external_size`.
pub const TUPLE_SIZE_BYTES: usize = 18;

/// Bytes per GiB (2^30); used to convert the configured RAM limit to bytes.
pub const GIB: usize = 1024 * 1024 * 1024;

/// Default buffer size for sequential tuple readers (4 MB).
const READER_BUF_SIZE: usize = 4 * 1024 * 1024;
41
/// Packed minimizer tuple for disk I/O (matches C++ `#pragma pack(push, 2)`)
///
/// Layout: minimizer (8) + pos_in_seq (8) + pos_in_kmer (1) + num_kmers_in_super_kmer (1) = 18 bytes
///
/// `packed(2)` lowers the struct's alignment so no padding is inserted and the
/// size is exactly [`TUPLE_SIZE_BYTES`]. Because fields may be unaligned,
/// callers must copy fields out rather than take references to them.
#[repr(C, packed(2))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MinimizerTupleExternal {
    /// The minimizer hash value
    pub minimizer: u64,
    /// Position in the sequence
    pub pos_in_seq: u64,
    /// Position of the minimizer within the k-mer
    pub pos_in_kmer: u8,
    /// Number of k-mers in the super-k-mer
    pub num_kmers_in_super_kmer: u8,
}
57
impl MinimizerTupleExternal {
    /// Convert from internal MinimizerTuple (field-for-field copy).
    pub fn from_internal(t: &MinimizerTuple) -> Self {
        Self {
            minimizer: t.minimizer,
            pos_in_seq: t.pos_in_seq,
            pos_in_kmer: t.pos_in_kmer,
            num_kmers_in_super_kmer: t.num_kmers_in_super_kmer,
        }
    }

    /// Convert to internal MinimizerTuple (field-for-field copy).
    pub fn to_internal(&self) -> MinimizerTuple {
        MinimizerTuple {
            minimizer: self.minimizer,
            pos_in_seq: self.pos_in_seq,
            pos_in_kmer: self.pos_in_kmer,
            num_kmers_in_super_kmer: self.num_kmers_in_super_kmer,
        }
    }

    /// Read one tuple from a raw byte pointer.
    ///
    /// Uses `read_unaligned`, so `bytes` does NOT need to be aligned.
    ///
    /// # Safety
    /// Caller must ensure `bytes` points to at least [`TUPLE_SIZE_BYTES`]
    /// readable bytes holding a valid `MinimizerTupleExternal`.
    #[inline]
    pub unsafe fn from_bytes(bytes: *const u8) -> Self {
        // SAFETY: read_unaligned handles packed/unaligned access; the caller
        // guarantees the pointee spans TUPLE_SIZE_BYTES valid bytes.
        unsafe { std::ptr::read_unaligned(bytes as *const Self) }
    }

    /// Serialize to the on-disk byte representation.
    ///
    /// This is a raw memory copy, so the byte order of the integer fields is
    /// the host's native endianness (files are not portable across endianness).
    pub fn to_bytes(&self) -> [u8; TUPLE_SIZE_BYTES] {
        let mut buf = [0u8; TUPLE_SIZE_BYTES];
        unsafe {
            // SAFETY: Self is repr(C, packed(2)) with size TUPLE_SIZE_BYTES
            // and no padding, so copying its raw bytes is well-defined.
            std::ptr::copy_nonoverlapping(
                self as *const Self as *const u8,
                buf.as_mut_ptr(),
                TUPLE_SIZE_BYTES,
            );
        }
        buf
    }

    /// Read one tuple from a reader. Returns None on EOF.
    ///
    /// A partial trailing record also surfaces as `UnexpectedEof` from
    /// `read_exact` and is therefore reported as end-of-stream (None).
    fn read_from<R: Read>(reader: &mut R) -> std::io::Result<Option<Self>> {
        let mut buf = [0u8; TUPLE_SIZE_BYTES];
        match reader.read_exact(&mut buf) {
            Ok(()) => Ok(Some(unsafe { Self::from_bytes(buf.as_ptr()) })),
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
            Err(e) => Err(e),
        }
    }
}
112
impl PartialOrd for MinimizerTupleExternal {
    /// Delegates to [`Ord`], keeping the partial and total orders consistent.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
118
119impl Ord for MinimizerTupleExternal {
120    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
121        // Copy fields to avoid taking references to packed struct fields
122        let self_min = self.minimizer;
123        let other_min = other.minimizer;
124        let self_pos = self.pos_in_seq;
125        let other_pos = other.pos_in_seq;
126
127        match self_min.cmp(&other_min) {
128            std::cmp::Ordering::Equal => self_pos.cmp(&other_pos),
129            ord => ord,
130        }
131    }
132}
133
/// External sorter for minimizer tuples
///
/// Manages RAM-bounded sorting with temp file spillover and k-way merge.
/// `sort_and_flush` takes `&self` and allocates file IDs from an atomic
/// counter, so multiple threads may flush concurrently; `merge` is a
/// single-threaded post-pass.
pub struct ExternalSorter {
    /// Temp directory for intermediate files
    tmp_dir: PathBuf,
    /// Run identifier (timestamp-based, so runs sharing a temp dir don't collide)
    run_id: u64,
    /// Atomic counter for temp file IDs
    num_files: AtomicU64,
    /// RAM limit in GiB
    ram_limit_gib: usize,
    /// Number of threads
    num_threads: usize,
    /// Verbose logging
    verbose: bool,
    /// If true, merged file cleanup is owned by FileTuples (don't delete in Drop)
    merged_file_handed_off: std::sync::atomic::AtomicBool,
}
153
154impl ExternalSorter {
155    /// Create a new external sorter
156    pub fn new(tmp_dir: impl AsRef<Path>, ram_limit_gib: usize, num_threads: usize, verbose: bool) -> std::io::Result<Self> {
157        let tmp_dir = tmp_dir.as_ref().to_path_buf();
158
159        // Create temp directory if it doesn't exist
160        fs::create_dir_all(&tmp_dir)?;
161
162        // Generate unique run ID from timestamp
163        let run_id = SystemTime::now()
164            .duration_since(UNIX_EPOCH)
165            .unwrap()
166            .as_nanos() as u64;
167
168        Ok(Self {
169            tmp_dir,
170            run_id,
171            num_files: AtomicU64::new(0),
172            ram_limit_gib,
173            num_threads,
174            verbose,
175            merged_file_handed_off: std::sync::atomic::AtomicBool::new(false),
176        })
177    }
178
179    /// Calculate buffer size per thread in number of tuples
180    ///
181    /// Formula: `(ram_limit * GiB) / (2 * sizeof(tuple) * num_threads)`
182    /// The factor of 2 accounts for temporary memory during parallel sort.
183    pub fn buffer_size_per_thread(&self) -> usize {
184        let total_bytes = self.ram_limit_gib * GIB;
185        let bytes_per_thread = total_bytes / (2 * self.num_threads.max(1));
186        bytes_per_thread / TUPLE_SIZE_BYTES
187    }
188
189    /// Get path for a temp file by ID
190    fn temp_file_path(&self, id: u64) -> PathBuf {
191        self.tmp_dir.join(format!(
192            "sshash.tmp.run_{}.minimizers.{}.bin",
193            self.run_id, id
194        ))
195    }
196
197    /// Get path for the final merged file
198    fn merged_file_path(&self) -> PathBuf {
199        self.tmp_dir.join(format!(
200            "sshash.tmp.run_{}.minimizers.bin",
201            self.run_id
202        ))
203    }
204
205    /// Sort a buffer and flush to a temp file
206    ///
207    /// Returns the file ID. Thread-safe via atomic counter.
208    pub fn sort_and_flush(&self, buffer: &mut Vec<MinimizerTupleExternal>) -> std::io::Result<u64> {
209        // Parallel sort
210        buffer.par_sort_unstable();
211
212        // Get unique file ID
213        let file_id = self.num_files.fetch_add(1, Ordering::SeqCst);
214        let path = self.temp_file_path(file_id);
215
216        if self.verbose {
217            debug!("Flushing {} tuples to {:?}", buffer.len(), path);
218        }
219
220        // Write to binary file
221        let file = File::create(&path)?;
222        let mut writer = BufWriter::with_capacity(1024 * 1024, file);
223
224        for tuple in buffer.iter() {
225            writer.write_all(&tuple.to_bytes())?;
226        }
227
228        writer.flush()?;
229        buffer.clear();
230
231        Ok(file_id)
232    }
233
234    /// Number of temp files created
235    pub fn num_files(&self) -> u64 {
236        self.num_files.load(Ordering::SeqCst)
237    }
238
239    /// Merge all temp files into final sorted output
240    ///
241    /// Returns statistics: (num_minimizers, num_positions, num_super_kmers)
242    pub fn merge(&self) -> std::io::Result<MergeResult> {
243        let num_files = self.num_files();
244
245        if num_files == 0 {
246            return Ok(MergeResult::default());
247        }
248
249        if num_files == 1 {
250            // Just rename the single file
251            let src = self.temp_file_path(0);
252            let dst = self.merged_file_path();
253            fs::rename(&src, &dst)?;
254            return self.scan_merged_file();
255        }
256
257        // Multiple files: k-way merge using buffered I/O
258        info!("Merging {} temp files...", num_files);
259
260        let mut merger = FileMergingIterator::new(
261            (0..num_files).map(|id| self.temp_file_path(id)).collect(),
262        )?;
263
264        let merged_path = self.merged_file_path();
265        let file = File::create(&merged_path)?;
266        let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, file);
267
268        let mut result = MergeResult::default();
269        let mut prev_minimizer = u64::MAX;
270        let mut prev_pos_in_seq = u64::MAX;
271
272        while merger.has_next() {
273            let tuple = merger.current();
274
275            // Track statistics
276            if tuple.minimizer != prev_minimizer {
277                prev_minimizer = tuple.minimizer;
278                result.num_minimizers += 1;
279                result.num_positions += 1;
280            } else if tuple.pos_in_seq != prev_pos_in_seq {
281                result.num_positions += 1;
282            }
283            prev_pos_in_seq = tuple.pos_in_seq;
284            result.num_super_kmers += 1;
285
286            writer.write_all(&tuple.to_bytes())?;
287            merger.next();
288
289            if self.verbose && result.num_super_kmers % 100_000_000 == 0 {
290                info!("Merged {} tuples...", result.num_super_kmers);
291            }
292        }
293
294        writer.flush()?;
295        drop(merger);
296
297        // Remove temp files
298        for id in 0..num_files {
299            let _ = fs::remove_file(self.temp_file_path(id));
300        }
301
302        info!(
303            "Merge complete: {} minimizers, {} positions, {} super-kmers",
304            result.num_minimizers, result.num_positions, result.num_super_kmers
305        );
306
307        Ok(result)
308    }
309
310    /// Scan merged file to compute statistics (for single-file case)
311    fn scan_merged_file(&self) -> std::io::Result<MergeResult> {
312        let path = self.merged_file_path();
313        let file = File::open(&path)?;
314        let mut reader = BufReader::with_capacity(READER_BUF_SIZE, file);
315
316        let mut result = MergeResult::default();
317        let mut prev_minimizer = u64::MAX;
318        let mut prev_pos_in_seq = u64::MAX;
319
320        while let Some(tuple) = MinimizerTupleExternal::read_from(&mut reader)? {
321            if tuple.minimizer != prev_minimizer {
322                prev_minimizer = tuple.minimizer;
323                result.num_minimizers += 1;
324                result.num_positions += 1;
325            } else if tuple.pos_in_seq != prev_pos_in_seq {
326                result.num_positions += 1;
327            }
328            prev_pos_in_seq = tuple.pos_in_seq;
329            result.num_super_kmers += 1;
330        }
331
332        Ok(result)
333    }
334
335    /// Read merged tuples into memory as internal MinimizerTuples
336    ///
337    /// Call this after `merge()` to get the final sorted tuples.
338    /// For large datasets, prefer [`open_merged_file`] to avoid full materialization.
339    pub fn read_merged_tuples(&self) -> std::io::Result<Vec<MinimizerTuple>> {
340        let path = self.merged_file_path();
341        let file = File::open(&path)?;
342        let mmap = unsafe { Mmap::map(&file)? };
343
344        let num_tuples = mmap.len() / TUPLE_SIZE_BYTES;
345        let mut tuples = Vec::with_capacity(num_tuples);
346
347        for i in 0..num_tuples {
348            let offset = i * TUPLE_SIZE_BYTES;
349            let ext = unsafe { MinimizerTupleExternal::from_bytes(mmap.as_ptr().add(offset)) };
350            tuples.push(ext.to_internal());
351        }
352
353        Ok(tuples)
354    }
355
356    /// Open the merged file for sequential buffered access.
357    ///
358    /// Returns a [`FileTuples`] handle that provides sequential access via
359    /// buffered readers. Each scan pass opens a fresh `BufReader`, keeping
360    /// RSS proportional to the buffer size (~4 MB) instead of the file size.
361    ///
362    /// **Important**: The returned `FileTuples` takes ownership of cleanup
363    /// responsibility for the merged file.
364    pub fn open_merged_file(&self) -> std::io::Result<FileTuples> {
365        let path = self.merged_file_path();
366        let file_len = fs::metadata(&path)?.len() as usize;
367        let num_tuples = file_len / TUPLE_SIZE_BYTES;
368
369        // Mark that cleanup responsibility has been transferred to FileTuples
370        self.merged_file_handed_off.store(true, Ordering::SeqCst);
371
372        info!("Opened merged file: {} tuples ({:.2} GB on disk)",
373            num_tuples, file_len as f64 / GIB as f64);
374
375        Ok(FileTuples {
376            path: path.clone(),
377            num_tuples,
378        })
379    }
380
381    /// Remove the merged file (cleanup)
382    pub fn remove_merged_file(&self) -> std::io::Result<()> {
383        let path = self.merged_file_path();
384        if path.exists() {
385            fs::remove_file(path)?;
386        }
387        Ok(())
388    }
389}
390
391impl Drop for ExternalSorter {
392    fn drop(&mut self) {
393        // Clean up any remaining temp files
394        for id in 0..self.num_files() {
395            let _ = fs::remove_file(self.temp_file_path(id));
396        }
397        // Only clean up merged file if it wasn't handed off to FileTuples
398        if !self.merged_file_handed_off.load(Ordering::SeqCst) {
399            let _ = fs::remove_file(self.merged_file_path());
400        }
401    }
402}
403
404// ---------------------------------------------------------------------------
405// Sequential buffered tuple access (no mmap)
406// ---------------------------------------------------------------------------
407
/// Handle to the merged external sort file for sequential buffered access.
///
/// Unlike the previous `MmapTuples`, this does NOT mmap the file. Instead,
/// each access pass opens a fresh `BufReader`, reading tuples sequentially
/// with a ~4 MB buffer. This keeps RSS proportional to the buffer size
/// instead of the file size (~8-10 GB for large datasets).
///
/// Owns cleanup of the merged file: it is deleted when this handle drops.
pub struct FileTuples {
    /// Path to the merged file
    path: PathBuf,
    /// Total number of tuples (file length / TUPLE_SIZE_BYTES)
    num_tuples: usize,
}
420
421impl FileTuples {
422    /// Total number of tuples in the merged file.
423    #[inline]
424    pub fn num_tuples(&self) -> usize {
425        self.num_tuples
426    }
427
428    /// Path to the underlying merged file (for deriving temp file names).
429    #[inline]
430    pub fn path(&self) -> &Path {
431        &self.path
432    }
433
434    /// Create an iterator that yields one [`BucketScan`] per unique minimizer,
435    /// scanning the file sequentially via buffered I/O.
436    ///
437    /// Each call opens a fresh `BufReader` — no state is retained between
438    /// passes, and no mmap pages linger in memory.
439    pub fn bucket_iter(&self) -> std::io::Result<FileBucketIter> {
440        let file = File::open(&self.path)?;
441        let reader = BufReader::with_capacity(READER_BUF_SIZE, file);
442        Ok(FileBucketIter {
443            reader,
444            pos: 0,
445            num_tuples: self.num_tuples,
446            lookahead: None,
447        })
448    }
449
450    /// Create a sequential tuple reader for forward-only access.
451    ///
452    /// Returns tuples one at a time via `read_next()`. Used by the
453    /// sparse/skew index builder for its sequential pass over all tuples.
454    pub fn sequential_reader(&self) -> std::io::Result<SequentialTupleReader> {
455        let file = File::open(&self.path)?;
456        let reader = BufReader::with_capacity(READER_BUF_SIZE, file);
457        Ok(SequentialTupleReader { reader })
458    }
459}
460
impl Drop for FileTuples {
    fn drop(&mut self) {
        // FileTuples owns merged-file cleanup once handed off by
        // ExternalSorter::open_merged_file; errors are ignored because drop
        // cannot propagate them.
        let _ = fs::remove_file(&self.path);
    }
}
467
/// Sequential reader for tuples from the merged file.
///
/// Reads tuples one at a time via `read_next()`. All access is
/// strictly forward — no seeking or random access.
pub struct SequentialTupleReader {
    // Buffered reader positioned at the next unread tuple.
    reader: BufReader<File>,
}
475
impl SequentialTupleReader {
    /// Read the next tuple, or None on EOF.
    ///
    /// # Errors
    /// Propagates any I/O error other than a clean end-of-file.
    #[inline]
    pub fn read_next(&mut self) -> std::io::Result<Option<MinimizerTupleExternal>> {
        MinimizerTupleExternal::read_from(&mut self.reader)
    }
}
483
/// Sequential iterator over buckets in the merged file via buffered I/O.
///
/// Each call to `next()` returns a [`BucketScan`] describing one bucket
/// (a contiguous group of tuples sharing the same minimizer value).
/// Owns a `BufReader` — no mmap pages are kept resident.
///
/// Correctness relies on the file being sorted by (minimizer, pos_in_seq),
/// which the external sort guarantees.
pub struct FileBucketIter {
    // Buffered reader over the merged file, positioned after the last tuple
    // consumed (excluding the lookahead, which was already read).
    reader: BufReader<File>,
    /// Current tuple position index
    pos: usize,
    /// Total tuples in file
    num_tuples: usize,
    /// Lookahead tuple (the first tuple of the NEXT bucket, read while
    /// scanning the current bucket's boundary)
    lookahead: Option<MinimizerTupleExternal>,
}
499
impl Iterator for FileBucketIter {
    type Item = BucketScan;

    /// Scan the next bucket: a maximal run of consecutive tuples sharing one
    /// minimizer value.
    ///
    /// The one tuple read past the bucket boundary is stashed in `lookahead`
    /// and becomes the first tuple of the following bucket.
    ///
    /// NOTE(review): a mid-file read error ends iteration early (the `_ =>`
    /// arms below treat errors like EOF) instead of surfacing it — confirm
    /// this best-effort behavior is intended.
    fn next(&mut self) -> Option<BucketScan> {
        if self.pos >= self.num_tuples {
            return None;
        }

        // Get the first tuple of this bucket — either from lookahead or by reading
        let first = if let Some(la) = self.lookahead.take() {
            la
        } else {
            match MinimizerTupleExternal::read_from(&mut self.reader) {
                Ok(Some(t)) => t,
                _ => return None,
            }
        };

        let start = self.pos;
        let minimizer = first.minimizer;
        // cached_size counts distinct pos_in_seq values (unique super-k-mers).
        let mut cached_size = 1usize;
        let mut prev_pos_in_seq = first.pos_in_seq;
        let mut num_kmers = first.num_kmers_in_super_kmer as u64;
        self.pos += 1;

        // Read tuples until we hit a different minimizer
        while self.pos < self.num_tuples {
            match MinimizerTupleExternal::read_from(&mut self.reader) {
                Ok(Some(t)) => {
                    if t.minimizer != minimizer {
                        // This tuple belongs to the next bucket — save as lookahead
                        self.lookahead = Some(t);
                        break;
                    }
                    if t.pos_in_seq != prev_pos_in_seq {
                        cached_size += 1;
                        prev_pos_in_seq = t.pos_in_seq;
                    }
                    num_kmers += t.num_kmers_in_super_kmer as u64;
                    self.pos += 1;
                }
                _ => break,
            }
        }

        Some(BucketScan {
            minimizer,
            cached_size,
            start_tuple_idx: start as u64,
            num_tuples: (self.pos - start) as u32,
            num_kmers,
        })
    }
}
554
/// Metadata about a single bucket, gathered during a sequential scan.
///
/// Produced by [`FileBucketIter`]; indices refer to 18-byte tuple records in
/// the merged file.
#[derive(Debug, Clone, Copy)]
pub struct BucketScan {
    /// The minimizer value for this bucket.
    pub minimizer: u64,
    /// Number of unique super-kmers (distinct `pos_in_seq` values).
    pub cached_size: usize,
    /// Index of the first tuple in the file.
    pub start_tuple_idx: u64,
    /// Number of tuples in this bucket.
    pub num_tuples: u32,
    /// Total k-mers across all tuples in this bucket.
    pub num_kmers: u64,
}
569
/// Result of merge operation
///
/// Statistics accumulated while streaming the sorted tuples; the same
/// counting rules are used by `merge` and `scan_merged_file`.
#[derive(Debug, Default, Clone, Copy)]
pub struct MergeResult {
    /// Number of distinct minimizers
    pub num_minimizers: u64,
    /// Total number of positions (distinct (minimizer, pos_in_seq) pairs)
    pub num_positions: u64,
    /// Number of super-k-mers (total tuple count)
    pub num_super_kmers: u64,
}
580
/// K-way merge iterator using buffered file I/O.
///
/// Uses buffered readers instead of mmap to avoid keeping all temp file pages
/// resident in memory. Each file gets a ~1 MB read buffer.
///
/// Memory footprint: ~1 MB per file (buffer) + 18 bytes per file (current tuple).
struct FileMergingIterator {
    /// Buffered readers for each input file
    readers: Vec<BufReader<File>>,
    /// Current tuple from each file (None if exhausted)
    current_tuples: Vec<Option<MinimizerTupleExternal>>,
    /// Index of the file holding the current minimum tuple
    min_idx: usize,
    /// Number of files that still have a tuple to yield
    num_active: usize,
}
597
598impl FileMergingIterator {
599    fn new(paths: Vec<PathBuf>) -> std::io::Result<Self> {
600        let num_files = paths.len();
601        if num_files == 0 {
602            return Ok(Self {
603                readers: Vec::new(),
604                current_tuples: Vec::new(),
605                min_idx: 0,
606                num_active: 0,
607            });
608        }
609
610        let mut readers = Vec::with_capacity(num_files);
611        let mut current_tuples = Vec::with_capacity(num_files);
612
613        for path in &paths {
614            let file = File::open(path)?;
615            let mut reader = BufReader::with_capacity(1024 * 1024, file);
616            let tuple = MinimizerTupleExternal::read_from(&mut reader)?;
617            current_tuples.push(tuple);
618            readers.push(reader);
619        }
620
621        let num_active = current_tuples.iter().filter(|t| t.is_some()).count();
622        let mut merger = Self {
623            readers,
624            current_tuples,
625            min_idx: 0,
626            num_active,
627        };
628
629        if num_active > 0 {
630            merger.compute_min();
631        }
632
633        Ok(merger)
634    }
635
636    fn has_next(&self) -> bool {
637        self.num_active > 0
638    }
639
640    fn current(&self) -> MinimizerTupleExternal {
641        debug_assert!(self.num_active > 0);
642        self.current_tuples[self.min_idx].unwrap()
643    }
644
645    fn next(&mut self) {
646        if self.num_active == 0 {
647            return;
648        }
649
650        // Advance the min file
651        match MinimizerTupleExternal::read_from(&mut self.readers[self.min_idx]) {
652            Ok(tuple) => {
653                if tuple.is_none() {
654                    self.num_active -= 1;
655                }
656                self.current_tuples[self.min_idx] = tuple;
657            }
658            Err(_) => {
659                self.current_tuples[self.min_idx] = None;
660                self.num_active -= 1;
661            }
662        }
663
664        if self.num_active > 0 {
665            self.compute_min();
666        }
667    }
668
669    fn compute_min(&mut self) {
670        self.min_idx = 0;
671        let mut min_tuple: Option<MinimizerTupleExternal> = None;
672
673        for (i, tuple_opt) in self.current_tuples.iter().enumerate() {
674            if let Some(tuple) = tuple_opt {
675                if min_tuple.is_none() || *tuple < min_tuple.unwrap() {
676                    min_tuple = Some(*tuple);
677                    self.min_idx = i;
678                }
679            }
680        }
681    }
682}
683
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_tuple_external_size() {
        // Verify packed size: the in-memory layout must match the 18-byte
        // on-disk record, or raw-byte serialization would corrupt data.
        assert_eq!(std::mem::size_of::<MinimizerTupleExternal>(), TUPLE_SIZE_BYTES);
    }

    #[test]
    fn test_tuple_roundtrip() {
        // Serialize then deserialize: every field must survive unchanged.
        let tuple = MinimizerTupleExternal {
            minimizer: 12345,
            pos_in_seq: 67890,
            pos_in_kmer: 5,
            num_kmers_in_super_kmer: 3,
        };

        let bytes = tuple.to_bytes();
        let recovered = unsafe { MinimizerTupleExternal::from_bytes(bytes.as_ptr()) };

        assert_eq!(tuple, recovered);
    }

    #[test]
    fn test_external_sorter_basic() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();

        // Buffer size should be reasonable
        let buf_size = sorter.buffer_size_per_thread();
        assert!(buf_size > 0);

        // Test sort_and_flush: flushing sorts, writes one temp file, and
        // clears the caller's buffer for reuse.
        let mut buffer: Vec<MinimizerTupleExternal> = vec![
            MinimizerTupleExternal {
                minimizer: 100,
                pos_in_seq: 10,
                pos_in_kmer: 1,
                num_kmers_in_super_kmer: 2,
            },
            MinimizerTupleExternal {
                minimizer: 50,
                pos_in_seq: 20,
                pos_in_kmer: 3,
                num_kmers_in_super_kmer: 1,
            },
        ];

        sorter.sort_and_flush(&mut buffer).unwrap();
        assert!(buffer.is_empty());
        assert_eq!(sorter.num_files(), 1);
    }

    #[test]
    fn test_external_sorter_merge() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();

        // Create multiple temp files with interleaved minimizer ranges so the
        // k-way merge must alternate between them.
        let mut buffer1: Vec<MinimizerTupleExternal> = vec![
            MinimizerTupleExternal { minimizer: 10, pos_in_seq: 1, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 },
            MinimizerTupleExternal { minimizer: 30, pos_in_seq: 3, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 },
        ];
        sorter.sort_and_flush(&mut buffer1).unwrap();

        let mut buffer2: Vec<MinimizerTupleExternal> = vec![
            MinimizerTupleExternal { minimizer: 20, pos_in_seq: 2, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 },
            MinimizerTupleExternal { minimizer: 40, pos_in_seq: 4, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 },
        ];
        sorter.sort_and_flush(&mut buffer2).unwrap();

        assert_eq!(sorter.num_files(), 2);

        // Merge
        let result = sorter.merge().unwrap();
        assert_eq!(result.num_super_kmers, 4);
        assert_eq!(result.num_minimizers, 4);

        // Read merged tuples
        let tuples = sorter.read_merged_tuples().unwrap();
        assert_eq!(tuples.len(), 4);

        // Verify sorted order
        assert_eq!(tuples[0].minimizer, 10);
        assert_eq!(tuples[1].minimizer, 20);
        assert_eq!(tuples[2].minimizer, 30);
        assert_eq!(tuples[3].minimizer, 40);
    }

    #[test]
    fn test_tuple_ordering() {
        let t1 = MinimizerTupleExternal { minimizer: 100, pos_in_seq: 50, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 };
        let t2 = MinimizerTupleExternal { minimizer: 100, pos_in_seq: 60, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 };
        let t3 = MinimizerTupleExternal { minimizer: 200, pos_in_seq: 10, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 };

        assert!(t1 < t2);  // Same minimizer, different pos
        assert!(t1 < t3);  // Different minimizer
        assert!(t2 < t3);  // Different minimizer
    }

    #[test]
    fn test_file_tuples_bucket_iter() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();

        // Create a sorted file with 2 buckets: minimizer 10 (2 tuples), minimizer 20 (1 tuple)
        let mut buffer: Vec<MinimizerTupleExternal> = vec![
            MinimizerTupleExternal { minimizer: 10, pos_in_seq: 1, pos_in_kmer: 0, num_kmers_in_super_kmer: 3 },
            MinimizerTupleExternal { minimizer: 10, pos_in_seq: 2, pos_in_kmer: 0, num_kmers_in_super_kmer: 2 },
            MinimizerTupleExternal { minimizer: 20, pos_in_seq: 5, pos_in_kmer: 1, num_kmers_in_super_kmer: 4 },
        ];
        sorter.sort_and_flush(&mut buffer).unwrap();
        sorter.merge().unwrap();
        let ft = sorter.open_merged_file().unwrap();

        let buckets: Vec<BucketScan> = ft.bucket_iter().unwrap().collect();
        assert_eq!(buckets.len(), 2);
        assert_eq!(buckets[0].minimizer, 10);
        assert_eq!(buckets[0].cached_size, 2);
        assert_eq!(buckets[0].num_tuples, 2);
        assert_eq!(buckets[0].num_kmers, 5); // 3 + 2
        assert_eq!(buckets[1].minimizer, 20);
        assert_eq!(buckets[1].cached_size, 1);
        assert_eq!(buckets[1].num_tuples, 1);
        assert_eq!(buckets[1].num_kmers, 4);
    }

    #[test]
    fn test_sequential_reader() {
        let tmp_dir = TempDir::new().unwrap();
        let sorter = ExternalSorter::new(tmp_dir.path(), 1, 2, false).unwrap();

        let mut buffer: Vec<MinimizerTupleExternal> = vec![
            MinimizerTupleExternal { minimizer: 5, pos_in_seq: 10, pos_in_kmer: 0, num_kmers_in_super_kmer: 1 },
            MinimizerTupleExternal { minimizer: 15, pos_in_seq: 20, pos_in_kmer: 0, num_kmers_in_super_kmer: 2 },
        ];
        sorter.sort_and_flush(&mut buffer).unwrap();
        sorter.merge().unwrap();
        let ft = sorter.open_merged_file().unwrap();

        let mut reader = ft.sequential_reader().unwrap();
        let t1 = reader.read_next().unwrap().unwrap();
        let t1_min = t1.minimizer; // copy to avoid packed struct ref
        assert_eq!(t1_min, 5);
        let t2 = reader.read_next().unwrap().unwrap();
        let t2_min = t2.minimizer;
        assert_eq!(t2_min, 15);
        assert!(reader.read_next().unwrap().is_none());
    }
}