// sshash_lib/builder/dictionary_builder.rs
//! Dictionary builder orchestration
//!
//! Coordinates the multi-step pipeline to build an SSHash dictionary:
//! 1. Parse and encode sequences into SPSS
//! 2. Extract minimizer tuples
//! 3. Classify buckets
//! 4. Build minimizers control map
//! 5. Build sparse and skew index
//! 6. Assemble final dictionary

11use crate::{
12    builder::{
13        buckets::{classify_into_buckets_inplace, ClassifiedBuckets, BucketStatistics, MIN_BUCKET_SIZE},
14        config::BuildConfiguration,
15        encode::Encoder,
16        external_sort::FileTuples,
17        minimizer_tuples::{compute_minimizer_tuples, compute_minimizer_tuples_external_file, needs_external_sorting},
18    },
19    dictionary::Dictionary,
20    kmer::{Kmer, KmerBits},
21    minimizers_control_map::{MinimizersControlMap, MinimizersControlMapBuilder, BucketType},
22    partitioned_mphf::PartitionedMphf,
23    sparse_and_skew_index::SparseAndSkewIndex,
24    spectrum_preserving_string_set::SpectrumPreservingStringSet,
25};
26use std::io::{BufWriter, Write};
27use tracing::info;
28
/// Bucket metadata collected during a sequential scan of sorted tuples.
///
/// Only stores per-bucket sizes (needed for EF construction and pass 2 routing).
/// Minimizer values are fed directly to the MPHF builder during the scan,
/// and bucket start positions are recovered by rescanning the file sequentially.
///
/// Entries in `cached_sizes` are indexed by bucket id, i.e. the order in
/// which buckets appear in the sorted tuple file.
pub struct BucketMetadata {
    /// Number of unique super-kmers per bucket (one entry per bucket, in
    /// file-scan order).
    pub cached_sizes: Vec<u32>,
    /// Count of singleton buckets (cached_size == 1).
    pub num_singleton: u64,
    /// Count of light buckets (2 <= cached_size <= MIN_BUCKET_SIZE).
    pub num_light: u64,
    /// Count of heavy buckets (cached_size > MIN_BUCKET_SIZE).
    pub num_heavy: u64,
}
44
45impl BucketMetadata {
46    /// Number of buckets.
47    #[inline]
48    pub fn num_buckets(&self) -> usize {
49        self.cached_sizes.len()
50    }
51}
52
/// Builder for constructing SSHash dictionaries
///
/// Holds a validated [`BuildConfiguration`] and drives the multi-step build
/// pipeline (encode → extract tuples → classify → control map → index).
pub struct DictionaryBuilder {
    // Validated build parameters (k, m, threading, RAM limit, ...).
    config: BuildConfiguration,
}
57
58impl DictionaryBuilder {
59    /// Create a new dictionary builder with the given configuration
60    pub fn new(config: BuildConfiguration) -> Result<Self, String> {
61        config.validate()?;
62        Ok(Self { config })
63    }
64    
65    /// Build a dictionary from input sequences
66    ///
67    /// # Arguments
68    /// * `sequences` - Vector of DNA sequences (strings)
69    ///
70    /// # Parallelism
71    /// The number of threads is controlled by `config.num_threads`:
72    /// - `0` — use all available CPU cores (rayon default)
73    /// - `1` — single-threaded (no rayon overhead)
74    /// - `N` — use exactly N threads
75    ///
76    /// # Returns
77    /// A fully constructed Dictionary ready for queries
78    pub fn build_from_sequences(&self, sequences: Vec<String>) -> Result<Dictionary, String> {
79        // Build a rayon thread pool sized to config.num_threads.
80        // num_threads == 0 means "all cores" (rayon default).
81        let pool = rayon::ThreadPoolBuilder::new()
82            .num_threads(self.config.num_threads)
83            .build()
84            .map_err(|e| format!("Failed to create thread pool: {e}"))?;
85
86        pool.install(|| self.build_from_sequences_inner(sequences))
87    }
88
89    /// Inner build logic, runs inside the rayon thread pool
90    fn build_from_sequences_inner(&self, sequences: Vec<String>) -> Result<Dictionary, String> {
91        self.config.print();
92        info!("Building SSHash Dictionary");
93
94        // Step 1: Encode sequences into SPSS
95        info!("Step 1: Encoding sequences...");
96        let (spss, num_sequences) = self.encode_sequences(sequences)?;
97        info!("  Encoded {} sequences", num_sequences);
98        info!("  Total bases: {}", spss.total_bases());
99
100        // Decide between external sort (streaming from disk) or in-memory paths
101        let total_bases = spss.total_bases();
102        let num_strings = spss.num_strings();
103        let k = self.config.k as u64;
104        let total_kmers = total_bases.saturating_sub(num_strings * (k - 1));
105
106        if needs_external_sorting(total_kmers, self.config.ram_limit_gib) {
107            info!("Using external sorting: estimated {} k-mers exceeds RAM limit of {} GiB",
108                total_kmers, self.config.ram_limit_gib);
109            self.build_with_external_sort(spss)
110        } else {
111            self.build_with_in_memory_tuples(spss)
112        }
113    }
114
115    /// In-memory build path: all tuples fit in RAM.
116    ///
117    /// This is the original pipeline: parallel tuple extraction → sort → classify
118    /// in-place → build MPHF → build sparse/skew index.
119    fn build_with_in_memory_tuples(&self, spss: SpectrumPreservingStringSet) -> Result<Dictionary, String> {
120        // Step 2: Extract minimizer tuples (with inline coalescing during extraction)
121        info!("Step 2: Extracting and coalescing minimizer tuples (in-memory)...");
122        let tuples = crate::dispatch_on_k!(self.config.k, K => {
123            Ok::<_, String>(compute_minimizer_tuples::<K>(&spss, &self.config))
124        })?;
125        info!("  Extracted and coalesced {} tuples", tuples.len());
126
127        // Step 3: Classify into buckets (in-place — no tuple duplication)
128        info!("Step 3: Classifying buckets (in-place)...");
129        let classified = classify_into_buckets_inplace(tuples);
130
131        // Compute statistics
132        let mut stats = BucketStatistics::new();
133        for i in 0..classified.num_buckets() {
134            stats.add_bucket(classified.bucket_tuples(i));
135        }
136        stats.print_summary();
137
138        // Step 4: Build minimizers control map
139        info!("Step 4: Building minimizers control map...");
140        let (control_map, bucket_id_by_mphf_index) = self.build_control_map(&classified)?;
141        info!("  Built MPHF for {} minimizers", control_map.num_minimizers());
142
143        // Step 5: Build sparse and skew index
144        info!("Step 5: Building sparse and skew index...");
145        let mphf_order = if !bucket_id_by_mphf_index.is_empty() {
146            Some(bucket_id_by_mphf_index)
147        } else {
148            None
149        };
150        let index = self.build_index(&classified, mphf_order.as_deref(), &spss)?;
151        info!("  Index built successfully");
152
153        self.assemble_dictionary(spss, control_map, index)
154    }
155
    /// External sort build path: tuples streamed from file via buffered I/O.
    ///
    /// Following the C++ approach: never materializes all tuples in memory.
    /// Instead, scans the file multiple times via fresh `BufReader`s:
    /// - Pass A: Scan buckets → write minimizers to temp file → build MPHF
    /// - Pass B: Rescan → collect cached_sizes + bucket_id_by_mphf_index
    /// - Pass 2: Build sparse index (fill offsets) + collect heavy bucket
    ///   tuples for skew index
    fn build_with_external_sort(&self, spss: SpectrumPreservingStringSet) -> Result<Dictionary, String> {
        // Step 2: External sort → merged file (accessed via buffered I/O)
        info!("Step 2: External sort...");
        let file_tuples = crate::dispatch_on_k!(self.config.k, K => {
            compute_minimizer_tuples_external_file::<K>(&spss, &self.config)
                .map_err(|e| e.to_string())
        })?;
        info!("  Sorted {} tuples to disk", file_tuples.num_tuples());

        // Steps 3+4: Scan file → feed minimizers to MPHF builder + collect sizes
        info!("Step 3: Scanning buckets (pass A + B)...");
        let (bucket_meta, control_map, bucket_id_by_mphf_index) =
            self.scan_and_build_control_map(&file_tuples)?;
        info!("  Found {} buckets ({} singleton, {} light, {} heavy)",
            bucket_meta.num_buckets(),
            bucket_meta.num_singleton,
            bucket_meta.num_light,
            bucket_meta.num_heavy);
        info!("  Built MPHF for {} minimizers", control_map.num_minimizers());

        // Step 5: Build sparse and skew index from file (pass 2).
        // An empty reorder vector means "identity order" and is passed as None.
        info!("Step 5: Building sparse and skew index (pass 2)...");
        let mphf_order = if !bucket_id_by_mphf_index.is_empty() {
            Some(bucket_id_by_mphf_index)
        } else {
            None
        };
        // mphf_order is moved (not borrowed) so the callee can drop it as
        // soon as it is no longer needed, freeing memory mid-build.
        let index = self.build_index_from_file(
            &file_tuples,
            &bucket_meta,
            mphf_order,
            &spss,
        )?;
        info!("  Index built successfully");

        self.assemble_dictionary(spss, control_map, index)
    }
201
202    /// Assemble the final dictionary from its components.
203    fn assemble_dictionary(
204        &self,
205        spss: SpectrumPreservingStringSet,
206        control_map: crate::minimizers_control_map::MinimizersControlMap,
207        index: SparseAndSkewIndex,
208    ) -> Result<Dictionary, String> {
209        info!("Dictionary Build Complete");
210        let total_bits = spss.num_bits() + control_map.num_bits() + index.num_bits();
211        info!("Total memory: {:.2} MB", total_bits as f64 / (8.0 * 1024.0 * 1024.0));
212
213        Ok(Dictionary::new(
214            spss,
215            control_map,
216            index,
217            self.config.k,
218            self.config.m,
219            self.config.canonical,
220        ))
221    }
222    
    /// Scan the merged file, collect minimizers, and build MPHF directly.
    ///
    /// Bypasses `MinimizersControlMapBuilder` entirely for the external sort path.
    /// Splits scanning into two passes to avoid holding 3.2 GB minimizers Vec
    /// and 1.6 GB cached_sizes Vec simultaneously:
    ///
    /// - Pass A: Scan file → write minimizers to temp file → mmap it → build MPHF
    /// - Pass B: Rescan file → build bucket_id_by_mphf_index + collect cached_sizes
    ///
    /// Each pass opens a fresh `BufReader` — no mmap pages linger between passes.
    ///
    /// Returns `(bucket_meta, control_map, bucket_id_by_mphf_index)`.
    fn scan_and_build_control_map(
        &self,
        file_tuples: &FileTuples,
    ) -> Result<(BucketMetadata, MinimizersControlMap, Vec<usize>), String> {
        // --- Pass A: Write minimizers to temp file, mmap it, build MPHF ---
        let minimizers_path = file_tuples.path().with_extension("minimizers.tmp");
        let mut num_buckets = 0usize;
        // Inner scope so the writer (and its BufWriter buffer) is dropped
        // before the file is reopened for mmap below.
        {
            let file = std::fs::File::create(&minimizers_path)
                .map_err(|e| format!("Failed to create minimizers temp file: {e}"))?;
            // 4 MiB buffer: one u64 per bucket, so batch writes heavily.
            let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, file);
            let bucket_iter = file_tuples.bucket_iter()
                .map_err(|e| format!("Failed to open file for pass A: {e}"))?;
            for scan in bucket_iter {
                // Native-endian on purpose: the file is a process-local
                // scratch file read back by this same process via mmap.
                writer.write_all(&scan.minimizer.to_ne_bytes())
                    .map_err(|e| format!("Failed to write minimizer: {e}"))?;
                num_buckets += 1;
            }
            writer.flush().map_err(|e| format!("Failed to flush minimizers: {e}"))?;
        }
        // Mmap the minimizers file as a &[u64] slice for PHast
        let min_file = std::fs::File::open(&minimizers_path)
            .map_err(|e| format!("Failed to open minimizers file: {e}"))?;
        let min_mmap = unsafe { memmap2::Mmap::map(&min_file) }
            .map_err(|e| format!("Failed to mmap minimizers file: {e}"))?;
        // Invariant check: file must contain exactly num_buckets u64 values.
        assert_eq!(min_mmap.len(), num_buckets * 8);
        // SAFETY: the file contains native-endian u64 values we just wrote,
        // and u64 has alignment 8 which mmap guarantees (page-aligned).
        let minimizers_slice: &[u64] = unsafe {
            std::slice::from_raw_parts(min_mmap.as_ptr() as *const u64, num_buckets)
        };
        // Build MPHF from the mmap'd slice — no heap allocation for keys
        info!("Building PHast MPHF for {} minimizers (partitioned={})",
            num_buckets, self.config.partitioned_mphf);
        let mphf = PartitionedMphf::build_from_slice(minimizers_slice, self.config.partitioned_mphf);

        // Drop minimizers mmap + delete temp file (removal is best-effort;
        // a leftover temp file is harmless).
        drop(min_mmap);
        drop(min_file);
        let _ = std::fs::remove_file(&minimizers_path);

        // --- Pass B: Rescan file → build bucket_id_by_mphf_index + cached_sizes ---
        let mut cached_sizes: Vec<u32> = Vec::with_capacity(num_buckets);
        let mut bucket_id_by_mphf_index = vec![0usize; num_buckets];
        let mut num_singleton = 0u64;
        let mut num_light = 0u64;
        let mut num_heavy = 0u64;
        let mut num_kmers = 0u64;

        let bucket_iter = file_tuples.bucket_iter()
            .map_err(|e| format!("Failed to open file for pass B: {e}"))?;
        for (bucket_idx, scan) in bucket_iter.enumerate() {
            cached_sizes.push(scan.cached_size as u32);

            // Classify by super-kmer count; thresholds mirror the in-memory
            // classifier (see builder::buckets).
            match scan.cached_size {
                1 => num_singleton += 1,
                2..=MIN_BUCKET_SIZE => num_light += 1,
                _ => num_heavy += 1,
            }

            // num_kmers is pre-computed by FileBucketIter
            num_kmers += scan.num_kmers;

            // Look up MPHF index for this bucket's minimizer. Since the MPHF
            // is perfect over exactly these minimizers, each mphf_idx is hit
            // exactly once, so plain assignment fully populates the vector.
            let mphf_idx = mphf.get(&scan.minimizer);
            bucket_id_by_mphf_index[mphf_idx] = bucket_idx;
        }

        info!("  Total k-mers: {}", num_kmers);

        let control_map = MinimizersControlMap::from_mphf(mphf, num_buckets as u64);

        let bucket_meta = BucketMetadata {
            cached_sizes,
            num_singleton,
            num_light,
            num_heavy,
        };

        Ok((bucket_meta, control_map, bucket_id_by_mphf_index))
    }
