use crate::contig_compression::{compress_contig, CompressionContext};
use crate::priority_queue::{BoundedPriorityQueue, PopResult};
use crate::segment_buffer::BufferedSegments;
use crate::segment_compression::compress_reference_segment;
use crate::task::{ContigProcessingStage, Task};
use crate::zstd_pool::compress_segment_pooled;
use ahash::{AHashMap, AHashSet};
use ragc_common::{Archive, Contig};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};
/// Where one stored segment ended up, kept until it is handed to the
/// collection via `add_segment_placed`.
#[derive(Debug, Clone)]
struct SegmentPlacement {
    /// Sample the segment's contig belongs to.
    sample_name: String,
    /// Contig the segment was cut from.
    contig_name: String,
    /// Part number of the segment within its contig.
    place: usize,
    /// Segment group the segment was stored in.
    group_id: u32,
    /// Index of the segment inside its group.
    in_group_id: u32,
    /// Whether the segment was stored reverse-complemented.
    is_rev_comp: bool,
    /// Length of the segment data in bytes.
    raw_length: u32,
}
/// Per-group writer state: which archive streams the group uses and how many
/// segments it has received so far.
struct SegmentGroup {
    /// Identifier of the group.
    group_id: u32,
    /// Archive stream receiving every segment after the first one.
    stream_id: usize,
    /// Archive stream receiving the first (reference) segment.
    ref_stream_id: usize,
    /// Copy of the reference segment; `None` until the first segment arrives.
    reference: Option<Contig>,
    /// Set once the reference segment has been written to the archive.
    ref_written: bool,
    /// Index that will be assigned to the next segment added to the group.
    in_group_counter: u32,
}
impl SegmentGroup {
    /// Creates an empty group that writes its reference segment to
    /// `ref_stream_id` and all later segments to `stream_id`.
    fn new(group_id: u32, stream_id: usize, ref_stream_id: usize) -> Self {
        Self {
            group_id,
            stream_id,
            ref_stream_id,
            reference: None,
            ref_written: false,
            in_group_counter: 0,
        }
    }

