//! AGC Compressor
//!
//! Orchestrates the compression pipeline: FASTA → segments → compression → archive.

4use crate::{
5    genome_io::GenomeIO,
6    kmer::{Kmer, KmerMode},
7    lz_diff::LZDiff,
8    segment_compression::compress_segment,
9};
10use anyhow::{Context, Result};
11use ragc_common::{
12    stream_delta_name, Archive, CollectionV3, Contig, AGC_FILE_MAJOR, AGC_FILE_MINOR,
13    CONTIG_SEPARATOR,
14};
15use std::collections::HashMap;
16use std::io::Read;
17use std::path::Path;
18
/// Configuration for the compressor
#[derive(Debug, Clone)]
pub struct CompressorConfig {
    /// Flanking k-mer length in bases (default 21).
    pub kmer_length: u32,
    /// Target segment size in bases; recorded in the collection config and
    /// params stream (splitter-based segmentation itself is still TODO).
    pub segment_size: u32,
    /// Minimum match length for LZ diff encoding.
    pub min_match_len: u32,
    /// 0 = quiet; values > 0 print progress to stdout.
    pub verbosity: u32,
}
27
28impl Default for CompressorConfig {
29    fn default() -> Self {
30        CompressorConfig {
31            kmer_length: 21,
32            segment_size: 1000,
33            min_match_len: 15,
34            verbosity: 1,
35        }
36    }
37}
38
/// A segment with its metadata
#[derive(Debug, Clone)]
struct SegmentInfo {
    /// Sample this segment belongs to.
    sample_name: String,
    /// Contig within the sample.
    contig_name: String,
    /// Index of this segment within the contig's sequence of segments.
    seg_part_no: usize,
    /// Segment bases in numeric encoding (A=0, C=1, G=2, T=3).
    data: Contig,
    /// Canonical k-mer at the segment's front; 0 when unavailable.
    #[allow(dead_code)]
    kmer_front: u64,
    /// Canonical k-mer at the segment's back; 0 when unavailable.
    #[allow(dead_code)]
    kmer_back: u64,
    /// Whether the segment is stored reverse-complemented.
    is_rev_comp: bool,
}
52
/// Segment group identified by flanking k-mers
///
/// Segments sharing the same (front, back) k-mer pair land in the same group
/// and are delta-encoded against that group's reference segment.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SegmentGroupKey {
    /// Canonical front k-mer (0 = none/invalid).
    kmer_front: u64,
    /// Canonical back k-mer (0 = none/invalid).
    kmer_back: u64,
}
59
/// AGC Compressor
///
/// Accumulates segments grouped by their flanking k-mers; `finalize` writes
/// the groups and all metadata streams to the archive.
pub struct Compressor {
    config: CompressorConfig,
    archive: Archive,
    collection: CollectionV3,

    // Segment storage
    /// Segments bucketed by their flanking k-mer pair.
    segment_groups: HashMap<SegmentGroupKey, Vec<SegmentInfo>>,
    group_references: HashMap<SegmentGroupKey, Vec<u8>>, // First segment in each group

