use std::cmp::Ordering;
use std::sync::Arc;
use rayon::prelude::*;
use tracing::info;
use crate::{
kmer::{Kmer, KmerBits},
minimizer::MinimizerIterator,
spectrum_preserving_string_set::SpectrumPreservingStringSet,
};
use super::config::BuildConfiguration;
use super::external_sort::{ExternalSorter, MinimizerTupleExternal, GIB};
/// A (minimizer, position) record emitted for each super-k-mer during index
/// construction. Sorting is primarily by `(minimizer, pos_in_seq)` so that
/// all super-k-mers sharing a minimizer occurrence become adjacent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MinimizerTuple {
    /// Encoded minimizer value.
    pub minimizer: u64,
    /// Absolute position of the minimizer in the concatenated SPSS sequence.
    pub pos_in_seq: u64,
    /// Offset of the minimizer within the first k-mer of the super-k-mer.
    pub pos_in_kmer: u8,
    /// Number of consecutive k-mers folded into this super-k-mer.
    pub num_kmers_in_super_kmer: u8,
}

impl MinimizerTuple {
    /// Creates a tuple for a single k-mer (super-k-mer length 1).
    pub fn new(minimizer: u64, pos_in_seq: u64, pos_in_kmer: u8) -> Self {
        Self {
            minimizer,
            pos_in_seq,
            pos_in_kmer,
            num_kmers_in_super_kmer: 1,
        }
    }
}

