1use crate::{
5 genome_io::GenomeWriter, kmer::reverse_complement, lz_diff::LZDiff,
6 segment_compression::decompress_segment_with_marker,
7};
8use anyhow::{anyhow, Context, Result};
9use ragc_common::{
10 stream_delta_name, stream_ref_name, Archive, CollectionV3, Contig, SegmentDesc, AGC_FILE_MAJOR,
11 AGC_FILE_MINOR, CONTIG_SEPARATOR,
12};
13use std::collections::HashMap;
14use std::fs::File;
15use std::path::Path;
16
/// Options controlling decompressor diagnostics.
#[derive(Debug, Clone)]
pub struct DecompressorConfig {
    /// Stderr logging verbosity: 0 is quiet, 1 prints basic progress,
    /// higher values add per-segment debug output.
    pub verbosity: u32,
}

impl Default for DecompressorConfig {
    /// The default configuration prints basic progress (verbosity 1).
    fn default() -> Self {
        Self { verbosity: 1 }
    }
}
28
/// Reader for RAGC archives: loads collection metadata on open and
/// reconstructs contigs segment by segment on demand.
pub struct Decompressor {
    config: DecompressorConfig,
    // Underlying archive file reader.
    archive: Archive,
    // Sample / contig / segment metadata decoded from the archive.
    collection: CollectionV3,

    // Decompressed reference segment per group id, reused across contigs.
    segment_cache: HashMap<u32, Contig>,

    // Segment size read from the params stream (retained but unused here).
    _segment_size: u32,
    // k-mer length; consecutive segments overlap by this many bases.
    kmer_length: u32,
    // Minimum match length handed to the LZ decoder.
    min_match_len: u32,

