1use crate::{
12 builder::{
13 buckets::{classify_into_buckets_inplace, ClassifiedBuckets, BucketStatistics, MIN_BUCKET_SIZE},
14 config::BuildConfiguration,
15 encode::Encoder,
16 external_sort::FileTuples,
17 minimizer_tuples::{compute_minimizer_tuples, compute_minimizer_tuples_external_file, needs_external_sorting},
18 },
19 dictionary::Dictionary,
20 kmer::{Kmer, KmerBits},
21 minimizers_control_map::{MinimizersControlMap, MinimizersControlMapBuilder, BucketType},
22 partitioned_mphf::PartitionedMphf,
23 sparse_and_skew_index::SparseAndSkewIndex,
24 spectrum_preserving_string_set::SpectrumPreservingStringSet,
25};
26use std::io::{BufWriter, Write};
27use tracing::info;
28
/// Per-bucket statistics collected while scanning sorted minimizer tuples
/// (external-sort path), consumed later when building the sparse/skew index.
pub struct BucketMetadata {
    /// Cached tuple count for each bucket, indexed by bucket id
    /// (one entry per distinct minimizer).
    pub cached_sizes: Vec<u32>,
    /// Number of buckets whose cached size is exactly 1.
    pub num_singleton: u64,
    /// Number of buckets whose cached size is in 2..=MIN_BUCKET_SIZE.
    pub num_light: u64,
    /// Number of buckets whose cached size exceeds MIN_BUCKET_SIZE.
    pub num_heavy: u64,
}
44
impl BucketMetadata {
    /// Total number of buckets; `cached_sizes` holds one entry per bucket.
    #[inline]
    pub fn num_buckets(&self) -> usize {
        self.cached_sizes.len()
    }
}
52
/// Builds a [`Dictionary`] from raw sequences according to a validated
/// [`BuildConfiguration`] (k, m, thread count, RAM limit, canonical mode, ...).
pub struct DictionaryBuilder {
    // Validated in `new`; read-only for the lifetime of the builder.
    config: BuildConfiguration,
}
57
58impl DictionaryBuilder {
59 pub fn new(config: BuildConfiguration) -> Result<Self, String> {
61 config.validate()?;
62 Ok(Self { config })
63 }
64
65 pub fn build_from_sequences(&self, sequences: Vec<String>) -> Result<Dictionary, String> {
79 let pool = rayon::ThreadPoolBuilder::new()
82 .num_threads(self.config.num_threads)
83 .build()
84 .map_err(|e| format!("Failed to create thread pool: {e}"))?;
85
86 pool.install(|| self.build_from_sequences_inner(sequences))
87 }
88
89 fn build_from_sequences_inner(&self, sequences: Vec<String>) -> Result<Dictionary, String> {
91 self.config.print();
92 info!("Building SSHash Dictionary");
93
94 info!("Step 1: Encoding sequences...");
96 let (spss, num_sequences) = self.encode_sequences(sequences)?;
97 info!(" Encoded {} sequences", num_sequences);
98 info!(" Total bases: {}", spss.total_bases());
99
100 let total_bases = spss.total_bases();
102 let num_strings = spss.num_strings();
103 let k = self.config.k as u64;
104 let total_kmers = total_bases.saturating_sub(num_strings * (k - 1));
105
106 if needs_external_sorting(total_kmers, self.config.ram_limit_gib) {
107 info!("Using external sorting: estimated {} k-mers exceeds RAM limit of {} GiB",
108 total_kmers, self.config.ram_limit_gib);
109 self.build_with_external_sort(spss)
110 } else {
111 self.build_with_in_memory_tuples(spss)
112 }
113 }
114
115 fn build_with_in_memory_tuples(&self, spss: SpectrumPreservingStringSet) -> Result<Dictionary, String> {
120 info!("Step 2: Extracting and coalescing minimizer tuples (in-memory)...");
122 let tuples = crate::dispatch_on_k!(self.config.k, K => {
123 Ok::<_, String>(compute_minimizer_tuples::<K>(&spss, &self.config))
124 })?;
125 info!(" Extracted and coalesced {} tuples", tuples.len());
126
127 info!("Step 3: Classifying buckets (in-place)...");
129 let classified = classify_into_buckets_inplace(tuples);
130
131 let mut stats = BucketStatistics::new();
133 for i in 0..classified.num_buckets() {
134 stats.add_bucket(classified.bucket_tuples(i));
135 }
136 stats.print_summary();
137
138 info!("Step 4: Building minimizers control map...");
140 let (control_map, bucket_id_by_mphf_index) = self.build_control_map(&classified)?;
141 info!(" Built MPHF for {} minimizers", control_map.num_minimizers());
142
143 info!("Step 5: Building sparse and skew index...");
145 let mphf_order = if !bucket_id_by_mphf_index.is_empty() {
146 Some(bucket_id_by_mphf_index)
147 } else {
148 None
149 };
150 let index = self.build_index(&classified, mphf_order.as_deref(), &spss)?;
151 info!(" Index built successfully");
152
153 self.assemble_dictionary(spss, control_map, index)
154 }
155
156 fn build_with_external_sort(&self, spss: SpectrumPreservingStringSet) -> Result<Dictionary, String> {
165 info!("Step 2: External sort...");
167 let file_tuples = crate::dispatch_on_k!(self.config.k, K => {
168 compute_minimizer_tuples_external_file::<K>(&spss, &self.config)
169 .map_err(|e| e.to_string())
170 })?;
171 info!(" Sorted {} tuples to disk", file_tuples.num_tuples());
172
173 info!("Step 3: Scanning buckets (pass A + B)...");
175 let (bucket_meta, control_map, bucket_id_by_mphf_index) =
176 self.scan_and_build_control_map(&file_tuples)?;
177 info!(" Found {} buckets ({} singleton, {} light, {} heavy)",
178 bucket_meta.num_buckets(),
179 bucket_meta.num_singleton,
180 bucket_meta.num_light,
181 bucket_meta.num_heavy);
182 info!(" Built MPHF for {} minimizers", control_map.num_minimizers());
183
184 info!("Step 5: Building sparse and skew index (pass 2)...");
186 let mphf_order = if !bucket_id_by_mphf_index.is_empty() {
187 Some(bucket_id_by_mphf_index)
188 } else {
189 None
190 };
191 let index = self.build_index_from_file(
192 &file_tuples,
193 &bucket_meta,
194 mphf_order,
195 &spss,
196 )?;
197 info!(" Index built successfully");
198
199 self.assemble_dictionary(spss, control_map, index)
200 }
201
202 fn assemble_dictionary(
204 &self,
205 spss: SpectrumPreservingStringSet,
206 control_map: crate::minimizers_control_map::MinimizersControlMap,
207 index: SparseAndSkewIndex,
208 ) -> Result<Dictionary, String> {
209 info!("Dictionary Build Complete");
210 let total_bits = spss.num_bits() + control_map.num_bits() + index.num_bits();
211 info!("Total memory: {:.2} MB", total_bits as f64 / (8.0 * 1024.0 * 1024.0));
212
213 Ok(Dictionary::new(
214 spss,
215 control_map,
216 index,
217 self.config.k,
218 self.config.m,
219 self.config.canonical,
220 ))
221 }
222
223 fn scan_and_build_control_map(
236 &self,
237 file_tuples: &FileTuples,
238 ) -> Result<(BucketMetadata, MinimizersControlMap, Vec<usize>), String> {
239 let minimizers_path = file_tuples.path().with_extension("minimizers.tmp");
241 let mut num_buckets = 0usize;
242 {
243 let file = std::fs::File::create(&minimizers_path)
244 .map_err(|e| format!("Failed to create minimizers temp file: {e}"))?;
245 let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, file);
246 let bucket_iter = file_tuples.bucket_iter()
247 .map_err(|e| format!("Failed to open file for pass A: {e}"))?;
248 for scan in bucket_iter {
249 writer.write_all(&scan.minimizer.to_ne_bytes())
250 .map_err(|e| format!("Failed to write minimizer: {e}"))?;
251 num_buckets += 1;
252 }
253 writer.flush().map_err(|e| format!("Failed to flush minimizers: {e}"))?;
254 }
255 let min_file = std::fs::File::open(&minimizers_path)
257 .map_err(|e| format!("Failed to open minimizers file: {e}"))?;
258 let min_mmap = unsafe { memmap2::Mmap::map(&min_file) }
259 .map_err(|e| format!("Failed to mmap minimizers file: {e}"))?;
260 assert_eq!(min_mmap.len(), num_buckets * 8);
261 let minimizers_slice: &[u64] = unsafe {
264 std::slice::from_raw_parts(min_mmap.as_ptr() as *const u64, num_buckets)
265 };
266 info!("Building PHast MPHF for {} minimizers (partitioned={})",
268 num_buckets, self.config.partitioned_mphf);
269 let mphf = PartitionedMphf::build_from_slice(minimizers_slice, self.config.partitioned_mphf);
270
271 drop(min_mmap);
273 drop(min_file);
274 let _ = std::fs::remove_file(&minimizers_path);
275
276 let mut cached_sizes: Vec<u32> = Vec::with_capacity(num_buckets);
278 let mut bucket_id_by_mphf_index = vec![0usize; num_buckets];
279 let mut num_singleton = 0u64;
280 let mut num_light = 0u64;
281 let mut num_heavy = 0u64;
282 let mut num_kmers = 0u64;
283
284 let bucket_iter = file_tuples.bucket_iter()
285 .map_err(|e| format!("Failed to open file for pass B: {e}"))?;
286 for (bucket_idx, scan) in bucket_iter.enumerate() {
287 cached_sizes.push(scan.cached_size as u32);
288
289 match scan.cached_size {
290 1 => num_singleton += 1,
291 2..=MIN_BUCKET_SIZE => num_light += 1,
292 _ => num_heavy += 1,
293 }
294
295 num_kmers += scan.num_kmers;
297
298 let mphf_idx = mphf.get(&scan.minimizer);
300 bucket_id_by_mphf_index[mphf_idx] = bucket_idx;
301 }
302
303 info!(" Total k-mers: {}", num_kmers);
304
305 let control_map = MinimizersControlMap::from_mphf(mphf, num_buckets as u64);
306
307 let bucket_meta = BucketMetadata {
308 cached_sizes,
309 num_singleton,
310 num_light,
311 num_heavy,
312 };
313
314 Ok((bucket_meta, control_map, bucket_id_by_mphf_index))
315 }
316
317 fn build_index_from_file(
322 &self,
323 file_tuples: &FileTuples,
324 bucket_meta: &BucketMetadata,
325 mphf_order: Option<Vec<usize>>,
326 spss: &SpectrumPreservingStringSet,
327 ) -> Result<SparseAndSkewIndex, String> {
328 let total_bases = spss.total_bases();
329 let num_bits_per_offset = crate::constants::ceil_log2(total_bases);
330
331 let index = crate::dispatch_on_k!(self.config.k, K => {
332 SparseAndSkewIndex::build_from_file::<K>(
333 file_tuples,
334 bucket_meta,
335 mphf_order,
336 num_bits_per_offset,
337 spss,
338 self.config.canonical,
339 ).map_err(|e| e.to_string())?
340 });
341
342 Ok(index)
343 }
344
345 fn encode_sequences(&self, sequences: Vec<String>) -> Result<(SpectrumPreservingStringSet, usize), String> {
347 let num_sequences = sequences.len();
348 let spss = crate::dispatch_on_k!(self.config.k, K => {
349 self.encode_sequences_k::<K>(sequences)?
350 });
351
352 Ok((spss, num_sequences))
353 }
354
355 fn encode_sequences_k<const K: usize>(&self, sequences: Vec<String>) -> Result<SpectrumPreservingStringSet, String>
357 where
358 Kmer<K>: KmerBits,
359 {
360 let mut encoder = Encoder::<K>::new();
361
362 for (idx, seq) in sequences.iter().enumerate() {
363 encoder.add_sequence(seq.as_bytes()).map_err(|e| {
364 format!("Failed to encode sequence {}: {}", idx, e)
365 })?;
366 }
367
368 Ok(encoder.build(self.config.m))
369 }
370
371 fn build_control_map(&self, classified: &ClassifiedBuckets) -> Result<(crate::minimizers_control_map::MinimizersControlMap, Vec<usize>), String> {
376 let mut builder = MinimizersControlMapBuilder::new();
377
378 for (bucket_id, bref) in classified.bucket_refs.iter().enumerate() {
379 builder.add_minimizer(bref.minimizer);
380
381 let bucket_type = match bref.bucket_type {
382 crate::builder::buckets::BucketType::Singleton => BucketType::Regular,
383 crate::builder::buckets::BucketType::Light => BucketType::Sparse,
384 crate::builder::buckets::BucketType::Heavy => BucketType::HeavyLoad,
385 };
386
387 builder.set_bucket_type(bref.minimizer, bucket_type);
388
389 if let Some(control) = builder.get_control_mut(bref.minimizer) {
390 control.metadata = bucket_id as u64;
391 }
392 }
393
394 let c = 100u16;
395 let alpha = 0.94;
396
397 builder.build(c, alpha, self.config.partitioned_mphf).map_err(|e| {
398 format!("Failed to build minimizers control map: {}", e)
399 })
400 }
401
402 fn build_index(
404 &self,
405 classified: &ClassifiedBuckets,
406 mphf_order: Option<&[usize]>,
407 spss: &SpectrumPreservingStringSet,
408 ) -> Result<SparseAndSkewIndex, String> {
409 let total_bases = spss.total_bases();
410 let num_bits_per_offset = crate::constants::ceil_log2(total_bases);
411
412 let index = crate::dispatch_on_k!(self.config.k, K => {
413 SparseAndSkewIndex::build_from_classified::<K>(classified, mphf_order, num_bits_per_offset, spss, self.config.canonical)
414 });
415
416 Ok(index)
417 }
418}
419
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_dictionary_builder_creation() {
        // A default configuration must be accepted by the builder.
        assert!(DictionaryBuilder::new(BuildConfiguration::default()).is_ok());
    }

    #[test]
    fn test_dictionary_builder_invalid_config() {
        // k = 30 is expected to fail validation, so construction must error.
        let bad = BuildConfiguration { k: 30, ..BuildConfiguration::default() };
        assert!(DictionaryBuilder::new(bad).is_err());
    }

    #[test]
    fn test_build_simple_dictionary() {
        // Smoke test: run the full build pipeline on two short sequences.
        let cfg = BuildConfiguration::new(21, 11).unwrap();
        let maker = DictionaryBuilder::new(cfg).unwrap();

        let seqs: Vec<String> = ["ACGTACGTACGTACGTACGTACGT", "TGCATGCATGCATGCATGCATGCA"]
            .iter()
            .map(|s| s.to_string())
            .collect();

        let dict = maker.build_from_sequences(seqs);
        println!("Dictionary build result: {:?}", dict.is_ok());
    }
}
453}