    /// Compresses `seg_data`, appends it to the archive and returns the index
    /// assigned to the segment inside this group.
    ///
    /// The first segment added becomes the group's reference and goes to the
    /// reference stream; every later one goes to the regular stream.
    ///
    /// # Errors
    ///
    /// Propagates any compression or archive error. The group is left
    /// untouched in that case: the reference is only recorded and the counter
    /// only advanced after the data has actually been written, so a failed
    /// first segment does not leave the group with a reference that never
    /// reached the archive.
    fn add_segment(&mut self, seg_data: &[u8], archive: &mut Archive) -> anyhow::Result<u32> {
        let segment = seg_data.to_vec();

        if self.reference.is_none() {
            let (compressed, marker) = compress_reference_segment(&segment)?;
            archive.add_part(self.ref_stream_id, &compressed, marker as u64)?;
            // Commit the state only now that both fallible steps succeeded;
            // moving `segment` also avoids copying the data a second time.
            self.reference = Some(segment);
            self.ref_written = true;
        } else {
            let compressed = compress_segment_pooled(&segment, 17)?;
            archive.add_part(self.stream_id, &compressed, 0)?;
        }

        let in_group_id = self.in_group_counter;
        self.in_group_counter += 1;
        Ok(in_group_id)
    }
}
/// State shared by all compression workers of one run.
pub struct SharedCompressorState {
    /// Sum of the lengths of all contigs compressed successfully so far.
    pub processed_bases: AtomicUsize,
    /// Sample counter; initialised to zero and not updated in this file.
    pub processed_samples: AtomicUsize,
    /// `(sample name, contig name, sequence)` of contigs whose compression
    /// returned `false`; drained by the new-splitters stage.
    pub raw_contigs: Mutex<Vec<(String, String, Vec<u8>)>>,
    /// Diagnostic level: `> 0` prints progress, `> 1` prints details.
    pub verbosity: usize,
    /// Segments buffered until the next registration stage stores them.
    pub buffered_segments: Arc<Mutex<BufferedSegments>>,
    /// Set of splitter k-mers; grows during the new-splitters stage.
    pub splitters: Arc<Mutex<AHashSet<u64>>>,
    /// Bloom filter that receives every k-mer inserted into `splitters`.
    pub bloom_splitters: Arc<Mutex<crate::bloom_filter::BloomFilter>>,
    /// Newly found splitters, one list per worker (indexed by worker id).
    pub vv_splitters: Mutex<Vec<Vec<u64>>>,
    /// K-mers removed from a hard contig's singletons before they become new
    /// splitters; consumed by a sorted-merge difference, so must be sorted.
    pub v_candidate_kmers: Vec<u64>,
    /// Second exclusion list for new splitters; must be sorted as well.
    pub v_duplicated_kmers: Vec<u64>,
    /// K-mer length used when enumerating k-mers.
    pub kmer_length: usize,
    /// Enables detection of hard contigs and the search for new splitters.
    pub adaptive_mode: bool,
    /// Maps a pair of terminating k-mers to the smallest group id seen for it.
    pub map_segments: Arc<Mutex<AHashMap<(u64, u64), u32>>>,
    /// For each k-mer, the sorted list of k-mers it has been paired with.
    pub map_segments_terminators: Arc<Mutex<AHashMap<u64, Vec<u64>>>>,
    /// Forwarded unchanged to the contig compression context.
    pub concatenated_genomes: bool,
    /// Number of segment groups registered so far.
    pub no_segments: Arc<Mutex<u32>>,
    /// Number of raw groups; when non-zero, buffered segments are distributed
    /// over them during registration.
    pub no_raw_groups: u32,
    /// Output archive; when `None`, no streams are registered or written.
    pub archive: Option<Arc<Mutex<Archive>>>,
    /// Per-group writer state, indexed by group id.
    pub v_segments: Arc<Mutex<Vec<Option<SegmentGroup>>>>,
    /// Collection metadata that receives segment placements, if present.
    pub collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,
    /// Queue used to re-process hard contigs.
    pub aux_queue: Arc<BoundedPriorityQueue<Task>>,
    /// Queue the workers currently pop tasks from.
    pub working_queue: Mutex<Arc<BoundedPriorityQueue<Task>>>,
}
impl SharedCompressorState {
    /// Builds a fresh state with empty collections, zeroed counters, no
    /// archive and no collection attached.
    ///
    /// `main_queue` becomes the initial working queue. Note that
    /// `vv_splitters` starts out empty and both exclusion lists are empty.
    pub fn new(
        verbosity: usize,
        kmer_length: usize,
        adaptive_mode: bool,
        concatenated_genomes: bool,
        no_raw_groups: u32,
        main_queue: Arc<BoundedPriorityQueue<Task>>,
    ) -> Self {
        let bloom = crate::bloom_filter::BloomFilter::new(80 * 1024);
        let hard_contig_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));
        let segment_buffer = BufferedSegments::new(no_raw_groups as usize);

        Self {
            // Progress counters.
            processed_bases: AtomicUsize::new(0),
            processed_samples: AtomicUsize::new(0),
            raw_contigs: Mutex::new(Vec::new()),
            verbosity,

            // Segment buffering and splitter bookkeeping.
            buffered_segments: Arc::new(Mutex::new(segment_buffer)),
            splitters: Arc::new(Mutex::new(AHashSet::new())),
            bloom_splitters: Arc::new(Mutex::new(bloom)),
            vv_splitters: Mutex::new(Vec::new()),
            v_candidate_kmers: Vec::new(),
            v_duplicated_kmers: Vec::new(),
            kmer_length,
            adaptive_mode,

            // Segment group maps.
            map_segments: Arc::new(Mutex::new(AHashMap::new())),
            map_segments_terminators: Arc::new(Mutex::new(AHashMap::new())),
            concatenated_genomes,
            no_segments: Arc::new(Mutex::new(0)),
            no_raw_groups,

            // Output targets and queues.
            archive: None,
            v_segments: Arc::new(Mutex::new(Vec::new())),
            collection: None,
            aux_queue: hard_contig_queue,
            working_queue: Mutex::new(main_queue),
        }
    }
}
/// Main loop of one compression worker.
///
/// The worker repeatedly pops a task from the current working queue and
/// either runs a synchronisation stage (`Registration`, `NewSplitters`) or
/// compresses the contig carried by the task. It returns once the working
/// queue reports `PopResult::Completed`.
///
/// * `worker_id` / `num_workers` - index of this worker and total number of
///   workers; `barrier` must have been created for `num_workers` parties.
/// * `main_queue` - the primary task queue. It is the queue the workers go
///   back to after a detour through `shared.aux_queue`.
/// * `shared` - state shared by all workers.
///
/// # Panics
///
/// Panics if a shared mutex is poisoned or if the queue returns
/// `PopResult::Normal` without a task.
pub fn worker_thread(
    worker_id: usize,
    num_workers: usize,
    main_queue: Arc<BoundedPriorityQueue<Task>>,
    barrier: Arc<Barrier>,
    shared: Arc<SharedCompressorState>,
) {
    loop {
        // The working queue can be swapped between iterations, so look it up
        // again every time; the lock is released before popping.
        let queue = {
            let working_queue_guard = shared.working_queue.lock().unwrap();
            Arc::clone(&*working_queue_guard)
        };

        let task = match queue.pop_large() {
            (PopResult::Empty, _) => continue,
            (PopResult::Completed, _) => break,
            (PopResult::Normal, task_opt) => {
                task_opt.expect("PopResult::Normal should have task")
            }
        };

        match task.stage {
            ContigProcessingStage::Registration => {
                handle_registration_stage(worker_id, &main_queue, &barrier, &shared);
                continue;
            }
            ContigProcessingStage::NewSplitters => {
                handle_new_splitters_stage(worker_id, num_workers, &barrier, &shared);
                continue;
            }
            // Both carry a contig that is compressed below.
            ContigProcessingStage::AllContigs | ContigProcessingStage::HardContigs => {}
        }

        let ctg_size = task.sequence.len();
        if compress_contig_task(&task, worker_id, &barrier, &shared) {
            let old_pb = shared
                .processed_bases
                .fetch_add(ctg_size, Ordering::Relaxed);
            let new_pb = old_pb + ctg_size;
            // Report once per 10 Mb boundary crossed.
            if shared.verbosity > 0 && old_pb / 10_000_000 != new_pb / 10_000_000 {
                eprintln!("Compressed: {} Mb\r", new_pb / 1_000_000);
            }
        } else {
            // Keep the contig so that the new-splitters stage can re-enqueue it.
            let mut raw_contigs = shared.raw_contigs.lock().unwrap();
            raw_contigs.push((
                task.sample_name.clone(),
                task.contig_name.clone(),
                task.sequence.clone(),
            ));
        }
    }
}