impl PartialOrd for MinimizerTuple {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MinimizerTuple {
    fn cmp(&self, other: &Self) -> Ordering {
        // Primary keys: (minimizer, pos_in_seq) — the grouping downstream
        // bucketing relies on. The remaining fields are compared as
        // tie-breakers so that `Ord` is consistent with the derived `Eq`
        // (`cmp(b) == Equal` must imply `self == b`), as the `Ord` contract
        // requires. The previous implementation reported `Equal` for tuples
        // that differed in `pos_in_kmer`/`num_kmers_in_super_kmer`.
        self.minimizer
            .cmp(&other.minimizer)
            .then_with(|| self.pos_in_seq.cmp(&other.pos_in_seq))
            .then_with(|| self.pos_in_kmer.cmp(&other.pos_in_kmer))
            .then_with(|| {
                self.num_kmers_in_super_kmer
                    .cmp(&other.num_kmers_in_super_kmer)
            })
    }
}
/// Computes the sorted list of minimizer tuples for every string in `spss`.
///
/// Strings are processed in parallel with rayon; the collected tuples are
/// then sorted (unstable, parallel) by `(minimizer, pos_in_seq)`.
///
/// # Panics
/// Panics if `K <= config.m`.
pub fn compute_minimizer_tuples<const K: usize>(
    spss: &SpectrumPreservingStringSet,
    config: &BuildConfiguration,
) -> Vec<MinimizerTuple>
where
    Kmer<K>: KmerBits,
{
    let (k, m) = (K, config.m);
    if k <= m {
        panic!("k must be > m (k={}, m={})", k, m);
    }
    let string_ids = 0..spss.num_strings() as usize;
    let mut tuples: Vec<MinimizerTuple> = string_ids
        .into_par_iter()
        .flat_map_iter(|id| extract_tuples_for_string::<K>(spss, id as u64, k, m, config))
        .collect();
    tuples.par_sort_unstable();
    tuples
}
/// Extracts minimizer tuples for a single SPSS string, merging consecutive
/// k-mers that share the same minimizer occurrence into one super-k-mer tuple.
///
/// Returns an empty vector when the string is shorter than `k`.
fn extract_tuples_for_string<const K: usize>(
    spss: &SpectrumPreservingStringSet,
    string_id: u64,
    k: usize,
    m: usize,
    config: &BuildConfiguration,
) -> Vec<MinimizerTuple>
where
    Kmer<K>: KmerBits,
{
    let string_len = spss.string_length(string_id);
    if string_len < k {
        // Too short to contain a single k-mer; nothing to emit.
        return Vec::new();
    }
    let num_kmers = string_len - k + 1;
    let mut tuples = Vec::new();
    let mut minimizer_iter = MinimizerIterator::with_seed(k, m, config.seed);
    let string_begin = spss.string_offset(string_id);
    minimizer_iter.set_position(string_begin);
    // Pending tuple for the super-k-mer currently being extended, plus the
    // number of k-mers folded into it so far.
    let mut prev_mini_tuple: Option<MinimizerTuple> = None;
    let mut num_kmers_in_super_kmer = 0u8;
    for kmer_pos in 0..num_kmers {
        let kmer = spss.decode_kmer::<K>(string_id, kmer_pos);
        let forward_minimizer = minimizer_iter.next(kmer);
        let (final_minimizer, final_pos_in_kmer) = if config.canonical {
            // Canonical mode: also scan the reverse complement and keep
            // whichever orientation yields the smaller minimizer value.
            let kmer_rc = kmer.reverse_complement();
            let mut fresh_rc_iter = MinimizerIterator::with_seed(k, m, config.seed);
            let rc_minimizer = fresh_rc_iter.next(kmer_rc);
            if rc_minimizer.value < forward_minimizer.value {
                // Map the RC-frame minimizer offset back to forward-strand
                // coordinates (offsets range over 0..=k-m).
                let adjusted_pos = (k - m) as u8 - rc_minimizer.pos_in_kmer as u8;
                (rc_minimizer.value, adjusted_pos)
            } else {
                (forward_minimizer.value, forward_minimizer.pos_in_kmer as u8)
            }
        } else {
            (forward_minimizer.value, forward_minimizer.pos_in_kmer as u8)
        };
        let absolute_pos_in_seq = string_begin + kmer_pos as u64 + final_pos_in_kmer as u64;
        let current_mini = MinimizerTuple {
            minimizer: final_minimizer,
            pos_in_seq: absolute_pos_in_seq,
            pos_in_kmer: final_pos_in_kmer,
            num_kmers_in_super_kmer: 1,
        };
        if let Some(prev) = prev_mini_tuple.take() {
            if current_mini.minimizer == prev.minimizer
                && current_mini.pos_in_seq == prev.pos_in_seq
            {
                // Same minimizer occurrence: extend the current super-k-mer.
                // saturating_add matches `process_string_range_external` and
                // avoids a u8 overflow panic (debug builds) when a super-k-mer
                // spans more than 255 k-mers.
                num_kmers_in_super_kmer = num_kmers_in_super_kmer.saturating_add(1);
                prev_mini_tuple = Some(prev);
            } else {
                // Minimizer changed: finalize the previous super-k-mer.
                let mut saved = prev;
                saved.num_kmers_in_super_kmer = num_kmers_in_super_kmer;
                tuples.push(saved);
                prev_mini_tuple = Some(current_mini);
                num_kmers_in_super_kmer = 1;
            }
        } else {
            prev_mini_tuple = Some(current_mini);
            num_kmers_in_super_kmer = 1;
        }
    }
    // Flush the trailing super-k-mer.
    if let Some(mut last) = prev_mini_tuple.take() {
        last.num_kmers_in_super_kmer = num_kmers_in_super_kmer;
        tuples.push(last);
    }
    tuples
}
/// Estimates the peak RAM (in bytes) required to hold and sort all minimizer
/// tuples in memory: 24 bytes per tuple (the padded size of a
/// `MinimizerTuple`: two `u64` + two `u8`, aligned to 8), doubled for sorting
/// working space. Saturates at `u64::MAX` instead of overflowing for
/// pathologically large k-mer counts.
pub fn estimate_memory_bytes(total_kmers: u64) -> u64 {
    const BYTES_PER_TUPLE: u64 = 24;
    const SORT_WORKSPACE_FACTOR: u64 = 2;
    total_kmers
        .saturating_mul(BYTES_PER_TUPLE)
        .saturating_mul(SORT_WORKSPACE_FACTOR)
}
/// Returns whether the estimated tuple memory exceeds the given RAM limit and
/// external (on-disk) sorting should be used. A limit of 0 GiB disables
/// external sorting entirely.
pub fn needs_external_sorting(total_kmers: u64, ram_limit_gib: usize) -> bool {
    match ram_limit_gib {
        0 => false,
        limit => estimate_memory_bytes(total_kmers) > (limit as u64) * (GIB as u64),
    }
}
/// Computes minimizer tuples via external (on-disk) sorting and loads the
/// merged, sorted result back into memory.
///
/// # Errors
/// Propagates any I/O error from the external sort or the final read.
pub fn compute_minimizer_tuples_external<const K: usize>(
    spss: &SpectrumPreservingStringSet,
    config: &BuildConfiguration,
) -> Result<Vec<MinimizerTuple>, std::io::Error>
where
    Kmer<K>: KmerBits,
{
    run_external_sort::<K>(spss, config)?.read_merged_tuples()
}
/// Computes minimizer tuples via external sorting and returns a handle to the
/// merged on-disk file instead of materializing the tuples in memory.
///
/// # Errors
/// Propagates any I/O error from the external sort or opening the merged file.
pub fn compute_minimizer_tuples_external_file<const K: usize>(
    spss: &SpectrumPreservingStringSet,
    config: &BuildConfiguration,
) -> Result<super::external_sort::FileTuples, std::io::Error>
where
    Kmer<K>: KmerBits,
{
    run_external_sort::<K>(spss, config)?.open_merged_file()
}
/// Runs the external-sort pipeline: partitions the SPSS strings into
/// contiguous ranges, one per worker thread; each worker emits sorted tuple
/// runs to disk via the shared `ExternalSorter`; the runs are then merged and
/// the sorter is returned for the caller to read results from.
///
/// # Panics
/// Panics if `K <= config.m`, or if a worker thread panics.
///
/// # Errors
/// Propagates any I/O error from sorter creation, run flushing, or merging.
fn run_external_sort<const K: usize>(
    spss: &SpectrumPreservingStringSet,
    config: &BuildConfiguration,
) -> Result<ExternalSorter, std::io::Error>
where
    Kmer<K>: KmerBits,
{
    let k = K;
    let m = config.m;
    if k <= m {
        panic!("k must be > m (k={}, m={})", k, m);
    }
    // num_threads == 0 means "use rayon's current pool size".
    let num_threads = if config.num_threads == 0 {
        rayon::current_num_threads()
    } else {
        config.num_threads
    };
    let sorter = Arc::new(ExternalSorter::new(
        &config.tmp_dirname,
        config.ram_limit_gib,
        num_threads,
        config.verbose,
    )?);
    let buffer_size = sorter.buffer_size_per_thread();
    info!(
        "External sorting: {} threads, {} GiB RAM limit, {} tuples/buffer",
        num_threads, config.ram_limit_gib, buffer_size
    );
    let num_strings = spss.num_strings() as usize;
    // Static partition: each thread gets a contiguous block of string ids.
    let num_strings_per_thread = num_strings.div_ceil(num_threads);
    // Scoped threads let workers borrow `spss`/`config` without 'static bounds.
    std::thread::scope(|scope| {
        let mut handles = Vec::with_capacity(num_threads);
        for t in 0..num_threads {
            let begin = t * num_strings_per_thread;
            if begin >= num_strings {
                // More threads than work: stop spawning.
                break;
            }
            let end = (begin + num_strings_per_thread).min(num_strings);
            let sorter = Arc::clone(&sorter);
            let handle = scope.spawn(move || {
                process_string_range_external::<K>(
                    spss,
                    begin..end,
                    k,
                    m,
                    config,
                    &sorter,
                    buffer_size,
                )
            });
            handles.push(handle);
        }
        // Join all workers: a panic aborts, an I/O error short-circuits via `?`.
        for handle in handles {
            handle.join().expect("Thread panicked")?;
        }
        Ok::<(), std::io::Error>(())
    })?;
    let _merge_result = sorter.merge()?;
    // Every worker's Arc clone is dropped when the scope ends, so unwrapping
    // should succeed; failure would indicate a leaked reference.
    Arc::try_unwrap(sorter).map_err(|_| {
        std::io::Error::other("ExternalSorter still has multiple owners")
    })
}
/// Worker body for external sorting: extracts super-k-mer tuples for the
/// strings in `string_range`, buffering up to `buffer_size` tuples and
/// sorting/flushing each full batch to a temporary run file via `sorter`.
///
/// # Errors
/// Propagates I/O errors from `sorter.sort_and_flush`.
fn process_string_range_external<const K: usize>(
    spss: &SpectrumPreservingStringSet,
    string_range: std::ops::Range<usize>,
    k: usize,
    m: usize,
    config: &BuildConfiguration,
    sorter: &ExternalSorter,
    buffer_size: usize,
) -> Result<(), std::io::Error>
where
    Kmer<K>: KmerBits,
{
    let mut buffer: Vec<MinimizerTupleExternal> = Vec::with_capacity(buffer_size);
    for string_id in string_range {
        let string_len = spss.string_length(string_id as u64);
        if string_len < k {
            // Too short to contain one k-mer; nothing to emit for this string.
            continue;
        }
        let num_kmers = string_len - k + 1;
        let mut minimizer_iter = MinimizerIterator::with_seed(k, m, config.seed);
        let string_begin = spss.string_offset(string_id as u64);
        minimizer_iter.set_position(string_begin);
        // Pending tuple for the super-k-mer currently being extended, plus the
        // number of k-mers folded into it so far.
        let mut prev_mini: Option<MinimizerTupleExternal> = None;
        let mut num_kmers_in_super_kmer: u8 = 0;
        for kmer_pos in 0..num_kmers {
            let kmer = spss.decode_kmer::<K>(string_id as u64, kmer_pos);
            let forward_minimizer = minimizer_iter.next(kmer);
            let (final_minimizer, final_pos_in_kmer) = if config.canonical {
                // Canonical mode: also scan the reverse complement and keep
                // whichever orientation yields the smaller minimizer value.
                let kmer_rc = kmer.reverse_complement();
                let mut fresh_rc_iter = MinimizerIterator::with_seed(k, m, config.seed);
                let rc_minimizer = fresh_rc_iter.next(kmer_rc);
                if rc_minimizer.value < forward_minimizer.value {
                    // Map the RC-frame offset back to forward-strand coordinates.
                    let adjusted_pos = (k - m) as u8 - rc_minimizer.pos_in_kmer as u8;
                    (rc_minimizer.value, adjusted_pos)
                } else {
                    (forward_minimizer.value, forward_minimizer.pos_in_kmer as u8)
                }
            } else {
                (forward_minimizer.value, forward_minimizer.pos_in_kmer as u8)
            };
            let absolute_pos_in_seq = string_begin + kmer_pos as u64 + final_pos_in_kmer as u64;
            let current = MinimizerTupleExternal {
                minimizer: final_minimizer,
                pos_in_seq: absolute_pos_in_seq,
                pos_in_kmer: final_pos_in_kmer,
                num_kmers_in_super_kmer: 1,
            };
            if let Some(mut prev) = prev_mini.take() {
                if current.minimizer == prev.minimizer
                    && current.pos_in_seq == prev.pos_in_seq
                {
                    // Same minimizer occurrence: extend the super-k-mer.
                    num_kmers_in_super_kmer = num_kmers_in_super_kmer.saturating_add(1);
                    prev_mini = Some(prev);
                } else {
                    // Minimizer changed: finalize and buffer the previous
                    // super-k-mer, flushing the buffer first if it is full.
                    prev.num_kmers_in_super_kmer = num_kmers_in_super_kmer;
                    if buffer.len() >= buffer_size {
                        // mem::take keeps the allocation alive for reuse.
                        let mut buf = std::mem::take(&mut buffer);
                        sorter.sort_and_flush(&mut buf)?;
                        buffer = buf;
                    }
                    buffer.push(prev);
                    prev_mini = Some(current);
                    num_kmers_in_super_kmer = 1;
                }
            } else {
                prev_mini = Some(current);
                num_kmers_in_super_kmer = 1;
            }
        }
        // Flush the trailing super-k-mer of this string.
        if let Some(mut last) = prev_mini.take() {
            last.num_kmers_in_super_kmer = num_kmers_in_super_kmer;
            if buffer.len() >= buffer_size {
                let mut buf = std::mem::take(&mut buffer);
                sorter.sort_and_flush(&mut buf)?;
                buffer = buf;
            }
            buffer.push(last);
        }
    }
    // Flush whatever remains as the final (partial) run.
    if !buffer.is_empty() {
        let mut buf = buffer;
        sorter.sort_and_flush(&mut buf)?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    // The convenience constructor fills every field and defaults the
    // super-k-mer length to a single k-mer.
    #[test]
    fn test_minimizer_tuple_creation() {
        let tuple = MinimizerTuple::new(12345, 100, 5);
        assert_eq!(tuple.minimizer, 12345);
        assert_eq!(tuple.pos_in_seq, 100);
        assert_eq!(tuple.pos_in_kmer, 5);
        assert_eq!(tuple.num_kmers_in_super_kmer, 1);
    }

    // Ordering is primarily by minimizer value, then by position in sequence.
    #[test]
    fn test_minimizer_tuple_ordering() {
        let t1 = MinimizerTuple::new(100, 50, 0);
        let t2 = MinimizerTuple::new(200, 50, 0);
        let t3 = MinimizerTuple::new(100, 100, 0);
        assert!(t1 < t2); assert!(t1 < t3); assert!(t2 > t1);
    }

    // A 31-base sequence contains exactly one 31-mer, so exactly one tuple
    // (with super-k-mer length 1) is expected.
    #[test]
    fn test_compute_minimizer_tuples_basic() {
        use crate::builder::config::BuildConfiguration;
        use crate::builder::encode::Encoder;
        let config = BuildConfiguration::new(31, 13).unwrap();
        let mut encoder = Encoder::<31>::new();
        let sequence = "ACGTACGTACGTACGTACGTACGTACGTACG";
        encoder.add_sequence(sequence.as_bytes()).unwrap();
        let spss = encoder.build(config.m);
        let tuples = compute_minimizer_tuples::<31>(&spss, &config);
        assert_eq!(tuples.len(), 1);
        assert!(tuples[0].pos_in_seq < sequence.len() as u64);
        assert_eq!(tuples[0].num_kmers_in_super_kmer, 1);
    }

    // In canonical mode the minimizer offset within a k-mer must still lie in
    // the valid range [0, k - m].
    #[test]
    fn test_compute_minimizer_tuples_canonical() {
        use crate::builder::config::BuildConfiguration;
        use crate::builder::encode::Encoder;
        let mut config = BuildConfiguration::new(31, 13).unwrap();
        config.canonical = true;
        let mut encoder = Encoder::<31>::new();
        let sequence = "ACGTACGTACGTACGTACGTACGTACGTACG";
        encoder.add_sequence(sequence.as_bytes()).unwrap();
        let spss = encoder.build(config.m);
        let tuples = compute_minimizer_tuples::<31>(&spss, &config);
        assert!(!tuples.is_empty());
        for tuple in &tuples {
            assert!(tuple.pos_in_kmer <= (31 - 13) as u8);
        }
    }
}