316
317    /// Build the sparse and skew index from file tuples (streaming path).
318    ///
319    /// Takes `mphf_order` by value so it can be dropped after computing
320    /// offset_start_by_orig (saves ~3.2 GB for 400M buckets).
321    fn build_index_from_file(
322        &self,
323        file_tuples: &FileTuples,
324        bucket_meta: &BucketMetadata,
325        mphf_order: Option<Vec<usize>>,
326        spss: &SpectrumPreservingStringSet,
327    ) -> Result<SparseAndSkewIndex, String> {
328        let total_bases = spss.total_bases();
329        let num_bits_per_offset = crate::constants::ceil_log2(total_bases);
330
331        let index = crate::dispatch_on_k!(self.config.k, K => {
332            SparseAndSkewIndex::build_from_file::<K>(
333                file_tuples,
334                bucket_meta,
335                mphf_order,
336                num_bits_per_offset,
337                spss,
338                self.config.canonical,
339            ).map_err(|e| e.to_string())?
340        });
341
342        Ok(index)
343    }
344
345    /// Encode sequences into spectrum-preserving string set
346    fn encode_sequences(&self, sequences: Vec<String>) -> Result<(SpectrumPreservingStringSet, usize), String> {
347        let num_sequences = sequences.len();
348        let spss = crate::dispatch_on_k!(self.config.k, K => {
349            self.encode_sequences_k::<K>(sequences)?
350        });
351        
352        Ok((spss, num_sequences))
353    }
354    
355    /// Encode sequences with specific K
356    fn encode_sequences_k<const K: usize>(&self, sequences: Vec<String>) -> Result<SpectrumPreservingStringSet, String>
357    where
358        Kmer<K>: KmerBits,
359    {
360        let mut encoder = Encoder::<K>::new();
361        
362        for (idx, seq) in sequences.iter().enumerate() {
363            encoder.add_sequence(seq.as_bytes()).map_err(|e| {
364                format!("Failed to encode sequence {}: {}", idx, e)
365            })?;
366        }
367        
368        Ok(encoder.build(self.config.m))
369    }
370    
371    /// Build the minimizers control map from classified buckets (in-memory path).
372    ///
373    /// Returns the control map AND a mapping from MPHF index to bucket_id
374    /// for reordering control_codewords to MPHF order.
375    fn build_control_map(&self, classified: &ClassifiedBuckets) -> Result<(crate::minimizers_control_map::MinimizersControlMap, Vec<usize>), String> {
376        let mut builder = MinimizersControlMapBuilder::new();
377
378        for (bucket_id, bref) in classified.bucket_refs.iter().enumerate() {
379            builder.add_minimizer(bref.minimizer);
380
381            let bucket_type = match bref.bucket_type {
382                crate::builder::buckets::BucketType::Singleton => BucketType::Regular,
383                crate::builder::buckets::BucketType::Light => BucketType::Sparse,
384                crate::builder::buckets::BucketType::Heavy => BucketType::HeavyLoad,
385            };
386
387            builder.set_bucket_type(bref.minimizer, bucket_type);
388
389            if let Some(control) = builder.get_control_mut(bref.minimizer) {
390                control.metadata = bucket_id as u64;
391            }
392        }
393
394        let c = 100u16;
395        let alpha = 0.94;
396
397        builder.build(c, alpha, self.config.partitioned_mphf).map_err(|e| {
398            format!("Failed to build minimizers control map: {}", e)
399        })
400    }
401    
402    /// Build the sparse and skew index
403    fn build_index(
404        &self,
405        classified: &ClassifiedBuckets,
406        mphf_order: Option<&[usize]>,
407        spss: &SpectrumPreservingStringSet,
408    ) -> Result<SparseAndSkewIndex, String> {
409        let total_bases = spss.total_bases();
410        let num_bits_per_offset = crate::constants::ceil_log2(total_bases);
411
412        let index = crate::dispatch_on_k!(self.config.k, K => {
413            SparseAndSkewIndex::build_from_classified::<K>(classified, mphf_order, num_bits_per_offset, spss, self.config.canonical)
414        });
415
416        Ok(index)
417    }
418}
419
#[cfg(test)]
mod tests {
    use super::*;

    // A default configuration must pass validation.
    #[test]
    fn test_dictionary_builder_creation() {
        let config = BuildConfiguration::default();
        let builder = DictionaryBuilder::new(config);
        assert!(builder.is_ok());
    }

    // Validation must reject an even k.
    #[test]
    fn test_dictionary_builder_invalid_config() {
        let config = BuildConfiguration { k: 30, ..BuildConfiguration::default() }; // Even k is invalid
        let builder = DictionaryBuilder::new(config);
        assert!(builder.is_err());
    }

    // Smoke test: run the full pipeline end-to-end on two tiny sequences.
    // Deliberately does not assert on the result (see note below).
    #[test]
    fn test_build_simple_dictionary() {
        let config = BuildConfiguration::new(21, 11).unwrap();
        let builder = DictionaryBuilder::new(config).unwrap();

        let sequences = vec![
            "ACGTACGTACGTACGTACGTACGT".to_string(),
            "TGCATGCATGCATGCATGCATGCA".to_string(),
        ];

        let dict = builder.build_from_sequences(sequences);
        // Note: This test may fail until we have proper k-mer extraction
        // in the build pipeline. For now, just check that it runs.
        println!("Dictionary build result: {:?}", dict.is_ok());
    }
}
453}