    // Original path, kept so `clone_for_thread` can reopen the archive.
    archive_path: String,
}
71
72impl Decompressor {
    /// Opens an existing archive for reading and loads its metadata
    /// (compression params, collection description, sample names).
    ///
    /// # Errors
    /// Fails if the file cannot be opened or the `params` stream is
    /// missing or malformed.
    pub fn open(archive_path: &str, config: DecompressorConfig) -> Result<Self> {
        let mut archive = Archive::new_reader();
        archive
            .open(archive_path)
            .context("Failed to open archive for reading")?;

        let mut collection = CollectionV3::new();

        // Compression parameters live in a dedicated "params" stream.
        let (segment_size, kmer_length, min_match_len) = Self::load_params(&mut archive)?;

        if config.verbosity > 1 {
            eprintln!("Loaded params: segment_size={segment_size}, kmer_length={kmer_length}");
        }

        collection.set_config(segment_size, kmer_length, None);

        // Load the collection's structural metadata and the sample-name batch.
        collection.prepare_for_decompression(&archive)?;
        collection.load_batch_sample_names(&mut archive)?;

        if config.verbosity > 0 {
            eprintln!(
                "Loaded archive with {} samples",
                collection.get_no_samples()
            );
            let samples = collection.get_samples_list(false);
            eprintln!("Sample names: {samples:?}");
        }

        Ok(Decompressor {
            config,
            archive,
            collection,
            segment_cache: HashMap::new(),
            _segment_size: segment_size,
            kmer_length,
            min_match_len,
            archive_path: archive_path.to_string(),
        })
    }
116
117 fn load_params(archive: &mut Archive) -> Result<(u32, u32, u32)> {
124 let stream_id = archive
126 .get_stream_id("params")
127 .ok_or_else(|| anyhow!("params stream not found in archive"))?;
128
129 let num_parts = archive.get_num_parts(stream_id);
131 if num_parts != 1 {
132 anyhow::bail!("Expected 1 part in params stream, found {num_parts}");
133 }
134
135 let (data, _metadata) = archive.get_part_by_id(stream_id, 0)?;
137
138 if data.len() < 12 {
140 anyhow::bail!(
141 "params stream too short: {} bytes (expected at least 12)",
142 data.len()
143 );
144 }
145
146 let kmer_length = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
147 let min_match_len = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
148 let _pack_cardinality = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
149
150 let segment_size = if data.len() >= 16 {
152 u32::from_le_bytes([data[12], data[13], data[14], data[15]])
153 } else {
154 60000
156 };
157
158 Ok((segment_size, kmer_length, min_match_len))
161 }
162
    /// Returns the names of all samples stored in the archive.
    pub fn list_samples(&self) -> Vec<String> {
        self.collection.get_samples_list(false)
    }
167
    /// Returns the contig names belonging to `sample_name`, loading the
    /// contig metadata batch from the archive on first use.
    ///
    /// # Errors
    /// Returns an error if the sample does not exist in the archive.
    pub fn list_contigs(&mut self, sample_name: &str) -> Result<Vec<String>> {
        if self.config.verbosity > 1 {
            eprintln!("list_contigs called for: {sample_name}");
            eprintln!(
                "get_no_contigs before load: {:?}",
                self.collection.get_no_contigs(sample_name)
            );
        }

        // Contig descriptors are loaded lazily: a missing or zero count means
        // the batch has not been read from the archive yet.
        if self
            .collection
            .get_no_contigs(sample_name)
            .is_none_or(|count| count == 0)
        {
            if self.config.verbosity > 1 {
                eprintln!("Loading contig batch for sample: {sample_name}");
            }
            self.collection.load_contig_batch(&mut self.archive, 0)?;

            if self.config.verbosity > 1 {
                eprintln!(
                    "After load: get_no_contigs = {:?}",
                    self.collection.get_no_contigs(sample_name)
                );
            }
        } else if self.config.verbosity > 1 {
            eprintln!(
                "Contig batch already loaded, get_no_contigs = {:?}",
                self.collection.get_no_contigs(sample_name)
            );
        }

        let result = self.collection.get_contig_list(sample_name);
        if self.config.verbosity > 1 {
            eprintln!("get_contig_list result: {result:?}");
        }

        result.ok_or_else(|| anyhow!("Sample not found: {sample_name}"))
    }
209
210 pub fn get_contig(&mut self, sample_name: &str, contig_name: &str) -> Result<Contig> {
212 if self
214 .collection
215 .get_no_contigs(sample_name)
216 .is_none_or(|count| count == 0)
217 {
218 let _num_samples = self.collection.get_no_samples();
219 self.collection.load_contig_batch(&mut self.archive, 0)?;
220 }
221
222 let segments = self
224 .collection
225 .get_contig_desc(sample_name, contig_name)
226 .ok_or_else(|| anyhow!("Contig not found: {sample_name}/{contig_name}"))?;
227
228 if self.config.verbosity > 1 {
229 eprintln!(
230 "Extracting {}/{} ({} segments)",
231 sample_name,
232 contig_name,
233 segments.len()
234 );
235 for (i, seg) in segments.iter().enumerate() {
236 eprintln!(
237 " Segment[{}]: group_id={}, in_group_id={}, is_rev_comp={}, raw_length={}",
238 i, seg.group_id, seg.in_group_id, seg.is_rev_comp, seg.raw_length
239 );
240 }
241 }
242
243 self.reconstruct_contig(&segments)
245 }
246
247 pub fn get_sample(&mut self, sample_name: &str) -> Result<Vec<(String, Contig)>> {
249 if self
251 .collection
252 .get_no_contigs(sample_name)
253 .is_none_or(|count| count == 0)
254 {
255 let _num_samples = self.collection.get_no_samples();
256 self.collection.load_contig_batch(&mut self.archive, 0)?;
257 }
258
259 let sample_desc = self
260 .collection
261 .get_sample_desc(sample_name)
262 .ok_or_else(|| anyhow!("Sample not found: {sample_name}"))?;
263
264 let mut contigs = Vec::new();
265
266 for (contig_name, segments) in sample_desc {
267 if self.config.verbosity > 1 {
268 eprintln!(
269 "Extracting {}/{} ({} segments)",
270 sample_name,
271 contig_name,
272 segments.len()
273 );
274 }
275
276 let contig_data = self.reconstruct_contig(&segments)?;
277 contigs.push((contig_name, contig_data));
278 }
279
280 Ok(contigs)
281 }
282
283 fn unpack_2bit(packed: &[u8], expected_length: usize) -> Contig {
287 let mut unpacked = Vec::with_capacity(expected_length);
288
289 for &byte in packed {
290 unpacked.push((byte >> 6) & 0x03);
292 unpacked.push((byte >> 4) & 0x03);
293 unpacked.push((byte >> 2) & 0x03);
294 unpacked.push(byte & 0x03);
295 }
296
297 unpacked.truncate(expected_length);
299 unpacked
300 }
301
302 fn reverse_complement_segment(segment: &[u8]) -> Contig {
304 segment
305 .iter()
306 .rev()
307 .map(|&base| reverse_complement(base as u64) as u8)
308 .collect()
309 }
310
311 fn reconstruct_contig(&mut self, segments: &[SegmentDesc]) -> Result<Contig> {
313 let mut contig = Contig::new();
314
315 for (i, segment_desc) in segments.iter().enumerate() {
316 let mut segment_data = self.get_segment(segment_desc)?;
317
318 if segment_desc.is_rev_comp {
320 segment_data = Self::reverse_complement_segment(&segment_data);
321 }
322
323 if i == 0 {
324 contig.extend_from_slice(&segment_data);
326 } else {
327 if segment_data.len() < self.kmer_length as usize {
329 eprintln!(
330 "ERROR: Segment {} too short! Length={}, kmer_length={}",
331 i,
332 segment_data.len(),
333 self.kmer_length
334 );
335 eprintln!(
336 " Segment desc: group_id={}, in_group_id={}, raw_length={}",
337 segment_desc.group_id, segment_desc.in_group_id, segment_desc.raw_length
338 );
339 anyhow::bail!("Corrupted archive: segment too short (segment {}, got {} bytes, need at least {} bytes)", i, segment_data.len(), self.kmer_length);
340 }
341 contig.extend_from_slice(&segment_data[self.kmer_length as usize..]);
342 }
343 }
344
345 Ok(contig)
346 }
347
348 fn unpack_contig(packed_data: &[u8], position_in_pack: usize) -> Result<Contig> {
350 let mut contig_start = 0;
351 let mut current_position = 0;
352
353 for (i, &byte) in packed_data.iter().enumerate() {
354 if byte == CONTIG_SEPARATOR {
355 if current_position == position_in_pack {
356 return Ok(packed_data[contig_start..i].to_vec());
358 }
359 current_position += 1;
360 contig_start = i + 1;
361 }
362 }
363
364 if current_position == position_in_pack {
366 return Ok(packed_data[contig_start..].to_vec());
367 }
368
369 anyhow::bail!(
370 "Position {} not found in packed data (only {} contigs found)",
371 position_in_pack,
372 current_position + 1
373 )
374 }
375
    /// Fetches and decodes the single segment described by `desc`.
    ///
    /// Groups with id >= `NO_RAW_GROUPS` are LZ groups: a per-group
    /// reference segment (cached in `segment_cache`) plus per-member LZ
    /// deltas stored `PACK_CARDINALITY` to an archive part. Lower group
    /// ids store raw segments packed the same way but without deltas.
    ///
    /// # Errors
    /// Fails when the required streams/parts are missing or cannot be
    /// decompressed.
    fn get_segment(&mut self, desc: &SegmentDesc) -> Result<Contig> {
        // PACK_CARDINALITY: segments per archive part; NO_RAW_GROUPS: group
        // ids below this hold raw (non-delta) segments.
        const PACK_CARDINALITY: usize = 50; const NO_RAW_GROUPS: u32 = 16;

        let archive_version = AGC_FILE_MAJOR * 1000 + AGC_FILE_MINOR;

        if self.config.verbosity > 1 {
            eprintln!(
                "get_segment: group_id={}, in_group_id={}, raw_length={}",
                desc.group_id, desc.in_group_id, desc.raw_length
            );
        }

        if desc.group_id >= NO_RAW_GROUPS {
            // LZ group: make sure the group's reference segment is cached.
            if !self.segment_cache.contains_key(&desc.group_id) {
                let ref_stream_name = stream_ref_name(archive_version, desc.group_id);
                let ref_stream_id = self
                    .archive
                    .get_stream_id(&ref_stream_name)
                    .ok_or_else(|| anyhow!("Reference stream not found: {ref_stream_name}"))?;

                let (mut ref_data, ref_metadata) = self.archive.get_part_by_id(ref_stream_id, 0)?;

                // metadata == 0 means the part is stored uncompressed;
                // otherwise the last byte is a codec marker.
                let mut decompressed_ref = if ref_metadata == 0 {
                    ref_data
                } else {
                    if ref_data.is_empty() {
                        anyhow::bail!("Empty compressed reference data");
                    }
                    let marker = ref_data.pop().unwrap();
                    decompress_segment_with_marker(&ref_data, marker)?
                };

                // Heuristic: if four bases per byte lands within 8 of the
                // declared raw length, the reference is 2-bit packed.
                let is_packed = decompressed_ref.len() * 4 >= desc.raw_length as usize
                    && decompressed_ref.len() * 4 < desc.raw_length as usize + 8;

                if self.config.verbosity > 2 {
                    eprintln!(
                        " DEBUG: decompressed_len={}, raw_length={}, decompressed*4={}, is_packed={}",
                        decompressed_ref.len(),
                        desc.raw_length,
                        decompressed_ref.len() * 4,
                        is_packed
                    );
                }

                if is_packed {
                    decompressed_ref =
                        Self::unpack_2bit(&decompressed_ref, desc.raw_length as usize);

                    if self.config.verbosity > 1 {
                        eprintln!(
                            "Loaded & unpacked reference for group {}: length={} (was {} bytes packed)",
                            desc.group_id,
                            decompressed_ref.len(),
                            decompressed_ref.len() / 4
                        );
                    }
                } else if self.config.verbosity > 1 {
                    eprintln!(
                        "Loaded reference for group {}: length={} (already unpacked)",
                        desc.group_id,
                        decompressed_ref.len()
                    );
                }

                self.segment_cache.insert(desc.group_id, decompressed_ref);
            }

            // Member 0 of an LZ group *is* the reference itself.
            if desc.in_group_id == 0 {
                return Ok(self.segment_cache.get(&desc.group_id).unwrap().clone());
            }

            // Members 1.. are deltas, stored 0-based in the delta stream.
            let delta_position = (desc.in_group_id - 1) as usize;
            let pack_id = delta_position / PACK_CARDINALITY;
            let position_in_pack = delta_position % PACK_CARDINALITY;

            if self.config.verbosity > 1 {
                eprintln!(
                    " LZ group: delta_position={delta_position}, pack_id={pack_id}, position_in_pack={position_in_pack}"
                );
            }

            let delta_stream_name = stream_delta_name(archive_version, desc.group_id);
            let delta_stream_id =
                self.archive
                    .get_stream_id(&delta_stream_name)
                    .ok_or_else(|| {
                        eprintln!("ERROR: Delta stream not found: {}", delta_stream_name);
                        eprintln!(
                            " Requested by segment: group_id={}, in_group_id={}, raw_length={}",
                            desc.group_id, desc.in_group_id, desc.raw_length
                        );
                        anyhow!("Delta stream not found: {delta_stream_name}")
                    })?;

            let num_parts = self.archive.get_num_parts(delta_stream_id);

            if pack_id >= num_parts {
                anyhow::bail!("Pack ID {} out of range (stream has only {} parts). in_group_id={}, delta_position={}",
                    pack_id, num_parts, desc.in_group_id, delta_position);
            }

            let (mut delta_data, delta_metadata) =
                self.archive.get_part_by_id(delta_stream_id, pack_id)?;

            // Same uncompressed-vs-marker convention as the reference part.
            let decompressed_pack = if delta_metadata == 0 {
                delta_data
            } else {
                if delta_data.is_empty() {
                    anyhow::bail!("Empty compressed delta data");
                }
                let marker = delta_data.pop().unwrap();
                decompress_segment_with_marker(&delta_data, marker)?
            };

            let lz_encoded = Self::unpack_contig(&decompressed_pack, position_in_pack)?;

            if self.config.verbosity > 1 {
                eprintln!("Unpacked LZ-encoded segment: length={}", lz_encoded.len());
            }

            let reference = self
                .segment_cache
                .get(&desc.group_id)
                .ok_or_else(|| anyhow!("Reference not loaded for group {}", desc.group_id))?;

            let mut lz_diff = LZDiff::new(self.min_match_len);
            lz_diff.prepare(reference);

            // An empty delta encodes "identical to the reference".
            let decoded = if lz_encoded.is_empty() {
                reference.clone()
            } else {
                lz_diff.decode(&lz_encoded)
            };

            if self.config.verbosity > 1 {
                eprintln!("Decoded segment: length={}", decoded.len());
            }

            Ok(decoded)
        } else {
            // Raw group: members are stored directly (0-based), still packed
            // PACK_CARDINALITY per part in the (delta-named) stream.
            let pack_id = desc.in_group_id as usize / PACK_CARDINALITY;
            let position_in_pack = desc.in_group_id as usize % PACK_CARDINALITY;

            if self.config.verbosity > 1 {
                eprintln!(" Raw group: pack_id={pack_id}, position_in_pack={position_in_pack}");
            }

            let stream_name = stream_delta_name(archive_version, desc.group_id);
            let stream_id = self
                .archive
                .get_stream_id(&stream_name)
                .ok_or_else(|| anyhow!("Delta stream not found: {stream_name}"))?;

            let (mut data, metadata) = self.archive.get_part_by_id(stream_id, pack_id)?;

            let decompressed_pack = if metadata == 0 {
                data
            } else {
                if data.is_empty() {
                    anyhow::bail!("Empty compressed data");
                }
                let marker = data.pop().unwrap();
                decompress_segment_with_marker(&data, marker)?
            };

            let contig_data = Self::unpack_contig(&decompressed_pack, position_in_pack)?;

            if self.config.verbosity > 1 {
                eprintln!("Unpacked raw segment: length={}", contig_data.len());
            }

            Ok(contig_data)
        }
    }
580
581 pub fn list_samples_with_prefix(&self, prefix: &str) -> Vec<String> {
600 self.list_samples()
601 .into_iter()
602 .filter(|s| s.starts_with(prefix))
603 .collect()
604 }
605
606 pub fn get_samples_by_prefix(
629 &mut self,
630 prefix: &str,
631 ) -> Result<HashMap<String, Vec<(String, Contig)>>> {
632 let sample_names = self.list_samples_with_prefix(prefix);
633 let mut results = HashMap::new();
634
635 for sample_name in sample_names {
636 let contigs = self.get_sample(&sample_name)?;
637 results.insert(sample_name, contigs);
638 }
639
640 Ok(results)
641 }
642
    /// Creates an independent `Decompressor` by reopening the archive at
    /// the stored `archive_path`; the clone carries its own reader state
    /// and segment cache, so it can be used from another thread.
    pub fn clone_for_thread(&self) -> Result<Self> {
        Self::open(&self.archive_path, self.config.clone())
    }
680
681 pub fn write_sample_fasta(&mut self, sample_name: &str, output_path: &Path) -> Result<()> {
683 let contigs = self.get_sample(sample_name)?;
684
685 let mut writer = GenomeWriter::<File>::create(output_path)?;
686
687 for (contig_name, contig_data) in contigs {
688 let mut ascii_data = Vec::with_capacity(contig_data.len());
690 for &base in &contig_data {
691 let ascii_base = match base {
692 0 => b'A',
693 1 => b'C',
694 2 => b'G',
695 3 => b'T',
696 _ => b'N',
697 };
698 ascii_data.push(ascii_base);
699 }
700
701 writer.save_contig_directly(&contig_name, &ascii_data, 0)?;
702 }
703
704 Ok(())
705 }
706
    /// Consumes the decompressor and closes the underlying archive reader.
    pub fn close(mut self) -> Result<()> {
        self.archive.close()
    }
711}
712
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Compressor, CompressorConfig};
    use std::fs;

    /// Builds a path for a throwaway test archive inside the platform temp
    /// directory instead of hard-coding `/tmp`, which does not exist on
    /// Windows.
    fn temp_archive_path(file_name: &str) -> String {
        std::env::temp_dir()
            .join(file_name)
            .to_string_lossy()
            .into_owned()
    }

    #[test]
    fn test_decompressor_basic() {
        let archive_path = temp_archive_path("test_decompress.agc");
        let archive_path = archive_path.as_str();
        let _ = fs::remove_file(archive_path);

        // Build a minimal single-sample, single-contig archive.
        {
            let config = CompressorConfig::default();
            let mut compressor = Compressor::new(archive_path, config).unwrap();

            let seq1 = vec![0, 1, 2, 3, 0, 1, 2, 3];
            compressor
                .add_contig("sample1", "chr1", seq1.clone())
                .unwrap();

            compressor.finalize().unwrap();
        }

        // Read it back and verify listings plus round-tripped sequence data.
        {
            let config = DecompressorConfig { verbosity: 2 };
            let mut decompressor = Decompressor::open(archive_path, config).unwrap();

            eprintln!("=== Test: list_samples ===");
            let samples = decompressor.list_samples();
            eprintln!("Samples: {samples:?}");
            assert_eq!(samples.len(), 1);
            assert_eq!(samples[0], "sample1");

            eprintln!("=== Test: list_contigs ===");
            let contigs = decompressor.list_contigs("sample1").unwrap();
            eprintln!("Contigs: {contigs:?}");
            assert_eq!(contigs.len(), 1);
            assert_eq!(contigs[0], "chr1");

            let extracted = decompressor.get_contig("sample1", "chr1").unwrap();
            assert_eq!(extracted, vec![0, 1, 2, 3, 0, 1, 2, 3]);

            decompressor.close().unwrap();
        }

        fs::remove_file(archive_path).unwrap();
    }

    #[test]
    fn test_decompressor_multiple_contigs() {
        let archive_path = temp_archive_path("test_decompress_multi.agc");
        let archive_path = archive_path.as_str();
        let _ = fs::remove_file(archive_path);

        // Build an archive with two contigs in one sample.
        {
            let config = CompressorConfig::default();
            let mut compressor = Compressor::new(archive_path, config).unwrap();

            let seq1 = vec![0, 1, 2, 3, 0, 1, 2, 3];
            let seq2 = vec![3, 2, 1, 0, 3, 2, 1, 0];

            compressor
                .add_contig("sample1", "chr1", seq1.clone())
                .unwrap();
            compressor
                .add_contig("sample1", "chr2", seq2.clone())
                .unwrap();

            compressor.finalize().unwrap();
        }

        // Whole-sample extraction must preserve contig order and content.
        {
            let config = DecompressorConfig { verbosity: 0 };
            let mut decompressor = Decompressor::open(archive_path, config).unwrap();

            let sample_contigs = decompressor.get_sample("sample1").unwrap();
            assert_eq!(sample_contigs.len(), 2);

            assert_eq!(sample_contigs[0].0, "chr1");
            assert_eq!(sample_contigs[0].1, vec![0, 1, 2, 3, 0, 1, 2, 3]);

            assert_eq!(sample_contigs[1].0, "chr2");
            assert_eq!(sample_contigs[1].1, vec![3, 2, 1, 0, 3, 2, 1, 0]);

            decompressor.close().unwrap();
        }

        fs::remove_file(archive_path).unwrap();
    }
}