use crate::{
builder::{
buckets::{classify_into_buckets_inplace, ClassifiedBuckets, BucketStatistics, MIN_BUCKET_SIZE},
config::BuildConfiguration,
encode::Encoder,
external_sort::FileTuples,
minimizer_tuples::{compute_minimizer_tuples, compute_minimizer_tuples_external_file, needs_external_sorting},
},
dictionary::Dictionary,
kmer::{Kmer, KmerBits},
minimizers_control_map::{MinimizersControlMap, MinimizersControlMapBuilder, BucketType},
partitioned_mphf::PartitionedMphf,
sparse_and_skew_index::SparseAndSkewIndex,
spectrum_preserving_string_set::SpectrumPreservingStringSet,
};
use std::io::{BufWriter, Write};
use tracing::info;
/// Per-bucket summary produced by the external-sort path's bucket scan
/// (see `scan_and_build_control_map`): one size entry per bucket plus
/// counts of each size class.
pub struct BucketMetadata {
    // One entry per bucket in scan order; presumably the number of tuples
    // in the bucket (filled from `scan.cached_size`) — TODO confirm
    // against `FileTuples::bucket_iter`.
    pub cached_sizes: Vec<u32>,
    // Buckets with exactly one tuple.
    pub num_singleton: u64,
    // Buckets with 2..=MIN_BUCKET_SIZE tuples.
    pub num_light: u64,
    // Buckets with more than MIN_BUCKET_SIZE tuples.
    pub num_heavy: u64,
}
impl BucketMetadata {
    /// Total number of buckets observed (one `cached_sizes` entry each).
    #[inline]
    pub fn num_buckets(&self) -> usize {
        self.cached_sizes.len()
    }
}
/// Orchestrates the end-to-end dictionary build: sequence encoding,
/// minimizer tuple extraction (in-memory or external sort), bucket
/// classification, control-map/MPHF construction, and index building.
pub struct DictionaryBuilder {
    // Validated build parameters (k, m, threads, RAM limit, ...).
    config: BuildConfiguration,
}
impl DictionaryBuilder {
pub fn new(config: BuildConfiguration) -> Result<Self, String> {
config.validate()?;
Ok(Self { config })
}
/// Builds a dictionary from raw sequences, running the whole pipeline
/// inside a dedicated rayon pool sized by `config.num_threads`.
pub fn build_from_sequences(&self, sequences: Vec<String>) -> Result<Dictionary, String> {
    // A private pool keeps the build's parallelism isolated from any
    // global rayon pool the caller may be using.
    let thread_pool = match rayon::ThreadPoolBuilder::new()
        .num_threads(self.config.num_threads)
        .build()
    {
        Ok(p) => p,
        Err(e) => return Err(format!("Failed to create thread pool: {e}")),
    };
    thread_pool.install(|| self.build_from_sequences_inner(sequences))
}
/// Pipeline entry point (runs inside the rayon pool): encodes the input,
/// estimates the k-mer count, then chooses the in-memory or external-sort
/// tuple path based on the configured RAM budget.
fn build_from_sequences_inner(&self, sequences: Vec<String>) -> Result<Dictionary, String> {
    self.config.print();
    info!("Building SSHash Dictionary");
    info!("Step 1: Encoding sequences...");
    let (spss, num_sequences) = self.encode_sequences(sequences)?;
    let total_bases = spss.total_bases();
    let num_strings = spss.num_strings();
    info!(" Encoded {} sequences", num_sequences);
    info!(" Total bases: {}", total_bases);
    // Each string of length L contributes L - (k - 1) k-mers; saturate in
    // case of degenerate inputs.
    let k = self.config.k as u64;
    let total_kmers = total_bases.saturating_sub(num_strings * (k - 1));
    if !needs_external_sorting(total_kmers, self.config.ram_limit_gib) {
        return self.build_with_in_memory_tuples(spss);
    }
    info!("Using external sorting: estimated {} k-mers exceeds RAM limit of {} GiB",
        total_kmers, self.config.ram_limit_gib);
    self.build_with_external_sort(spss)
}
/// In-memory path: extracts/coalesces minimizer tuples, classifies the
/// buckets in place, builds the control map and the sparse/skew index,
/// and assembles the final dictionary.
fn build_with_in_memory_tuples(&self, spss: SpectrumPreservingStringSet) -> Result<Dictionary, String> {
    info!("Step 2: Extracting and coalescing minimizer tuples (in-memory)...");
    let tuples = crate::dispatch_on_k!(self.config.k, K => {
        Ok::<_, String>(compute_minimizer_tuples::<K>(&spss, &self.config))
    })?;
    info!(" Extracted and coalesced {} tuples", tuples.len());
    info!("Step 3: Classifying buckets (in-place)...");
    let classified = classify_into_buckets_inplace(tuples);
    let mut stats = BucketStatistics::new();
    for bucket_idx in 0..classified.num_buckets() {
        stats.add_bucket(classified.bucket_tuples(bucket_idx));
    }
    stats.print_summary();
    info!("Step 4: Building minimizers control map...");
    let (control_map, bucket_id_by_mphf_index) = self.build_control_map(&classified)?;
    info!(" Built MPHF for {} minimizers", control_map.num_minimizers());
    info!("Step 5: Building sparse and skew index...");
    // An empty permutation means the MPHF already enumerates buckets in
    // scan order, so the index builder needs no reordering.
    let mphf_order = (!bucket_id_by_mphf_index.is_empty()).then_some(bucket_id_by_mphf_index);
    let index = self.build_index(&classified, mphf_order.as_deref(), &spss)?;
    info!(" Index built successfully");
    self.assemble_dictionary(spss, control_map, index)
}
/// External-sort path: spills sorted tuples to disk, scans them twice to
/// build the control map and bucket metadata, then builds the index from
/// the file in a final pass.
fn build_with_external_sort(&self, spss: SpectrumPreservingStringSet) -> Result<Dictionary, String> {
    info!("Step 2: External sort...");
    let file_tuples = crate::dispatch_on_k!(self.config.k, K => {
        compute_minimizer_tuples_external_file::<K>(&spss, &self.config)
            .map_err(|e| e.to_string())
    })?;
    info!(" Sorted {} tuples to disk", file_tuples.num_tuples());
    info!("Step 3: Scanning buckets (pass A + B)...");
    let (bucket_meta, control_map, bucket_id_by_mphf_index) =
        self.scan_and_build_control_map(&file_tuples)?;
    info!(" Found {} buckets ({} singleton, {} light, {} heavy)",
        bucket_meta.num_buckets(),
        bucket_meta.num_singleton,
        bucket_meta.num_light,
        bucket_meta.num_heavy);
    info!(" Built MPHF for {} minimizers", control_map.num_minimizers());
    // "Step 5" deliberately mirrors the in-memory path's numbering: the
    // control map (step 4 there) is folded into the pass A+B scan above.
    info!("Step 5: Building sparse and skew index (pass 2)...");
    // Empty permutation == MPHF order already matches scan order.
    let mphf_order = (!bucket_id_by_mphf_index.is_empty()).then_some(bucket_id_by_mphf_index);
    let index = self.build_index_from_file(&file_tuples, &bucket_meta, mphf_order, &spss)?;
    info!(" Index built successfully");
    self.assemble_dictionary(spss, control_map, index)
}
fn assemble_dictionary(
&self,
spss: SpectrumPreservingStringSet,
control_map: crate::minimizers_control_map::MinimizersControlMap,
index: SparseAndSkewIndex,
) -> Result<Dictionary, String> {
info!("Dictionary Build Complete");
let total_bits = spss.num_bits() + control_map.num_bits() + index.num_bits();
info!("Total memory: {:.2} MB", total_bits as f64 / (8.0 * 1024.0 * 1024.0));
Ok(Dictionary::new(
spss,
control_map,
index,
self.config.k,
self.config.m,
self.config.canonical,
))
}
/// Two-pass scan of the externally sorted tuple file.
///
/// Pass A streams each bucket once and appends its minimizer (as a
/// native-endian u64) to a temp file; that file is then mmapped and fed
/// to the MPHF builder as a contiguous slice, avoiding an in-RAM Vec of
/// all minimizers. Pass B streams the buckets again to record per-bucket
/// sizes, tally the size classes, and invert the MPHF into a
/// mphf-index -> scan-order-bucket-id permutation.
///
/// Returns `(bucket_meta, control_map, bucket_id_by_mphf_index)`, where
/// entry `i` of the permutation is the scan-order id of the bucket whose
/// minimizer the MPHF maps to `i`.
fn scan_and_build_control_map(
    &self,
    file_tuples: &FileTuples,
) -> Result<(BucketMetadata, MinimizersControlMap, Vec<usize>), String> {
    let minimizers_path = file_tuples.path().with_extension("minimizers.tmp");
    let mut num_buckets = 0usize;
    {
        // Pass A: spill one u64 per bucket. Scoped so the writer is
        // flushed and closed before the file is reopened for mmap below.
        let file = std::fs::File::create(&minimizers_path)
            .map_err(|e| format!("Failed to create minimizers temp file: {e}"))?;
        let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, file);
        let bucket_iter = file_tuples.bucket_iter()
            .map_err(|e| format!("Failed to open file for pass A: {e}"))?;
        for scan in bucket_iter {
            // Native endianness is fine: the bytes are read back by this
            // same process just below.
            writer.write_all(&scan.minimizer.to_ne_bytes())
                .map_err(|e| format!("Failed to write minimizer: {e}"))?;
            num_buckets += 1;
        }
        writer.flush().map_err(|e| format!("Failed to flush minimizers: {e}"))?;
    }
    let min_file = std::fs::File::open(&minimizers_path)
        .map_err(|e| format!("Failed to open minimizers file: {e}"))?;
    // SAFETY: the temp file is private to this build and is not modified
    // while mapped.
    let min_mmap = unsafe { memmap2::Mmap::map(&min_file) }
        .map_err(|e| format!("Failed to mmap minimizers file: {e}"))?;
    // 8 bytes per u64 minimizer written in pass A.
    assert_eq!(min_mmap.len(), num_buckets * 8);
    // SAFETY: the mapping is page-aligned (hence u64-aligned) and holds
    // exactly `num_buckets` native-endian u64 values, verified above.
    let minimizers_slice: &[u64] = unsafe {
        std::slice::from_raw_parts(min_mmap.as_ptr() as *const u64, num_buckets)
    };
    info!("Building PHast MPHF for {} minimizers (partitioned={})",
        num_buckets, self.config.partitioned_mphf);
    let mphf = PartitionedMphf::build_from_slice(minimizers_slice, self.config.partitioned_mphf);
    // `minimizers_slice` borrows the mapping; release map and file before
    // deleting the temp file (removal failure is non-fatal, best-effort).
    drop(min_mmap);
    drop(min_file);
    let _ = std::fs::remove_file(&minimizers_path);
    let mut cached_sizes: Vec<u32> = Vec::with_capacity(num_buckets);
    let mut bucket_id_by_mphf_index = vec![0usize; num_buckets];
    let mut num_singleton = 0u64;
    let mut num_light = 0u64;
    let mut num_heavy = 0u64;
    let mut num_kmers = 0u64;
    // Pass B: classify buckets by size and invert the MPHF.
    let bucket_iter = file_tuples.bucket_iter()
        .map_err(|e| format!("Failed to open file for pass B: {e}"))?;
    for (bucket_idx, scan) in bucket_iter.enumerate() {
        // NOTE(review): `as u32` silently truncates if a bucket ever holds
        // more than u32::MAX tuples — presumably impossible; confirm the
        // upstream bound on bucket size.
        cached_sizes.push(scan.cached_size as u32);
        match scan.cached_size {
            1 => num_singleton += 1,
            2..=MIN_BUCKET_SIZE => num_light += 1,
            _ => num_heavy += 1,
        }
        num_kmers += scan.num_kmers;
        let mphf_idx = mphf.get(&scan.minimizer);
        bucket_id_by_mphf_index[mphf_idx] = bucket_idx;
    }
    info!(" Total k-mers: {}", num_kmers);
    let control_map = MinimizersControlMap::from_mphf(mphf, num_buckets as u64);
    let bucket_meta = BucketMetadata {
        cached_sizes,
        num_singleton,
        num_light,
        num_heavy,
    };
    Ok((bucket_meta, control_map, bucket_id_by_mphf_index))
}
/// Builds the sparse/skew index from the on-disk tuple file (the final
/// pass of the external-sort path).
fn build_index_from_file(
    &self,
    file_tuples: &FileTuples,
    bucket_meta: &BucketMetadata,
    mphf_order: Option<Vec<usize>>,
    spss: &SpectrumPreservingStringSet,
) -> Result<SparseAndSkewIndex, String> {
    // Offsets address positions within the concatenated string set, so
    // they need ceil(log2(total_bases)) bits each.
    let offset_bits = crate::constants::ceil_log2(spss.total_bases());
    let built = crate::dispatch_on_k!(self.config.k, K => {
        SparseAndSkewIndex::build_from_file::<K>(
            file_tuples,
            bucket_meta,
            mphf_order,
            offset_bits,
            spss,
            self.config.canonical,
        ).map_err(|e| e.to_string())?
    });
    Ok(built)
}
/// Encodes all input sequences into a spectrum-preserving string set,
/// returning it together with the number of input sequences.
fn encode_sequences(&self, sequences: Vec<String>) -> Result<(SpectrumPreservingStringSet, usize), String> {
    // Capture the count before `sequences` is moved into the encoder.
    let count = sequences.len();
    let spss = crate::dispatch_on_k!(self.config.k, K => {
        self.encode_sequences_k::<K>(sequences)?
    });
    Ok((spss, count))
}
/// Monomorphized encoding step: feeds every sequence into an `Encoder<K>`
/// and finalizes it with the configured minimizer length `m`.
fn encode_sequences_k<const K: usize>(&self, sequences: Vec<String>) -> Result<SpectrumPreservingStringSet, String>
where
    Kmer<K>: KmerBits,
{
    let mut encoder = Encoder::<K>::new();
    for (idx, seq) in sequences.iter().enumerate() {
        // Abort on the first bad sequence, reporting its position.
        if let Err(e) = encoder.add_sequence(seq.as_bytes()) {
            return Err(format!("Failed to encode sequence {}: {}", idx, e));
        }
    }
    Ok(encoder.build(self.config.m))
}
fn build_control_map(&self, classified: &ClassifiedBuckets) -> Result<(crate::minimizers_control_map::MinimizersControlMap, Vec<usize>), String> {
let mut builder = MinimizersControlMapBuilder::new();
for (bucket_id, bref) in classified.bucket_refs.iter().enumerate() {
builder.add_minimizer(bref.minimizer);
let bucket_type = match bref.bucket_type {
crate::builder::buckets::BucketType::Singleton => BucketType::Regular,
crate::builder::buckets::BucketType::Light => BucketType::Sparse,
crate::builder::buckets::BucketType::Heavy => BucketType::HeavyLoad,
};
builder.set_bucket_type(bref.minimizer, bucket_type);
if let Some(control) = builder.get_control_mut(bref.minimizer) {
control.metadata = bucket_id as u64;
}
}
let c = 100u16;
let alpha = 0.94;
builder.build(c, alpha, self.config.partitioned_mphf).map_err(|e| {
format!("Failed to build minimizers control map: {}", e)
})
}
/// Builds the sparse/skew index from in-memory classified buckets.
fn build_index(
    &self,
    classified: &ClassifiedBuckets,
    mphf_order: Option<&[usize]>,
    spss: &SpectrumPreservingStringSet,
) -> Result<SparseAndSkewIndex, String> {
    // Offsets address positions within the concatenated string set, so
    // they need ceil(log2(total_bases)) bits each.
    let offset_bits = crate::constants::ceil_log2(spss.total_bases());
    Ok(crate::dispatch_on_k!(self.config.k, K => {
        SparseAndSkewIndex::build_from_classified::<K>(
            classified,
            mphf_order,
            offset_bits,
            spss,
            self.config.canonical,
        )
    }))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default configuration must pass validation.
    #[test]
    fn test_dictionary_builder_creation() {
        let config = BuildConfiguration::default();
        let builder = DictionaryBuilder::new(config);
        assert!(builder.is_ok());
    }

    /// k = 30 must be rejected by `BuildConfiguration::validate`
    /// (presumably k must be odd and/or within supported bounds — see
    /// the config module). Fix: the original crammed two statements onto
    /// one line; split per rustfmt.
    #[test]
    fn test_dictionary_builder_invalid_config() {
        let config = BuildConfiguration { k: 30, ..BuildConfiguration::default() };
        let builder = DictionaryBuilder::new(config);
        assert!(builder.is_err());
    }

    /// Smoke test: end-to-end build on two short sequences.
    /// NOTE(review): this test only prints the outcome and asserts
    /// nothing; consider `assert!(dict.is_ok())` once the build is known
    /// to succeed deterministically in CI.
    #[test]
    fn test_build_simple_dictionary() {
        let config = BuildConfiguration::new(21, 11).unwrap();
        let builder = DictionaryBuilder::new(config).unwrap();
        let sequences = vec![
            "ACGTACGTACGTACGTACGTACGT".to_string(),
            "TGCATGCATGCATGCATGCATGCA".to_string(),
        ];
        let dict = builder.build_from_sequences(sequences);
        println!("Dictionary build result: {:?}", dict.is_ok());
    }
}