/// Runs the registration stage; every worker must call this for the same
/// synchronisation point because it waits on `barrier` four times.
///
/// 1. One worker (the barrier leader) registers the newly buffered segments.
/// 2. All workers store buffered segments.
/// 3. Worker 0 switches the working queue back to `main_queue` if it is
///    currently the auxiliary queue. Without this the workers would keep
///    popping from the auxiliary queue, which is never marked completed, and
///    would never see the remaining tasks or the completion of `main_queue`.
///    The other workers are parked at the final barrier at that point, so
///    nobody reads the working queue while it is being replaced.
fn handle_registration_stage(
    worker_id: usize,
    main_queue: &Arc<BoundedPriorityQueue<Task>>,
    barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) {
    if barrier.wait().is_leader() {
        register_segments(shared);
    }
    barrier.wait();

    store_segments(worker_id, shared);
    barrier.wait();

    if worker_id == 0 {
        let mut working_queue = shared.working_queue.lock().unwrap();
        if Arc::ptr_eq(&*working_queue, &shared.aux_queue) {
            *working_queue = Arc::clone(main_queue);
            if shared.verbosity > 1 {
                eprintln!("Switched back to main queue after hard contig reprocessing");
            }
        }
    }
    barrier.wait();
}
/// Derives new splitter candidates from a hard contig and appends them to the
/// calling worker's list in `shared.vv_splitters`.
///
/// The candidates are the k-mers left after `remove_non_singletons` that
/// occur in neither `shared.v_candidate_kmers` nor
/// `shared.v_duplicated_kmers`. Both exclusion lists are consumed by a sorted
/// merge and therefore have to be sorted.
///
/// `thread_id` selects the per-worker list. The list vector is grown on
/// demand, because a state built with `SharedCompressorState::new` starts
/// with no per-worker lists and indexing it directly would panic.
fn find_new_splitters(
    contig: &ragc_common::Contig,
    thread_id: usize,
    shared: &Arc<SharedCompressorState>,
) {
    use crate::kmer_extract::{enumerate_kmers, remove_non_singletons};

    let verbose = shared.verbosity > 1;

    let mut v_contig_kmers = enumerate_kmers(contig, shared.kmer_length);
    v_contig_kmers.sort_unstable();
    remove_non_singletons(&mut v_contig_kmers, 0);
    if verbose {
        eprintln!(
            "find_new_splitters: contig has {} singleton k-mers",
            v_contig_kmers.len()
        );
    }

    // First pass: drop everything present in the candidate list.
    let mut v_tmp = Vec::with_capacity(v_contig_kmers.len());
    set_difference(&v_contig_kmers, &shared.v_candidate_kmers, &mut v_tmp);
    if verbose {
        eprintln!(
            "find_new_splitters: {} k-mers after excluding reference singletons",
            v_tmp.len()
        );
    }

    // Second pass: drop everything present in the duplicated list, reusing
    // the first buffer for the result.
    v_contig_kmers.clear();
    set_difference(&v_tmp, &shared.v_duplicated_kmers, &mut v_contig_kmers);
    if verbose {
        eprintln!(
            "find_new_splitters: {} NEW splitters found",
            v_contig_kmers.len()
        );
    }

    let mut vv_splitters = shared.vv_splitters.lock().unwrap();
    if vv_splitters.len() <= thread_id {
        vv_splitters.resize_with(thread_id + 1, Vec::new);
    }
    vv_splitters[thread_id].extend(v_contig_kmers);
}
/// Writes into `result` the elements of `a` that are not matched by an
/// element of `b`, using a single merge pass over both slices.
///
/// Both inputs are expected to be sorted in ascending order. `result` is
/// cleared first. Each element of `b` cancels at most one equal element of
/// `a`, so surplus duplicates in `a` are kept.
fn set_difference(a: &[u64], b: &[u64], result: &mut Vec<u64>) {
    result.clear();

    let mut rest_a = a;
    let mut rest_b = b;
    while let (Some((&x, tail_a)), Some((&y, tail_b))) =
        (rest_a.split_first(), rest_b.split_first())
    {
        match x.cmp(&y) {
            // `x` cannot occur in the remainder of `b`: keep it.
            std::cmp::Ordering::Less => {
                result.push(x);
                rest_a = tail_a;
            }
            // `y` is too small to match anything left in `a`: skip it.
            std::cmp::Ordering::Greater => rest_b = tail_b,
            // Matched pair: drop both.
            std::cmp::Ordering::Equal => {
                rest_a = tail_a;
                rest_b = tail_b;
            }
        }
    }

    // `b` is exhausted (or `a` is empty); whatever is left of `a` survives.
    result.extend_from_slice(rest_a);
}
/// Runs the new-splitters stage; every worker must call this for the same
/// synchronisation point because it waits on `barrier` twice.
///
/// Between the two barriers worker 0 alone
/// 1. merges the per-worker splitter lists into the shared splitter set and
///    bloom filter, and
/// 2. moves all collected hard contigs to the auxiliary queue, followed by
///    one `Registration` token per worker, and makes that queue the working
///    queue.
///
/// The merge used to be guarded by `num_workers > 1 || worker_id == 0`, which
/// is true for every worker, so each of them repeated the merge (and possibly
/// the bloom filter rebuild) one after another under the same locks. Doing it
/// once yields the same state.
fn handle_new_splitters_stage(
    worker_id: usize,
    num_workers: usize,
    barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) {
    // After this barrier no worker is still adding to its splitter list.
    barrier.wait();

    if worker_id == 0 {
        merge_new_splitters(shared);
        requeue_hard_contigs(num_workers, shared);
    }

    // Nobody may read the working queue before it has been switched.
    barrier.wait();
}