    // Statistics
    /// Total number of input bases seen via `add_contig`.
    total_bases_processed: usize,
    /// Total number of segments added across all groups.
    total_segments: usize,
}
74
75impl Compressor {
76    /// Create a new compressor
77    pub fn new(archive_path: &str, config: CompressorConfig) -> Result<Self> {
78        let mut archive = Archive::new_writer();
79        archive
80            .open(archive_path)
81            .context("Failed to create archive")?;
82
83        let mut collection = CollectionV3::new();
84        collection.set_config(config.segment_size, config.kmer_length, None);
85        collection.prepare_for_compression(&mut archive)?;
86
87        Ok(Compressor {
88            config,
89            archive,
90            collection,
91            segment_groups: HashMap::new(),
92            group_references: HashMap::new(),
93            total_bases_processed: 0,
94            total_segments: 0,
95        })
96    }
97
98    /// Add a FASTA file to the archive
99    pub fn add_fasta_file(&mut self, sample_name: &str, fasta_path: &Path) -> Result<()> {
100        if self.config.verbosity > 0 {
101            println!("Processing sample: {sample_name} from {fasta_path:?}");
102        }
103
104        let mut reader =
105            GenomeIO::<Box<dyn Read>>::open(fasta_path).context("Failed to open FASTA file")?;
106
107        // Read contigs with conversion (ASCII -> numeric)
108        while let Some((contig_name, sequence)) = reader.read_contig_converted()? {
109            self.add_contig(sample_name, &contig_name, sequence)?;
110        }
111
112        Ok(())
113    }
114
115    /// Add a contig to the archive
116    pub fn add_contig(
117        &mut self,
118        sample_name: &str,
119        contig_name: &str,
120        sequence: Contig,
121    ) -> Result<()> {
122        if sequence.is_empty() {
123            return Ok(());
124        }
125
126        self.total_bases_processed += sequence.len();
127
128        // Register in collection
129        self.collection
130            .register_sample_contig(sample_name, contig_name)?;
131
132        // For now, treat entire contig as a single segment
133        // TODO: Add splitter-based segmentation later
134        self.add_segment(sample_name, contig_name, 0, sequence)?;
135
136        Ok(())
137    }
138
139    /// Add a segment to the compressor
140    #[allow(clippy::needless_range_loop)]
141    fn add_segment(
142        &mut self,
143        sample_name: &str,
144        contig_name: &str,
145        seg_part_no: usize,
146        segment: Contig,
147    ) -> Result<()> {
148        // Extract flanking k-mers
149        let k = self.config.kmer_length;
150
151        let (kmer_front, kmer_back, is_rev_comp) = if segment.len() >= (k * 2) as usize {
152            // Extract front k-mer
153            let mut front = Kmer::new(k, KmerMode::Canonical);
154            for i in 0..(k as usize) {
155                if segment[i] > 3 {
156                    // Invalid base, use 0
157                    front.reset();
158                    break;
159                }
160                front.insert(segment[i] as u64);
161            }
162            let front_kmer_val = if front.is_full() { front.data() } else { 0 };
163
164            // Extract back k-mer
165            let mut back = Kmer::new(k, KmerMode::Canonical);
166            let start = segment.len() - (k as usize);
167            for i in 0..(k as usize) {
168                if segment[start + i] > 3 {
169                    // Invalid base, use 0
170                    back.reset();
171                    break;
172                }
173                back.insert(segment[start + i] as u64);
174            }
175            let back_kmer_val = if back.is_full() { back.data() } else { 0 };
176
177            (front_kmer_val, back_kmer_val, false)
178        } else {
179            // Too short for proper k-mers
180            (0u64, 0u64, false)
181        };
182
183        self.add_segment_with_kmers(
184            sample_name,
185            contig_name,
186            seg_part_no,
187            segment,
188            kmer_front,
189            kmer_back,
190            is_rev_comp,
191        )
192    }
193
194    /// Add a segment with known k-mers
195    #[allow(clippy::too_many_arguments)]
196    fn add_segment_with_kmers(
197        &mut self,
198        sample_name: &str,
199        contig_name: &str,
200        seg_part_no: usize,
201        segment: Contig,
202        kmer_front: u64,
203        kmer_back: u64,
204        is_rev_comp: bool,
205    ) -> Result<()> {
206        let key = SegmentGroupKey {
207            kmer_front,
208            kmer_back,
209        };
210
211        let seg_info = SegmentInfo {
212            sample_name: sample_name.to_string(),
213            contig_name: contig_name.to_string(),
214            seg_part_no,
215            data: segment,
216            kmer_front,
217            kmer_back,
218            is_rev_comp,
219        };
220
221        self.segment_groups.entry(key).or_default().push(seg_info);
222        self.total_segments += 1;
223
224        Ok(())
225    }
226
227    /// Store params stream (C++ compatibility)
228    /// Format: kmer_length (u32) + min_match_len (u32) + pack_cardinality (u32) + segment_size (u32)
229    fn store_params_stream(&mut self) -> Result<()> {
230        let mut params_data = Vec::new();
231
232        // Append u32 values in little-endian format (matching C++ append function)
233        let append_u32 = |data: &mut Vec<u8>, value: u32| {
234            data.extend_from_slice(&value.to_le_bytes());
235        };
236
237        append_u32(&mut params_data, self.config.kmer_length);
238        append_u32(&mut params_data, self.config.min_match_len);
239        append_u32(&mut params_data, 50); // pack_cardinality - default 50 (TODO: make configurable)
240        append_u32(&mut params_data, self.config.segment_size);
241
242        let stream_id = self.archive.register_stream("params");
243        self.archive
244            .add_part(stream_id, &params_data, params_data.len() as u64)?;
245
246        Ok(())
247    }
248
249    /// Store empty splitters stream (C++ compatibility)
250    fn store_splitters_stream(&mut self) -> Result<()> {
251        // For now, store empty splitters (no segmentation yet)
252        let splitters_data = Vec::new();
253        let stream_id = self.archive.register_stream("splitters");
254        self.archive.add_part(stream_id, &splitters_data, 0)?;
255        Ok(())
256    }
257
258    /// Store empty segment-splitters stream (C++ compatibility)
259    fn store_segment_splitters_stream(&mut self) -> Result<()> {
260        // For now, store empty segment-splitters (no segmentation yet)
261        let seg_splitters_data = Vec::new();
262        let stream_id = self.archive.register_stream("segment-splitters");
263        self.archive.add_part(stream_id, &seg_splitters_data, 0)?;
264        Ok(())
265    }
266
267    /// Store file_type_info stream (C++ compatibility)
268    /// Format: null-terminated string pairs (key, value, key, value, ...)
269    /// C++ reads this first to determine archive version
270    fn store_file_type_info(&mut self) -> Result<()> {
271        let mut data = Vec::new();
272
273        // Helper to append null-terminated string
274        let append_str = |data: &mut Vec<u8>, s: &str| {
275            data.extend_from_slice(s.as_bytes());
276            data.push(0); // null terminator
277        };
278
279        // Add key-value pairs (C++ expects these specific keys)
280        append_str(&mut data, "producer");
281        append_str(&mut data, "agc-rust");
282
283        append_str(&mut data, "producer_version_major");
284        append_str(&mut data, &AGC_FILE_MAJOR.to_string());
285
286        append_str(&mut data, "producer_version_minor");
287        append_str(&mut data, &AGC_FILE_MINOR.to_string());
288
289        append_str(&mut data, "producer_version_build");
290        append_str(&mut data, "0"); // TODO: Add actual build version
291
292        append_str(&mut data, "file_version_major");
293        append_str(&mut data, &AGC_FILE_MAJOR.to_string());
294
295        append_str(&mut data, "file_version_minor");
296        append_str(&mut data, &AGC_FILE_MINOR.to_string());
297
298        append_str(&mut data, "comment");
299        append_str(
300            &mut data,
301            &format!("AGC (Rust implementation) v.{AGC_FILE_MAJOR}.{AGC_FILE_MINOR}"),
302        );
303
304        let stream_id = self.archive.register_stream("file_type_info");
305        // raw_size = number of key-value pairs (7)
306        self.archive.add_part(stream_id, &data, 7)?;
307
308        Ok(())
309    }
310
311    /// Finalize compression and write all segments
312    pub fn finalize(&mut self) -> Result<()> {
313        if self.config.verbosity > 0 {
314            println!("Finalizing compression...");
315            println!("Total segments: {}", self.total_segments);
316            println!("Segment groups: {}", self.segment_groups.len());
317        }
318
319        // Process each segment group with packed-contig mode
320        let mut group_id = 0u32;
321        const PACK_CARDINALITY: usize = 50; // C++ default
322        const NO_RAW_GROUPS: u32 = 16; // C++ expects first 16 groups to be raw-only
323
324        for (key, segments) in &self.segment_groups {
325            if segments.is_empty() {
326                continue;
327            }
328
329            // Use first segment as reference (only if using LZ encoding)
330            let reference = &segments[0].data;
331            self.group_references.insert(key.clone(), reference.clone());
332
333            let archive_version = AGC_FILE_MAJOR * 1000 + AGC_FILE_MINOR;
334            let stream_name = stream_delta_name(archive_version, group_id);
335            let stream_id = self.archive.register_stream(&stream_name);
336
337            // Determine if this group should use LZ encoding
338            // Groups 0-15 must be raw-only for C++ compatibility
339            let use_lz_encoding = group_id >= NO_RAW_GROUPS;
340
341            // Pack contigs into batches of PACK_CARDINALITY (50)
342            for (pack_idx, pack_segments) in segments.chunks(PACK_CARDINALITY).enumerate() {
343                let mut packed_data = Vec::new();
344
345                // Process segments (all raw for groups 0-15, LZ for groups 16+)
346                for (idx_in_pack, seg_info) in pack_segments.iter().enumerate() {
347                    let global_in_group_id = pack_idx * PACK_CARDINALITY + idx_in_pack;
348
349                    let contig_data = if !use_lz_encoding || global_in_group_id == 0 {
350                        // Raw segment (either in raw-only group, or first segment in LZ group)
351                        seg_info.data.clone()
352                    } else {
353                        // LZ-encoded segment (only for groups 16+ and not first segment)
354                        let mut lz_diff = LZDiff::new(self.config.min_match_len);
355                        lz_diff.prepare(reference);
356                        lz_diff.encode(&seg_info.data)
357                    };
358
359                    // Add contig data
360                    packed_data.extend_from_slice(&contig_data);
361                    // Add separator (0xFF)
362                    packed_data.push(CONTIG_SEPARATOR);
363
364                    // Register in collection
365                    self.collection.add_segment_placed(
366                        &seg_info.sample_name,
367                        &seg_info.contig_name,
368                        seg_info.seg_part_no,
369                        group_id,
370                        global_in_group_id as u32,
371                        seg_info.is_rev_comp,
372                        seg_info.data.len() as u32,
373                    )?;
374                }
375
376                // Compress and store the packed data
377                // The raw size must include separators (one per segment)
378                let total_raw_size = packed_data.len() as u64;
379
380                let mut compressed = compress_segment(&packed_data)?;
381                compressed.push(0); // Marker byte
382
383                self.archive
384                    .add_part(stream_id, &compressed, total_raw_size)?;
385            }
386
387            group_id += 1;
388        }
389
390        // Store params stream (C++ compatibility)
391        self.store_params_stream()?;
392
393        // Store empty splitters stream (C++ compatibility)
394        self.store_splitters_stream()?;
395
396        // Store empty segment-splitters stream (C++ compatibility)
397        self.store_segment_splitters_stream()?;
398
399        // Store collection metadata
400        self.collection
401            .store_batch_sample_names(&mut self.archive)?;
402
403        // Store all samples in one batch
404        let num_samples = self.collection.get_no_samples();
405        if num_samples > 0 {
406            self.collection
407                .store_contig_batch(&mut self.archive, 0, num_samples)?;
408        }
409
410        // Store file_type_info stream (must be last, C++ reads this first)
411        self.store_file_type_info()?;
412
413        // Close archive
414        self.archive.close()?;
415
416        if self.config.verbosity > 0 {
417            println!("Compression complete!");
418            println!("Total bases: {}", self.total_bases_processed);
419            println!("Groups created: {group_id}");
420        }
421
422        Ok(())
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    #[test]
    fn test_compressor_basic() {
        let archive_path = "/tmp/test_compress.agc";
        let _ = fs::remove_file(archive_path); // Start from a clean slate.

        let mut compressor =
            Compressor::new(archive_path, CompressorConfig::default()).unwrap();

        // Numeric encoding (A=0, C=1, G=2, T=3): "ACGT" repeated three times.
        compressor
            .add_contig("sample1", "chr1", vec![0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3])
            .unwrap();
        compressor.finalize().unwrap();

        // The archive file must exist after finalize.
        assert!(Path::new(archive_path).exists());

        fs::remove_file(archive_path).unwrap();
    }

    #[test]
    fn test_compressor_multiple_samples() {
        let archive_path = "/tmp/test_compress_multi.agc";
        let _ = fs::remove_file(archive_path);

        let mut compressor =
            Compressor::new(archive_path, CompressorConfig::default()).unwrap();

        let seq_a = vec![0, 1, 2, 3, 0, 1, 2, 3]; // ACGTACGT
        let seq_b = vec![3, 2, 1, 0, 3, 2, 1, 0]; // TGCATGCA

        // Two contigs in one sample plus a second sample sharing a sequence.
        compressor
            .add_contig("sample1", "chr1", seq_a.clone())
            .unwrap();
        compressor
            .add_contig("sample1", "chr2", seq_b.clone())
            .unwrap();
        compressor.add_contig("sample2", "chr1", seq_a).unwrap();

        compressor.finalize().unwrap();
        assert!(Path::new(archive_path).exists());

        // Round-trip: sample names must be readable back from the archive.
        let mut archive = Archive::new_reader();
        archive.open(archive_path).unwrap();

        let mut collection = CollectionV3::new();
        collection.set_config(1000, 21, None);
        collection.prepare_for_decompression(&archive).unwrap();
        collection.load_batch_sample_names(&mut archive).unwrap();

        assert_eq!(collection.get_no_samples(), 2);
        let samples = collection.get_samples_list(false);
        assert_eq!(samples.len(), 2);
        assert!(samples.contains(&"sample1".to_string()));
        assert!(samples.contains(&"sample2".to_string()));

        fs::remove_file(archive_path).unwrap();
    }

    #[test]
    fn test_compressor_from_fasta() {
        let archive_path = "/tmp/test_compress_fasta.agc";
        let _ = fs::remove_file(archive_path);

        let mut compressor =
            Compressor::new(archive_path, CompressorConfig::default()).unwrap();

        // Only exercised when the shared fixture is present in the workspace.
        let fasta_path = Path::new("../test-data/test_simple.fasta");
        if !fasta_path.exists() {
            return;
        }

        compressor
            .add_fasta_file("test_sample", fasta_path)
            .unwrap();
        compressor.finalize().unwrap();
        assert!(Path::new(archive_path).exists());

        // Verify the collection metadata round-trips.
        let mut archive = Archive::new_reader();
        archive.open(archive_path).unwrap();

        let mut collection = CollectionV3::new();
        collection.set_config(1000, 21, None);
        collection.prepare_for_decompression(&archive).unwrap();
        collection.load_batch_sample_names(&mut archive).unwrap();

        assert_eq!(collection.get_no_samples(), 1);
        assert_eq!(collection.get_samples_list(false), vec!["test_sample"]);

        collection.load_contig_batch(&mut archive, 0).unwrap();
        assert_eq!(collection.get_no_contigs("test_sample"), Some(2));

        let contigs = collection.get_contig_list("test_sample").unwrap();
        assert_eq!(contigs.len(), 2);
        assert!(contigs.contains(&"chr1".to_string()));
        assert!(contigs.contains(&"chr2".to_string()));

        fs::remove_file(archive_path).unwrap();
    }
}