/// Moves every k-mer from the per-worker lists into the shared splitter set
/// and bloom filter, then rebuilds the bloom filter if it has become too full.
fn merge_new_splitters(shared: &SharedCompressorState) {
    let mut splitters = shared.splitters.lock().unwrap();
    let mut bloom = shared.bloom_splitters.lock().unwrap();
    let mut vv_splitters = shared.vv_splitters.lock().unwrap();

    let mut total_new = 0;
    for thread_splitters in vv_splitters.iter_mut() {
        total_new += thread_splitters.len();
        for kmer in thread_splitters.drain(..) {
            splitters.insert(kmer);
            bloom.insert(kmer);
        }
    }
    if shared.verbosity > 0 && total_new > 0 {
        eprintln!("Adaptive mode: Added {} new splitters", total_new);
        eprintln!("Total splitters: {}", splitters.len());
    }

    let filling_factor = bloom.filling_factor();
    if filling_factor > 0.3 {
        if shared.verbosity > 1 {
            eprintln!(
                "Bloom filter filling factor {:.2}, resizing...",
                filling_factor
            );
        }
        let new_size_bits = (splitters.len() as f64 / 0.25) as usize * 8;
        bloom.resize(new_size_bits);
        // Re-insert everything into the resized filter.
        for &kmer in splitters.iter() {
            bloom.insert(kmer);
        }
        if shared.verbosity > 1 {
            eprintln!("Bloom filter resized to {} bits", new_size_bits);
        }
    }
}

/// Drains `shared.raw_contigs` into the auxiliary queue as `HardContigs`
/// tasks, appends `num_workers` `Registration` tokens with a lower priority
/// value, and makes the auxiliary queue the working queue.
fn requeue_hard_contigs(num_workers: usize, shared: &SharedCompressorState) {
    let mut raw_contigs = shared.raw_contigs.lock().unwrap();
    if shared.verbosity > 0 && !raw_contigs.is_empty() {
        eprintln!(
            "Adaptive mode: Re-enqueueing {} hard contigs for reprocessing",
            raw_contigs.len()
        );
    }
    for (sample_name, contig_name, sequence) in raw_contigs.drain(..) {
        let cost = sequence.len();
        shared.aux_queue.emplace(
            Task::new_contig(
                sample_name,
                contig_name,
                sequence,
                ContigProcessingStage::HardContigs,
            ),
            1,
            cost,
        );
    }
    shared.aux_queue.emplace_many_no_cost(
        Task::new_sync(ContigProcessingStage::Registration),
        0,
        num_workers,
    );

    let mut working_queue = shared.working_queue.lock().unwrap();
    *working_queue = Arc::clone(&shared.aux_queue);
    if shared.verbosity > 1 {
        eprintln!("Switched to auxiliary queue for hard contig reprocessing");
    }
}
/// Compresses the contig carried by `task` and reports whether it succeeded.
///
/// In adaptive mode, a contig of the `AllContigs` stage that fails to
/// compress and is at least 1000 bases long is treated as a hard contig: new
/// splitter candidates are derived from it for worker `worker_id`.
///
/// Returns `false` when compression did not succeed. The caller is
/// responsible for keeping such a contig for later reprocessing. This
/// function deliberately does not add it to `shared.raw_contigs` itself,
/// because the caller already does so for every `false` result, and adding it
/// here as well made each hard contig get re-enqueued twice.
fn compress_contig_task(
    task: &Task,
    worker_id: usize,
    _barrier: &Arc<Barrier>,
    shared: &Arc<SharedCompressorState>,
) -> bool {
    let ctx = CompressionContext {
        splitters: Arc::clone(&shared.splitters),
        bloom_splitters: Arc::clone(&shared.bloom_splitters),
        buffered_segments: Arc::clone(&shared.buffered_segments),
        kmer_length: shared.kmer_length,
        adaptive_mode: shared.adaptive_mode,
        map_segments: Arc::clone(&shared.map_segments),
        map_segments_terminators: Arc::clone(&shared.map_segments_terminators),
        concatenated_genomes: shared.concatenated_genomes,
    };
    let success = compress_contig(&task.sample_name, &task.contig_name, &task.sequence, &ctx);

    let is_hard_contig = shared.adaptive_mode
        && !success
        && task.stage == ContigProcessingStage::AllContigs
        && task.sequence.len() >= 1000;
    if is_hard_contig {
        if shared.verbosity > 1 {
            eprintln!(
                "Hard contig detected: {}/{} ({} bp)",
                task.sample_name,
                task.contig_name,
                task.sequence.len()
            );
        }
        find_new_splitters(&task.sequence, worker_id, shared);
    }

    success
}
/// Registers the segment groups created since the last registration stage.
///
/// For every new group a delta stream and a reference stream are registered
/// in the archive (when one is attached), the group counter is advanced and
/// `v_segments` is resized to cover all groups. Afterwards the buffered
/// segments are distributed over the raw groups (if any) and the buffer's
/// read position is restarted.
fn register_segments(shared: &Arc<SharedCompressorState>) {
    // The buffer lock is released before any other lock is taken.
    let no_new = {
        let mut buffered = shared.buffered_segments.lock().unwrap();
        buffered.sort_known(1);
        buffered.process_new(&shared.map_segments)
    };

    if no_new > 0 {
        let total_groups = {
            let mut group_count = shared.no_segments.lock().unwrap();
            let first_new = *group_count;
            if let Some(archive_mutex) = &shared.archive {
                let mut archive = archive_mutex.lock().unwrap();
                for seg_num in (0..no_new).map(|offset| first_new + offset) {
                    archive.register_stream(&ragc_common::stream_delta_name(3000, seg_num));
                    archive.register_stream(&ragc_common::stream_ref_name(3000, seg_num));
                }
            }
            *group_count += no_new;
            *group_count
        };

        shared
            .v_segments
            .lock()
            .unwrap()
            .resize_with(total_groups as usize, || None);
    }

    if shared.no_raw_groups > 0 {
        shared
            .buffered_segments
            .lock()
            .unwrap()
            .distribute_segments(0, 0, shared.no_raw_groups);
    }

    shared.buffered_segments.lock().unwrap().restart_read_vec();
}
fn store_segments(_worker_id: usize, shared: &Arc<SharedCompressorState>) {
const MAX_BUFF_SIZE: usize = 32;
let mut buffered_placements: Vec<SegmentPlacement> = Vec::with_capacity(MAX_BUFF_SIZE);
let buffered = shared.buffered_segments.lock().unwrap();
loop {
let block_group_id = buffered.get_vec_id();
if block_group_id < 0 {
break; }
for group_id in
(block_group_id - crate::segment_buffer::PART_ID_STEP + 1..=block_group_id).rev()
{
if buffered.is_empty_part(group_id) {
continue;
}
while let Some((
kmer1,
kmer2,
sample_name,
contig_name,
seg_data,
is_rev_comp,
seg_part_no,
)) = buffered.get_part(group_id)
{
let mut v_segments = shared.v_segments.lock().unwrap();
if group_id >= 0
&& (group_id as usize) < v_segments.len()
&& v_segments[group_id as usize].is_none()
{
if let Some(archive_mutex) = &shared.archive {
let archive = archive_mutex.lock().unwrap();
let stream_id = archive
.get_stream_id(&ragc_common::stream_delta_name(3000, group_id as u32))
.unwrap_or(0);
let ref_stream_id = archive
.get_stream_id(&ragc_common::stream_ref_name(3000, group_id as u32))
.unwrap_or(0);
drop(archive);
v_segments[group_id as usize] =
Some(SegmentGroup::new(group_id as u32, stream_id, ref_stream_id));
}
}
let mut in_group_id = 0;
if group_id >= 0 && (group_id as usize) < v_segments.len() {
if let Some(segment_group) = &mut v_segments[group_id as usize] {
if let Some(archive_mutex) = &shared.archive {
let mut archive = archive_mutex.lock().unwrap();
if let Ok(id) = segment_group.add_segment(&seg_data, &mut archive) {
in_group_id = id;
}
}
}
}
drop(v_segments);
buffered_placements.push(SegmentPlacement {
sample_name,
contig_name,
place: seg_part_no as usize,
group_id: group_id as u32,
in_group_id,
is_rev_comp,
raw_length: seg_data.len() as u32,
});
if buffered_placements.len() >= MAX_BUFF_SIZE {
if let Some(collection_mutex) = &shared.collection {
let mut collection = collection_mutex.lock().unwrap();
for placement in &buffered_placements {
let _ = collection.add_segment_placed(
&placement.sample_name,
&placement.contig_name,
placement.place,
placement.group_id,
placement.in_group_id,
placement.is_rev_comp,
placement.raw_length,
);
}
}
buffered_placements.clear();
}
let key = (kmer1, kmer2);
let mut map_segments = shared.map_segments.lock().unwrap();
map_segments
.entry(key)
.and_modify(|existing| {
if (group_id as u32) < *existing {
*existing = group_id as u32;
}
})
.or_insert(group_id as u32);
drop(map_segments);
if kmer1 != crate::contig_compression::MISSING_KMER
&& kmer2 != crate::contig_compression::MISSING_KMER
{
let mut terminators = shared.map_segments_terminators.lock().unwrap();
let list1 = terminators.entry(kmer1).or_insert_with(Vec::new);
if !list1.contains(&kmer2) {
list1.push(kmer2);
list1.sort_unstable();
}
if kmer1 != kmer2 {
let list2 = terminators.entry(kmer2).or_insert_with(Vec::new);
if !list2.contains(&kmer1) {
list2.push(kmer1);
list2.sort_unstable();
}
}
drop(terminators);
}
}
}
}
if !buffered_placements.is_empty() {
if let Some(collection_mutex) = &shared.collection {
let mut collection = collection_mutex.lock().unwrap();
for placement in &buffered_placements {
let _ = collection.add_segment_placed(
&placement.sample_name,
&placement.contig_name,
placement.place,
placement.group_id,
placement.in_group_id,
placement.is_rev_comp,
placement.raw_length,
);
}
}
}
}
/// Creates an archive at `output_path` from the given samples.
///
/// Steps, in order:
/// 1. open the archive and write the `file_type_info` stream (producer and
///    file version metadata as NUL-terminated key/value strings),
/// 2. write the `params` stream: four little-endian `u32` values,
///    `kmer_length`, 20, 50 and `segment_size`,
/// 3. configure the collection and compress all samples with `num_threads`
///    threads,
/// 4. store the collection's sample names and contig batch, then close the
///    archive.
///
/// `sample_files` holds `(sample name, file path)` pairs. When it is empty
/// the function returns `Ok(())` without creating anything.
///
/// # Errors
///
/// Returns an error if opening, writing or closing the archive fails, if the
/// collection cannot be prepared or stored, or if compression reports an
/// error.
///
/// # Panics
///
/// Panics if the archive or collection mutex is poisoned.
pub fn create_agc_archive(
    output_path: &str,
    sample_files: Vec<(String, String)>,
    splitters: AHashSet<u64>,
    candidate_kmers: AHashSet<u64>,
    duplicated_kmers: AHashSet<u64>,
    kmer_length: usize,
    segment_size: u32,
    num_threads: usize,
    adaptive_mode: bool,
    concatenated_genomes: bool,
    verbosity: usize,
) -> anyhow::Result<()> {
    use ragc_common::CollectionV3;

    if sample_files.is_empty() {
        return Ok(());
    }
    let num_samples = sample_files.len();

    let mut archive = Archive::new_writer();
    archive.open(output_path)?;

    // --- file_type_info -----------------------------------------------------
    {
        let major = ragc_common::AGC_FILE_MAJOR.to_string();
        let minor = ragc_common::AGC_FILE_MINOR.to_string();
        let comment = format!(
            "RAGC v.{}.{}",
            ragc_common::AGC_FILE_MAJOR,
            ragc_common::AGC_FILE_MINOR
        );
        let entries: [(&str, &str); 7] = [
            ("producer", "ragc"),
            ("producer_version_major", &major),
            ("producer_version_minor", &minor),
            ("producer_version_build", "0"),
            ("file_version_major", &major),
            ("file_version_minor", &minor),
            ("comment", &comment),
        ];

        // Each key and each value is written as a NUL-terminated string.
        let mut data = Vec::new();
        for &(key, value) in entries.iter() {
            for text in [key, value].iter() {
                data.extend_from_slice(text.as_bytes());
                data.push(0);
            }
        }

        let stream_id = archive.register_stream("file_type_info");
        // The part's metadata value is the number of key/value pairs (7).
        archive.add_part(stream_id, &data, entries.len() as u64)?;
        if verbosity > 0 {
            eprintln!(
                "Wrote file_type_info: version {}.{}",
                ragc_common::AGC_FILE_MAJOR,
                ragc_common::AGC_FILE_MINOR
            );
        }
    }

    // --- params -------------------------------------------------------------
    {
        let params_stream_id = archive.register_stream("params");
        // NOTE(review): 20 and 50 are fixed here; presumably the minimal match
        // length and the pack cardinality of the file format - confirm.
        let fields: [u32; 4] = [kmer_length as u32, 20, 50, segment_size];
        let mut params_data = Vec::with_capacity(fields.len() * 4);
        for field in fields.iter() {
            params_data.extend_from_slice(&field.to_le_bytes());
        }
        archive.add_part(params_stream_id, &params_data, 0)?;
        if verbosity > 0 {
            eprintln!(
                "Wrote params: k={}, segment_size={}",
                kmer_length, segment_size
            );
        }
    }

    // --- compression --------------------------------------------------------
    let mut collection = CollectionV3::new();
    collection.set_config(segment_size, kmer_length as u32, None);
    collection.prepare_for_compression(&mut archive)?;

    let archive = Arc::new(Mutex::new(archive));
    let collection = Arc::new(Mutex::new(collection));
    compress_samples_streaming_with_archive(
        sample_files,
        splitters,
        candidate_kmers,
        duplicated_kmers,
        kmer_length,
        num_threads,
        adaptive_mode,
        concatenated_genomes,
        verbosity,
        Some(Arc::clone(&archive)),
        Some(Arc::clone(&collection)),
    )
    .map_err(|e| anyhow::anyhow!(e))?;

    // --- collection metadata --------------------------------------------------
    {
        let mut archive_guard = archive.lock().unwrap();
        let mut collection_guard = collection.lock().unwrap();
        if verbosity > 0 {
            eprintln!(
                "Serializing collection metadata for {} samples...",
                num_samples
            );
        }
        collection_guard.store_batch_sample_names(&mut archive_guard)?;
        collection_guard.store_contig_batch(&mut archive_guard, 0, num_samples)?;
        if verbosity > 0 {
            eprintln!("Collection metadata serialized successfully");
        }
    }

    let mut archive = archive.lock().unwrap();
    archive.close()?;
    if verbosity > 0 {
        eprintln!("AGC archive created: {}", output_path);
    }
    Ok(())
}
/// Reads all sample files, feeds their contigs to a pool of worker threads
/// and waits for the workers to finish.
///
/// The calling thread is the producer: it enqueues one `AllContigs` task per
/// contig and, at synchronisation points, one sync token per worker
/// (`NewSplitters` in adaptive mode, `Registration` otherwise). The priority
/// value is decreased after every synchronisation point.
///
/// Synchronisation points:
/// * normal mode - after every sample file that contributed a contig;
/// * concatenated mode (all contigs registered under the empty sample name) -
///   after every `PACK_CARDINALITY` contigs and once more at the end if any
///   contigs are still pending. Previously no token was enqueued at all in
///   this mode, so the buffered segments were never registered or stored.
///
/// Files that cannot be opened are reported and skipped. A read error ends
/// the processing of that file and is reported as well (it used to be
/// silently treated as end of file).
///
/// The worker count is `num_threads` below 8 threads and `num_threads - 1`
/// otherwise, but never less than one, so a thread count of zero still makes
/// progress.
///
/// # Errors
///
/// Returns `Err` if a worker thread panicked.
fn compress_samples_streaming_with_archive(
    sample_files: Vec<(String, String)>,
    splitters: AHashSet<u64>,
    candidate_kmers: AHashSet<u64>,
    duplicated_kmers: AHashSet<u64>,
    kmer_length: usize,
    num_threads: usize,
    adaptive_mode: bool,
    concatenated_genomes: bool,
    verbosity: usize,
    archive: Option<Arc<Mutex<Archive>>>,
    collection: Option<Arc<Mutex<ragc_common::CollectionV3>>>,
) -> Result<(), String> {
    use crate::genome_io::GenomeIO;

    /// Contigs enqueued between two synchronisation points in concatenated mode.
    const PACK_CARDINALITY: usize = 50;

    if sample_files.is_empty() {
        return Ok(());
    }

    let queue_capacity = std::cmp::max(2_u64 << 30, num_threads as u64 * (192_u64 << 20));
    let queue = Arc::new(BoundedPriorityQueue::new(1, queue_capacity as usize));

    let no_workers = match num_threads {
        0 => 1,
        n if n < 8 => n,
        n => n - 1,
    };

    // The exclusion lists are consumed by a sorted merge in the workers.
    let mut v_candidate_kmers: Vec<u64> = candidate_kmers.into_iter().collect();
    v_candidate_kmers.sort_unstable();
    let mut v_duplicated_kmers: Vec<u64> = duplicated_kmers.into_iter().collect();
    v_duplicated_kmers.sort_unstable();

    let mut bloom_filter = crate::bloom_filter::BloomFilter::new(splitters.len() * 8);
    for &kmer in &splitters {
        bloom_filter.insert(kmer);
    }

    let aux_queue = Arc::new(BoundedPriorityQueue::new(1, usize::MAX));
    let shared = Arc::new(SharedCompressorState {
        processed_bases: AtomicUsize::new(0),
        processed_samples: AtomicUsize::new(0),
        raw_contigs: Mutex::new(Vec::new()),
        verbosity,
        buffered_segments: Arc::new(Mutex::new(BufferedSegments::new(0))),
        splitters: Arc::new(Mutex::new(splitters)),
        bloom_splitters: Arc::new(Mutex::new(bloom_filter)),
        vv_splitters: Mutex::new(vec![Vec::new(); no_workers]),
        v_candidate_kmers,
        v_duplicated_kmers,
        kmer_length,
        adaptive_mode,
        map_segments: Arc::new(Mutex::new(AHashMap::new())),
        map_segments_terminators: Arc::new(Mutex::new(AHashMap::new())),
        concatenated_genomes,
        no_segments: Arc::new(Mutex::new(0)),
        no_raw_groups: 0,
        archive,
        v_segments: Arc::new(Mutex::new(Vec::new())),
        collection,
        aux_queue,
        working_queue: Mutex::new(Arc::clone(&queue)),
    });

    let barrier = Arc::new(Barrier::new(no_workers));
    let mut worker_handles = Vec::with_capacity(no_workers);
    for worker_id in 0..no_workers {
        let q = Arc::clone(&queue);
        let b = Arc::clone(&barrier);
        let s = Arc::clone(&shared);
        worker_handles.push(std::thread::spawn(move || {
            worker_thread(worker_id, no_workers, q, b, s);
        }));
    }

    // Enqueues one synchronisation token per worker at the given priority.
    let emit_sync = |priority: usize| {
        let stage = if adaptive_mode {
            ContigProcessingStage::NewSplitters
        } else {
            ContigProcessingStage::Registration
        };
        queue.emplace_many_no_cost(Task::new_sync(stage), priority, no_workers);
    };

    let mut sample_priority = usize::MAX;
    // Contigs enqueued since the last sync point (concatenated mode only).
    let mut pending_concat_contigs = 0usize;

    for (sample_name, file_path) in sample_files {
        if verbosity > 0 {
            eprintln!("Processing sample: {} from {}", sample_name, file_path);
        }
        let mut gio = match GenomeIO::open(&file_path) {
            Ok(g) => g,
            Err(e) => {
                eprintln!("Cannot open file {}: {}", file_path, e);
                continue;
            }
        };

        let mut any_contigs_added = false;
        loop {
            let (contig_name, sequence) = match gio.read_contig_raw() {
                Ok(Some(entry)) => entry,
                Ok(None) => break,
                Err(_) => {
                    eprintln!(
                        "Error while reading contigs from {}; the rest of the file is skipped",
                        file_path
                    );
                    break;
                }
            };

            // In concatenated mode everything belongs to the unnamed sample.
            let task_sample_name = if concatenated_genomes {
                String::new()
            } else {
                sample_name.clone()
            };

            if let Some(collection_mutex) = &shared.collection {
                let mut collection = collection_mutex.lock().unwrap();
                let _ = collection.register_sample_contig(&task_sample_name, &contig_name);
            }

            let cost = sequence.len();
            queue.emplace(
                Task::new_contig(
                    task_sample_name,
                    contig_name,
                    sequence,
                    ContigProcessingStage::AllContigs,
                ),
                sample_priority,
                cost,
            );
            any_contigs_added = true;

            if concatenated_genomes {
                pending_concat_contigs += 1;
                if pending_concat_contigs >= PACK_CARDINALITY {
                    emit_sync(sample_priority);
                    sample_priority -= 1;
                    pending_concat_contigs = 0;
                }
            }
        }

        if !concatenated_genomes && any_contigs_added {
            emit_sync(sample_priority);
            sample_priority -= 1;
        }
    }

    // Make sure the tail of a concatenated input is registered and stored too.
    if concatenated_genomes && pending_concat_contigs > 0 {
        emit_sync(sample_priority);
    }

    queue.mark_completed();
    for handle in worker_handles {
        handle.join().map_err(|_| "Worker thread panicked")?;
    }

    if verbosity > 0 {
        let total_bases = shared.processed_bases.load(Ordering::Relaxed);
        eprintln!("Compression complete: {} bases processed", total_bases);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds a non-adaptive state with k = 21 whose working queue is `queue`.
    fn make_state(
        verbosity: usize,
        queue: &Arc<BoundedPriorityQueue<Task>>,
    ) -> Arc<SharedCompressorState> {
        Arc::new(SharedCompressorState::new(
            verbosity,
            21,
            false,
            false,
            0,
            Arc::clone(queue),
        ))
    }

    /// Runs `count` workers against `queue` and waits for all of them.
    fn run_workers(
        count: usize,
        queue: &Arc<BoundedPriorityQueue<Task>>,
        shared: &Arc<SharedCompressorState>,
    ) {
        let barrier = Arc::new(Barrier::new(count));
        let handles: Vec<_> = (0..count)
            .map(|worker_id| {
                let q = Arc::clone(queue);
                let b = Arc::clone(&barrier);
                let s = Arc::clone(shared);
                thread::spawn(move || {
                    worker_thread(worker_id, count, q, b, s);
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// A freshly built state carries the constructor arguments and zeroed
    /// counters.
    #[test]
    fn test_shared_state_creation() {
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let state = make_state(1, &queue);

        assert_eq!(state.verbosity, 1);
        assert_eq!(state.kmer_length, 21);
        assert!(!state.adaptive_mode);
        assert!(!state.concatenated_genomes);
        assert_eq!(state.no_raw_groups, 0);
        assert_eq!(state.processed_bases.load(Ordering::Relaxed), 0);
        assert_eq!(state.processed_samples.load(Ordering::Relaxed), 0);
        assert!(state.raw_contigs.lock().unwrap().is_empty());
        assert_eq!(*state.no_segments.lock().unwrap(), 0);
    }

    /// Workers return as soon as an empty queue is marked completed.
    #[test]
    fn test_worker_thread_completion() {
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let shared = make_state(0, &queue);
        queue.mark_completed();

        run_workers(2, &queue, &shared);
    }

    /// A single contig task is compressed and counted in `processed_bases`.
    #[test]
    fn test_worker_thread_with_task() {
        let queue = Arc::new(BoundedPriorityQueue::new(1, 1000));
        let shared = make_state(0, &queue);

        let sequence = vec![0, 1, 2, 3, 0, 1, 2, 3];
        queue.emplace(
            Task::new_contig(
                "sample1".to_string(),
                "chr1".to_string(),
                sequence,
                ContigProcessingStage::AllContigs,
            ),
            100,
            8,
        );
        queue.mark_completed();

        run_workers(1, &queue, &shared);

        assert_eq!(shared.processed_bases.load(Ordering::Relaxed), 8);
    }
}