1use std::io;
10
11use noodles::sam::alignment::RecordBuf;
12
13use crate::sort::bam_fields;
14use crate::template::{Template, TemplateBatch};
15use crate::unified_pipeline::{BatchWeight, DecodedRecord, Grouper, MemoryEstimate};
16
17impl BatchWeight for RecordBuf {
23 fn batch_weight(&self) -> usize {
24 1
25 }
26}
27
28impl BatchWeight for Vec<u8> {
30 fn batch_weight(&self) -> usize {
31 1
32 }
33}
34
35impl BatchWeight for TemplateBatch {
37 fn batch_weight(&self) -> usize {
38 self.len()
39 }
40}
41
42#[derive(Default)]
53pub struct SingleRecordGrouper;
54
55impl SingleRecordGrouper {
56 #[must_use]
58 pub fn new() -> Self {
59 Self
60 }
61}
62
63impl Grouper for SingleRecordGrouper {
64 type Group = RecordBuf;
65
66 fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
67 records
69 .into_iter()
70 .map(|d| {
71 d.into_record().ok_or_else(|| {
72 io::Error::new(
73 io::ErrorKind::InvalidData,
74 "SingleRecordGrouper requires parsed records, got raw bytes",
75 )
76 })
77 })
78 .collect()
79 }
80
81 fn finish(&mut self) -> io::Result<Option<Self::Group>> {
82 Ok(None)
84 }
85
86 fn has_pending(&self) -> bool {
87 false
89 }
90}
91
92#[derive(Default)]
101pub struct SingleRawRecordGrouper;
102
103impl SingleRawRecordGrouper {
104 #[must_use]
106 pub fn new() -> Self {
107 Self
108 }
109}
110
111impl Grouper for SingleRawRecordGrouper {
112 type Group = Vec<u8>;
113
114 fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
115 records
116 .into_iter()
117 .map(|d| {
118 d.into_raw_bytes().ok_or_else(|| {
119 io::Error::new(
120 io::ErrorKind::InvalidData,
121 "SingleRawRecordGrouper requires raw bytes, got parsed record",
122 )
123 })
124 })
125 .collect()
126 }
127
128 fn finish(&mut self) -> io::Result<Option<Self::Group>> {
129 Ok(None)
130 }
131
132 fn has_pending(&self) -> bool {
133 false
134 }
135}
136
137use std::collections::VecDeque;
142
143pub struct TemplateGrouper {
159 batch_size: usize,
161 current_name: Option<Vec<u8>>,
163 current_name_hash: Option<u64>,
165 current_records: Vec<RecordBuf>,
167 current_raw_records: Vec<Vec<u8>>,
169 pending_templates: VecDeque<Template>,
171}
172
173impl TemplateGrouper {
174 #[must_use]
179 pub fn new(batch_size: usize) -> Self {
180 Self {
181 batch_size: batch_size.max(1),
182 current_name: None,
183 current_name_hash: None,
184 current_records: Vec::new(),
185 current_raw_records: Vec::new(),
186 pending_templates: VecDeque::new(),
187 }
188 }
189
190 fn flush_current_template(&mut self) -> io::Result<()> {
192 debug_assert!(
193 self.current_raw_records.is_empty() || self.current_records.is_empty(),
194 "mixed raw/parsed records in same template group"
195 );
196 if !self.current_raw_records.is_empty() {
197 let template =
198 Template::from_raw_records(std::mem::take(&mut self.current_raw_records))
199 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
200 self.pending_templates.push_back(template);
201 self.current_name = None;
202 self.current_name_hash = None;
203 } else if !self.current_records.is_empty() {
204 let template = Template::from_records(std::mem::take(&mut self.current_records))
205 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
206 self.pending_templates.push_back(template);
207 self.current_name = None;
208 self.current_name_hash = None;
209 }
210 Ok(())
211 }
212
213 fn collect_batches(&mut self) -> Vec<TemplateBatch> {
215 let mut batches = Vec::new();
216
217 while self.pending_templates.len() >= self.batch_size {
218 let mut batch = Vec::with_capacity(self.batch_size);
219 for _ in 0..self.batch_size {
220 if let Some(template) = self.pending_templates.pop_front() {
221 batch.push(template);
222 }
223 }
224 batches.push(batch);
225 }
226
227 batches
228 }
229}
230
231impl Grouper for TemplateGrouper {
232 type Group = TemplateBatch;
233
234 fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
235 use crate::unified_pipeline::DecodedRecordData;
236
237 for decoded in records {
239 let name_hash = decoded.key.name_hash;
240
241 match decoded.data {
242 DecodedRecordData::Raw(raw) => {
243 match (self.current_name_hash, name_hash) {
245 (Some(current_hash), new_hash) if current_hash == new_hash => {
246 self.current_raw_records.push(raw);
247 }
248 _ => {
249 self.flush_current_template()?;
250 self.current_name = Some(bam_fields::read_name(&raw).to_vec());
251 self.current_name_hash = Some(name_hash);
252 self.current_raw_records.push(raw);
253 }
254 }
255 }
256 DecodedRecordData::Parsed(record) => {
257 let name = record.name().map(|n| Vec::from(<_ as AsRef<[u8]>>::as_ref(n)));
258 match (self.current_name_hash, name_hash) {
259 (Some(current_hash), new_hash) if current_hash == new_hash => {
260 self.current_records.push(record);
261 }
262 _ => {
263 self.flush_current_template()?;
264 self.current_name = name;
265 self.current_name_hash = Some(name_hash);
266 self.current_records.push(record);
267 }
268 }
269 }
270 }
271 }
272
273 Ok(self.collect_batches())
275 }
276
277 fn finish(&mut self) -> io::Result<Option<Self::Group>> {
278 self.flush_current_template()?;
280
281 if self.pending_templates.is_empty() {
283 return Ok(None);
284 }
285
286 let mut batch = Vec::with_capacity(self.pending_templates.len());
287 while let Some(template) = self.pending_templates.pop_front() {
288 batch.push(template);
289 }
290 Ok(Some(batch))
291 }
292
293 fn has_pending(&self) -> bool {
294 self.current_name.is_some()
295 || !self.pending_templates.is_empty()
296 || !self.current_raw_records.is_empty()
297 || !self.current_records.is_empty()
298 }
299}
300
301use noodles::sam::alignment::record::data::field::Tag;
302
303use crate::unified_pipeline::GroupKey;
304
305impl MemoryEstimate for ProcessedPositionGroup {
306 fn estimate_heap_size(&self) -> usize {
307 let templates_size: usize =
309 self.templates.iter().map(MemoryEstimate::estimate_heap_size).sum();
310 let templates_vec_overhead = self.templates.capacity() * std::mem::size_of::<Template>();
311
312 let family_sizes_size = self.family_sizes.len() * 24;
315
316 templates_size + templates_vec_overhead + family_sizes_size
320 }
321}
322
323#[derive(Default, Clone, Debug)]
325pub struct FilterMetrics {
326 pub total_templates: u64,
328 pub accepted_templates: u64,
330 pub discarded_non_pf: u64,
332 pub discarded_poor_alignment: u64,
334 pub discarded_ns_in_umi: u64,
336 pub discarded_umi_too_short: u64,
338}
339
340impl FilterMetrics {
341 #[must_use]
343 pub fn new() -> Self {
344 Self::default()
345 }
346
347 pub fn merge(&mut self, other: &FilterMetrics) {
349 self.total_templates += other.total_templates;
350 self.accepted_templates += other.accepted_templates;
351 self.discarded_non_pf += other.discarded_non_pf;
352 self.discarded_poor_alignment += other.discarded_poor_alignment;
353 self.discarded_ns_in_umi += other.discarded_ns_in_umi;
354 self.discarded_umi_too_short += other.discarded_umi_too_short;
355 }
356}
357
358#[derive(Debug)]
360pub struct ProcessedPositionGroup {
361 pub templates: Vec<Template>,
364 pub family_sizes: ahash::AHashMap<usize, u64>,
366 pub filter_metrics: FilterMetrics,
368 pub input_record_count: u64,
370}
371
372type PositionKeyTuple = (i32, i32, u8, i32, i32, u8, u16, u64);
378
379#[derive(Debug)]
388pub struct RawPositionGroup {
389 pub group_key: GroupKey,
391 pub records: Vec<DecodedRecord>,
393}
394
395impl BatchWeight for RawPositionGroup {
396 fn batch_weight(&self) -> usize {
397 self.records.len().div_ceil(2)
399 }
400}
401
402impl MemoryEstimate for RawPositionGroup {
403 fn estimate_heap_size(&self) -> usize {
404 let records_size: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
405 let records_vec_overhead = self.records.capacity() * std::mem::size_of::<DecodedRecord>();
406 records_size + records_vec_overhead
407 }
408}
409
410pub struct RecordPositionGrouper {
428 current_position_key: Option<PositionKeyTuple>,
430 current_group_key: Option<GroupKey>,
432 current_records: Vec<DecodedRecord>,
434 mc_validated: bool,
436 include_secondary_supplementary: bool,
441}
442
443impl RecordPositionGrouper {
444 #[must_use]
448 pub fn new() -> Self {
449 Self {
450 current_position_key: None,
451 current_group_key: None,
452 current_records: Vec::new(),
453 mc_validated: false,
454 include_secondary_supplementary: false,
455 }
456 }
457
458 #[must_use]
463 pub fn with_secondary_supplementary() -> Self {
464 Self { include_secondary_supplementary: true, ..Self::new() }
465 }
466
467 fn validate_mc_tag(&mut self, decoded: &DecodedRecord) -> io::Result<()> {
472 use crate::sort::bam_fields;
473 use crate::unified_pipeline::DecodedRecordData;
474
475 if self.mc_validated {
476 return Ok(());
477 }
478
479 match &decoded.data {
480 DecodedRecordData::Raw(raw) => {
481 let flg = bam_fields::flags(raw);
482 let is_paired = (flg & bam_fields::flags::PAIRED) != 0;
483 let is_secondary = (flg & bam_fields::flags::SECONDARY) != 0;
484 let is_supplementary = (flg & bam_fields::flags::SUPPLEMENTARY) != 0;
485 let is_unmapped = (flg & bam_fields::flags::UNMAPPED) != 0;
486 let is_mate_unmapped = (flg & bam_fields::flags::MATE_UNMAPPED) != 0;
487
488 if is_paired
489 && !is_secondary
490 && !is_supplementary
491 && !is_unmapped
492 && !is_mate_unmapped
493 {
494 if bam_fields::find_mc_tag_in_record(raw).is_none() {
495 return Err(io::Error::new(
496 io::ErrorKind::InvalidData,
497 "RecordPositionGrouper requires MC tags on paired-end reads. \
498 Run `fgumi zipper` to add MC tags before `fgumi group`.",
499 ));
500 }
501 self.mc_validated = true;
502 }
503 }
504 DecodedRecordData::Parsed(record) => {
505 let flags = record.flags();
506 if flags.is_segmented()
507 && !flags.is_secondary()
508 && !flags.is_supplementary()
509 && !flags.is_unmapped()
510 && !flags.is_mate_unmapped()
511 {
512 let mc_tag = Tag::from([b'M', b'C']);
513 if record.data().get(&mc_tag).is_none() {
514 return Err(io::Error::new(
515 io::ErrorKind::InvalidData,
516 "RecordPositionGrouper requires MC tags on paired-end reads. \
517 Run `fgumi zipper` to add MC tags before `fgumi group`.",
518 ));
519 }
520 self.mc_validated = true;
521 }
522 }
523 }
524 Ok(())
525 }
526
527 fn process_record(&mut self, decoded: DecodedRecord) -> io::Result<Option<RawPositionGroup>> {
529 if decoded.key.ref_id1 == GroupKey::UNKNOWN_REF && !self.include_secondary_supplementary {
532 return Ok(None);
533 }
534
535 self.validate_mc_tag(&decoded)?;
537
538 let record_pos_key = decoded.key.position_key();
539
540 match self.current_position_key {
541 Some(current_pos) if current_pos == record_pos_key => {
542 self.current_records.push(decoded);
544 Ok(None)
545 }
546 Some(_)
547 if self
548 .current_records
549 .last()
550 .is_some_and(|last| last.key.name_hash == decoded.key.name_hash) =>
551 {
552 self.current_records.push(decoded);
558 Ok(None)
559 }
560 Some(_) => {
561 let finished_records = std::mem::take(&mut self.current_records);
563 let finished_key = self
564 .current_group_key
565 .take()
566 .expect("current_group_key set when current_position_key is set");
567
568 let group = RawPositionGroup { group_key: finished_key, records: finished_records };
569
570 self.current_position_key = Some(record_pos_key);
572 self.current_group_key = Some(decoded.key);
573 self.current_records.push(decoded);
574
575 Ok(Some(group))
576 }
577 None => {
578 self.current_position_key = Some(record_pos_key);
580 self.current_group_key = Some(decoded.key);
581 self.current_records.push(decoded);
582 Ok(None)
583 }
584 }
585 }
586}
587
588impl Default for RecordPositionGrouper {
589 fn default() -> Self {
590 Self::new()
591 }
592}
593
594impl Grouper for RecordPositionGrouper {
595 type Group = RawPositionGroup;
596
597 fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
598 let mut completed_groups = Vec::new();
599 for decoded in records {
600 if let Some(group) = self.process_record(decoded)? {
601 completed_groups.push(group);
602 }
603 }
604 Ok(completed_groups)
605 }
606
607 fn finish(&mut self) -> io::Result<Option<Self::Group>> {
608 if !self.current_records.is_empty() {
609 debug_assert!(
610 self.current_group_key.is_some(),
611 "RecordPositionGrouper has {} buffered records but no group key",
612 self.current_records.len()
613 );
614 if let Some(key) = self.current_group_key.take() {
615 let records = std::mem::take(&mut self.current_records);
616 self.current_position_key = None;
617 return Ok(Some(RawPositionGroup { group_key: key, records }));
618 }
619 }
620 Ok(None)
621 }
622
623 fn has_pending(&self) -> bool {
624 !self.current_records.is_empty()
625 }
626}
627
628fn group_by_name_and_build<T>(
635 records: Vec<DecodedRecord>,
636 extract: impl Fn(DecodedRecord) -> io::Result<T>,
637 build: impl Fn(Vec<T>) -> anyhow::Result<Template>,
638) -> io::Result<Vec<Template>> {
639 let mut templates = Vec::new();
640 let mut current_name_hash: Option<u64> = None;
641 let mut current_items: Vec<T> = Vec::new();
642
643 for decoded in records {
644 let name_hash = decoded.key.name_hash;
645 let item = extract(decoded)?;
646
647 match current_name_hash {
648 Some(h) if h == name_hash => {
649 current_items.push(item);
650 }
651 Some(_) => {
652 let template = build(std::mem::take(&mut current_items))
653 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
654 templates.push(template);
655 current_name_hash = Some(name_hash);
656 current_items.push(item);
657 }
658 None => {
659 current_name_hash = Some(name_hash);
660 current_items.push(item);
661 }
662 }
663 }
664
665 if !current_items.is_empty() {
666 let template =
667 build(current_items).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
668 templates.push(template);
669 }
670
671 Ok(templates)
672}
673
674pub fn build_templates_from_records(records: Vec<DecodedRecord>) -> io::Result<Vec<Template>> {
684 use crate::unified_pipeline::DecodedRecordData;
685
686 if records.is_empty() {
687 return Ok(Vec::new());
688 }
689
690 let raw_byte_mode = matches!(records[0].data, DecodedRecordData::Raw(_));
691
692 if raw_byte_mode {
693 group_by_name_and_build(
694 records,
695 |d| match d.data {
696 DecodedRecordData::Raw(v) => Ok(v),
697 DecodedRecordData::Parsed(_) => Err(io::Error::new(
698 io::ErrorKind::InvalidData,
699 "expected raw-byte record but found parsed record",
700 )),
701 },
702 Template::from_raw_records,
703 )
704 } else {
705 group_by_name_and_build(
706 records,
707 |d| match d.data {
708 DecodedRecordData::Parsed(r) => Ok(r),
709 DecodedRecordData::Raw(_) => Err(io::Error::new(
710 io::ErrorKind::InvalidData,
711 "expected parsed record but found raw-byte record",
712 )),
713 },
714 Template::from_records,
715 )
716 }
717}
718
719#[derive(Debug, Clone)]
725pub struct FastqRecord {
726 pub name: Vec<u8>,
728 pub sequence: Vec<u8>,
730 pub quality: Vec<u8>,
732}
733
734#[derive(Debug)]
736enum FastqParseResult {
737 Incomplete,
739 Error(io::Error),
741}
742
743pub fn parse_fastq_records(data: &[u8]) -> io::Result<(Vec<FastqRecord>, Vec<u8>)> {
759 let mut records = Vec::new();
760 let mut pos = 0;
761
762 while pos < data.len() {
763 if data[pos] != b'@' {
765 while pos < data.len() && data[pos] != b'@' {
767 pos += 1;
768 }
769 if pos >= data.len() {
770 return Ok((records, Vec::new()));
771 }
772 }
773
774 match parse_single_fastq_record(&data[pos..]) {
776 Ok((record, consumed)) => {
777 records.push(record);
778 pos += consumed;
779 }
780 Err(FastqParseResult::Incomplete) => {
781 return Ok((records, data[pos..].to_vec()));
783 }
784 Err(FastqParseResult::Error(e)) => {
785 return Err(e);
786 }
787 }
788 }
789
790 Ok((records, Vec::new()))
791}
792
793fn parse_single_fastq_record(data: &[u8]) -> Result<(FastqRecord, usize), FastqParseResult> {
796 let mut pos = 0;
797
798 if data.is_empty() || data[0] != b'@' {
800 return Err(FastqParseResult::Error(io::Error::new(
801 io::ErrorKind::InvalidData,
802 "FASTQ record must start with @",
803 )));
804 }
805 pos += 1; let name_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
808 let name = data[pos..pos + name_end].to_vec();
809 pos += name_end + 1; if pos >= data.len() {
813 return Err(FastqParseResult::Incomplete);
814 }
815 let seq_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
816 let sequence = data[pos..pos + seq_end].to_vec();
817 pos += seq_end + 1;
818
819 if pos >= data.len() {
821 return Err(FastqParseResult::Incomplete);
822 }
823 let plus_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
824 if data[pos] != b'+' {
825 return Err(FastqParseResult::Error(io::Error::new(
826 io::ErrorKind::InvalidData,
827 "FASTQ separator line must start with +",
828 )));
829 }
830 pos += plus_end + 1;
831
832 if pos >= data.len() {
834 return Err(FastqParseResult::Incomplete);
835 }
836 let qual_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
837 let quality = data[pos..pos + qual_end].to_vec();
838 pos += qual_end + 1;
839
840 if sequence.len() != quality.len() {
842 return Err(FastqParseResult::Error(io::Error::new(
843 io::ErrorKind::InvalidData,
844 format!("Sequence length ({}) != quality length ({})", sequence.len(), quality.len()),
845 )));
846 }
847
848 Ok((FastqRecord { name, sequence, quality }, pos))
849}
850
851fn find_newline(data: &[u8]) -> Option<usize> {
853 data.iter().position(|&b| b == b'\n')
854}
855
856#[must_use]
858pub fn strip_read_suffix(name: &[u8]) -> &[u8] {
859 if name.len() >= 2 {
860 let suffix = &name[name.len() - 2..];
861 if suffix == b"/1" || suffix == b"/2" {
862 return &name[..name.len() - 2];
863 }
864 }
865 name
866}
867
868#[derive(Debug)]
875pub struct FastqTemplate {
876 pub records: Vec<FastqRecord>,
878 pub name: Vec<u8>,
880}
881
882impl MemoryEstimate for FastqRecord {
883 fn estimate_heap_size(&self) -> usize {
884 self.name.capacity() + self.sequence.capacity() + self.quality.capacity()
885 }
886}
887
888impl MemoryEstimate for FastqTemplate {
889 fn estimate_heap_size(&self) -> usize {
890 let records_heap: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
891 let records_vec_overhead = self.records.capacity() * std::mem::size_of::<FastqRecord>();
892 self.name.capacity() + records_heap + records_vec_overhead
893 }
894}
895
896pub struct FastqGrouper {
902 num_inputs: usize,
904 leftovers: Vec<Vec<u8>>,
906 pending_records: Vec<VecDeque<FastqRecord>>,
908}
909
910impl FastqGrouper {
911 #[must_use]
913 pub fn new(num_inputs: usize) -> Self {
914 log::debug!("FastqGrouper::new: num_inputs={num_inputs}");
915 Self {
916 num_inputs,
917 leftovers: vec![Vec::new(); num_inputs],
918 pending_records: (0..num_inputs).map(|_| VecDeque::new()).collect(),
919 }
920 }
921
922 pub fn add_bytes_for_stream(&mut self, stream_idx: usize, data: &[u8]) -> io::Result<()> {
931 log::trace!(
932 "FastqGrouper::add_bytes_for_stream: stream_idx={}, num_inputs={}, data_len={}",
933 stream_idx,
934 self.num_inputs,
935 data.len()
936 );
937 if stream_idx >= self.num_inputs {
938 return Err(io::Error::new(
939 io::ErrorKind::InvalidInput,
940 format!("Stream index {} out of range (max {})", stream_idx, self.num_inputs - 1),
941 ));
942 }
943
944 let combined = if self.leftovers[stream_idx].is_empty() {
946 data.to_vec()
947 } else {
948 let mut combined = std::mem::take(&mut self.leftovers[stream_idx]);
949 combined.extend_from_slice(data);
950 combined
951 };
952
953 let (records, leftover) = parse_fastq_records(&combined)?;
955 self.leftovers[stream_idx] = leftover;
956
957 self.pending_records[stream_idx].extend(records);
959
960 Ok(())
961 }
962
963 pub fn add_records_for_stream(
975 &mut self,
976 stream_idx: usize,
977 records: Vec<FastqRecord>,
978 ) -> io::Result<()> {
979 log::trace!(
980 "FastqGrouper::add_records_for_stream: stream_idx={}, num_inputs={}, records_len={}",
981 stream_idx,
982 self.num_inputs,
983 records.len()
984 );
985 if stream_idx >= self.num_inputs {
986 return Err(io::Error::new(
987 io::ErrorKind::InvalidInput,
988 format!("Stream index {} out of range (max {})", stream_idx, self.num_inputs - 1),
989 ));
990 }
991
992 self.pending_records[stream_idx].extend(records);
994
995 Ok(())
996 }
997
998 #[must_use]
1003 pub fn has_leftover_bytes(&self) -> bool {
1004 self.leftovers.iter().any(|l| !l.is_empty())
1005 }
1006
1007 pub fn drain_complete_templates(&mut self) -> io::Result<Vec<FastqTemplate>> {
1017 let mut templates = Vec::new();
1018
1019 while self.pending_records.iter().all(|q| !q.is_empty()) {
1021 let base_name = {
1023 let names: Vec<_> =
1025 self.pending_records.iter().map(|q| &q.front().unwrap().name).collect();
1026
1027 let base_name = strip_read_suffix(names[0]).to_vec();
1029
1030 for (i, name) in names.iter().enumerate().skip(1) {
1032 let other_base = strip_read_suffix(name);
1033 if base_name != other_base {
1034 return Err(io::Error::new(
1035 io::ErrorKind::InvalidData,
1036 format!(
1037 "FASTQ files out of sync: stream 0 has '{}', stream {} has '{}'",
1038 String::from_utf8_lossy(&base_name),
1039 i,
1040 String::from_utf8_lossy(other_base),
1041 ),
1042 ));
1043 }
1044 }
1045
1046 base_name };
1048
1049 let records: Vec<_> =
1051 self.pending_records.iter_mut().map(|q| q.pop_front().unwrap()).collect();
1052
1053 templates.push(FastqTemplate { name: base_name, records });
1054 }
1055
1056 Ok(templates)
1057 }
1058
1059 #[must_use]
1061 pub fn has_pending(&self) -> bool {
1062 self.leftovers.iter().any(|l| !l.is_empty())
1063 || self.pending_records.iter().any(|q| !q.is_empty())
1064 }
1065
1066 pub fn finish(&mut self) -> io::Result<Option<FastqTemplate>> {
1072 let templates = self.drain_complete_templates()?;
1074 if templates.len() == 1 {
1075 return Ok(templates.into_iter().next());
1076 } else if templates.len() > 1 {
1077 return Err(io::Error::new(
1078 io::ErrorKind::InvalidData,
1079 "Multiple templates remaining at finish",
1080 ));
1081 }
1082
1083 if self.leftovers.iter().any(|l| !l.is_empty()) {
1085 return Err(io::Error::new(
1086 io::ErrorKind::UnexpectedEof,
1087 "Incomplete FASTQ record at EOF",
1088 ));
1089 }
1090
1091 if self.pending_records.iter().any(|q| !q.is_empty()) {
1092 return Err(io::Error::new(
1093 io::ErrorKind::InvalidData,
1094 "Unmatched FASTQ records at EOF - files out of sync",
1095 ));
1096 }
1097
1098 Ok(None)
1099 }
1100}
1101
1102#[cfg(test)]
1107mod tests {
1108 use super::*;
1109
1110 #[test]
1111 fn test_single_record_grouper_empty() {
1112 let mut grouper = SingleRecordGrouper::new();
1113 assert!(!grouper.has_pending());
1114
1115 let result = grouper.finish().unwrap();
1116 assert!(result.is_none());
1117 }
1118
1119 #[test]
1120 fn test_single_raw_record_grouper_empty() {
1121 let mut grouper = SingleRawRecordGrouper::new();
1122 assert!(!grouper.has_pending());
1123
1124 let result = grouper.finish().unwrap();
1125 assert!(result.is_none());
1126 }
1127
1128 #[test]
1129 fn test_single_raw_record_grouper_emits_each_record() {
1130 use crate::unified_pipeline::{DecodedRecord, GroupKey};
1131
1132 let mut grouper = SingleRawRecordGrouper::new();
1133 let raw1 = vec![1u8; 36];
1134 let raw2 = vec![2u8; 36];
1135 let records = vec![
1136 DecodedRecord::from_raw_bytes(raw1.clone(), GroupKey::default()),
1137 DecodedRecord::from_raw_bytes(raw2.clone(), GroupKey::default()),
1138 ];
1139
1140 let groups = grouper.add_records(records).unwrap();
1141 assert_eq!(groups.len(), 2);
1142 assert_eq!(groups[0], raw1);
1143 assert_eq!(groups[1], raw2);
1144 }
1145
1146 #[test]
1147 fn test_single_raw_record_grouper_rejects_parsed() {
1148 use crate::sam::builder::RecordBuilder;
1149 use crate::unified_pipeline::{DecodedRecord, GroupKey};
1150
1151 let mut grouper = SingleRawRecordGrouper::new();
1152 let rec = RecordBuilder::new().sequence("ACGT").build();
1153 let records = vec![DecodedRecord::new(rec, GroupKey::default())];
1154
1155 let result = grouper.add_records(records);
1156 assert!(result.is_err());
1157 }
1158
1159 #[test]
1161 fn test_parse_single_fastq_record() {
1162 let data = b"@read1\nACGT\n+\nIIII\n";
1163 let (record, consumed) = parse_single_fastq_record(data).unwrap();
1164 assert_eq!(record.name, b"read1");
1165 assert_eq!(record.sequence, b"ACGT");
1166 assert_eq!(record.quality, b"IIII");
1167 assert_eq!(consumed, data.len());
1168 }
1169
1170 #[test]
1171 fn test_parse_fastq_records_multiple() {
1172 let data = b"@read1\nACGT\n+\nIIII\n@read2\nTGCA\n+\nJJJJ\n";
1173 let (records, leftover) = parse_fastq_records(data).unwrap();
1174 assert_eq!(records.len(), 2);
1175 assert_eq!(records[0].name, b"read1");
1176 assert_eq!(records[1].name, b"read2");
1177 assert!(leftover.is_empty());
1178 }
1179
1180 #[test]
1181 fn test_parse_fastq_incomplete_record() {
1182 let data = b"@read1\nACGT\n+\n";
1183 let (records, leftover) = parse_fastq_records(data).unwrap();
1184 assert!(records.is_empty());
1185 assert_eq!(leftover, data);
1186 }
1187
1188 #[test]
1189 fn test_parse_fastq_with_leftover() {
1190 let data = b"@read1\nACGT\n+\nIIII\n@read2\nTG";
1191 let (records, leftover) = parse_fastq_records(data).unwrap();
1192 assert_eq!(records.len(), 1);
1193 assert_eq!(records[0].name, b"read1");
1194 assert_eq!(leftover, b"@read2\nTG");
1195 }
1196
1197 #[test]
1198 fn test_strip_read_suffix() {
1199 assert_eq!(strip_read_suffix(b"read1/1"), b"read1");
1200 assert_eq!(strip_read_suffix(b"read1/2"), b"read1");
1201 assert_eq!(strip_read_suffix(b"read1"), b"read1");
1202 assert_eq!(strip_read_suffix(b"a"), b"a");
1203 assert_eq!(strip_read_suffix(b""), b"" as &[u8]);
1204 }
1205
1206 #[test]
1208 fn test_fastq_grouper_paired() {
1209 let mut grouper = FastqGrouper::new(2);
1210
1211 grouper.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n").unwrap();
1213 grouper.add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n").unwrap();
1215
1216 let templates = grouper.drain_complete_templates().unwrap();
1217 assert_eq!(templates.len(), 1);
1218 assert_eq!(templates[0].name, b"read1");
1219 assert_eq!(templates[0].records.len(), 2);
1220 assert_eq!(templates[0].records[0].sequence, b"ACGT");
1221 assert_eq!(templates[0].records[1].sequence, b"TGCA");
1222 }
1223
1224 #[test]
1225 fn test_fastq_grouper_multiple_templates() {
1226 let mut grouper = FastqGrouper::new(2);
1227
1228 grouper
1230 .add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n@read2/1\nAAAA\n+\nIIII\n")
1231 .unwrap();
1232 grouper
1234 .add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n@read2/2\nTTTT\n+\nJJJJ\n")
1235 .unwrap();
1236
1237 let templates = grouper.drain_complete_templates().unwrap();
1238 assert_eq!(templates.len(), 2);
1239 assert_eq!(templates[0].name, b"read1");
1240 assert_eq!(templates[1].name, b"read2");
1241 }
1242
1243 #[test]
1244 fn test_fastq_grouper_partial_then_complete() {
1245 let mut grouper = FastqGrouper::new(2);
1246
1247 grouper.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n").unwrap();
1249
1250 let templates = grouper.drain_complete_templates().unwrap();
1252 assert!(templates.is_empty());
1253 assert!(grouper.has_pending());
1254
1255 grouper.add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n").unwrap();
1257
1258 let templates = grouper.drain_complete_templates().unwrap();
1259 assert_eq!(templates.len(), 1);
1260 }
1261
1262 #[test]
1263 fn test_fastq_grouper_finish_empty() {
1264 let mut grouper = FastqGrouper::new(2);
1265 let result = grouper.finish().unwrap();
1266 assert!(result.is_none());
1267 }
1268
1269 #[test]
1270 fn test_fastq_grouper_out_of_sync() {
1271 let mut grouper = FastqGrouper::new(2);
1272
1273 grouper.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n").unwrap();
1275 grouper.add_bytes_for_stream(1, b"@read2/2\nTGCA\n+\nJJJJ\n").unwrap();
1276
1277 let result = grouper.drain_complete_templates();
1278 assert!(result.is_err());
1279 assert!(result.unwrap_err().to_string().contains("out of sync"));
1280 }
1281
1282 use crate::sam::builder::RecordBuilder;
1287
1288 fn make_decoded(
1290 key: GroupKey,
1291 paired: bool,
1292 first_segment: bool,
1293 mc: Option<&str>,
1294 ) -> DecodedRecord {
1295 let mut builder = RecordBuilder::new()
1296 .name("read1")
1297 .sequence("ACGT")
1298 .qualities(&[30, 30, 30, 30])
1299 .cigar("4M")
1300 .reference_sequence_id(0)
1301 .alignment_start(100)
1302 .paired(paired);
1303 if first_segment {
1304 builder = builder.first_segment(true);
1305 }
1306 if let Some(mc_val) = mc {
1307 builder = builder.tag("MC", mc_val);
1308 }
1309 DecodedRecord::new(builder.build(), key)
1310 }
1311
1312 fn make_secondary_decoded(name_hash: u64) -> DecodedRecord {
1314 let key = GroupKey { name_hash, ..GroupKey::default() };
1315 let record = RecordBuilder::new()
1316 .name("read1")
1317 .sequence("ACGT")
1318 .qualities(&[30, 30, 30, 30])
1319 .secondary(true)
1320 .build();
1321 DecodedRecord::new(record, key)
1322 }
1323
1324 #[test]
1325 fn test_record_position_grouper_empty() {
1326 let mut grouper = RecordPositionGrouper::new();
1327 assert!(!grouper.has_pending());
1328 let result = grouper.finish().unwrap();
1329 assert!(result.is_none());
1330 }
1331
1332 #[test]
1333 fn test_record_position_grouper_single_unpaired_record() {
1334 let mut grouper = RecordPositionGrouper::new();
1335 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1336 let decoded = make_decoded(key, false, false, None);
1337
1338 let groups = grouper.add_records(vec![decoded]).unwrap();
1339 assert!(groups.is_empty()); assert!(grouper.has_pending());
1341
1342 let final_group = grouper.finish().unwrap().expect("should emit final group");
1343 assert_eq!(final_group.records.len(), 1);
1344 assert_eq!(final_group.group_key.ref_id1, 0);
1345 assert_eq!(final_group.group_key.pos1, 100);
1346 }
1347
1348 #[test]
1349 fn test_record_position_grouper_same_position_multiple_records() {
1350 let mut grouper = RecordPositionGrouper::new();
1351 let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 11111);
1352 let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 22222);
1353 let key3 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 33333);
1354
1355 let records = vec![
1356 make_decoded(key1, true, true, Some("4M")),
1357 make_decoded(key2, true, true, Some("4M")),
1358 make_decoded(key3, true, true, Some("4M")),
1359 ];
1360
1361 let groups = grouper.add_records(records).unwrap();
1362 assert!(groups.is_empty()); let final_group = grouper.finish().unwrap().expect("should emit final group");
1365 assert_eq!(final_group.records.len(), 3);
1366 }
1367
1368 #[test]
1369 fn test_record_position_grouper_different_positions() {
1370 let mut grouper = RecordPositionGrouper::new();
1371 let key_pos1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
1372 let key_pos2 = GroupKey::single(0, 200, 0, 0, 0, 22222);
1373 let key_pos3 = GroupKey::single(0, 300, 0, 0, 0, 33333);
1374
1375 let records = vec![
1376 make_decoded(key_pos1, false, false, None),
1377 make_decoded(key_pos2, false, false, None),
1378 make_decoded(key_pos3, false, false, None),
1379 ];
1380
1381 let groups = grouper.add_records(records).unwrap();
1382 assert_eq!(groups.len(), 2);
1384 assert_eq!(groups[0].group_key.pos1, 100);
1385 assert_eq!(groups[0].records.len(), 1);
1386 assert_eq!(groups[1].group_key.pos1, 200);
1387 assert_eq!(groups[1].records.len(), 1);
1388
1389 let final_group = grouper.finish().unwrap().expect("should emit final group");
1391 assert_eq!(final_group.group_key.pos1, 300);
1392 }
1393
1394 #[test]
1395 fn test_record_position_grouper_skips_secondary() {
1396 let mut grouper = RecordPositionGrouper::new();
1397 let primary_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
1398
1399 let records = vec![
1400 make_decoded(primary_key, false, false, None),
1401 make_secondary_decoded(11111), ];
1403
1404 let groups = grouper.add_records(records).unwrap();
1405 assert!(groups.is_empty());
1406
1407 let final_group = grouper.finish().unwrap().expect("should emit final group");
1408 assert_eq!(final_group.records.len(), 1); }
1410
1411 #[test]
1412 fn test_record_position_grouper_paired_same_position_key() {
1413 let mut grouper = RecordPositionGrouper::new();
1415 let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1417 let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1418 assert_eq!(r1_key.position_key(), r2_key.position_key());
1420
1421 let records = vec![
1422 make_decoded(r1_key, true, true, Some("4M")),
1423 make_decoded(r2_key, true, false, Some("4M")),
1424 ];
1425
1426 let groups = grouper.add_records(records).unwrap();
1427 assert!(groups.is_empty()); let final_group = grouper.finish().unwrap().expect("should emit final group");
1430 assert_eq!(final_group.records.len(), 2);
1431 }
1432
1433 #[test]
1434 fn test_record_position_grouper_groups_records_by_position() {
1435 let mut grouper = RecordPositionGrouper::new();
1436 let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
1438 let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
1439 let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
1440 let key4 = GroupKey::single(0, 200, 0, 0, 0, 44444);
1442 let key5 = GroupKey::single(0, 200, 0, 0, 0, 55555);
1443 let key6 = GroupKey::single(0, 300, 0, 0, 0, 66666);
1445
1446 let records = vec![
1447 make_decoded(key1, false, false, None),
1448 make_decoded(key2, false, false, None),
1449 make_decoded(key3, false, false, None),
1450 make_decoded(key4, false, false, None),
1451 make_decoded(key5, false, false, None),
1452 make_decoded(key6, false, false, None),
1453 ];
1454
1455 let groups = grouper.add_records(records).unwrap();
1456 assert_eq!(groups.len(), 2);
1457 assert_eq!(groups[0].records.len(), 3);
1458 assert_eq!(groups[1].records.len(), 2);
1459
1460 let final_group = grouper.finish().unwrap().expect("should emit final group");
1461 assert_eq!(final_group.records.len(), 1);
1462 }
1463
1464 #[test]
1465 fn test_record_position_grouper_coalesces_unmapped_mate_by_name_hash() {
1466 let mut grouper = RecordPositionGrouper::new();
1470 let name_hash = 12345_u64;
1471
1472 let r1_key = GroupKey::single(5, 100, 0, 0, 0, name_hash);
1474 let r1 = {
1475 let record = RecordBuilder::new()
1476 .name("read1")
1477 .sequence("ACGT")
1478 .qualities(&[30, 30, 30, 30])
1479 .cigar("4M")
1480 .reference_sequence_id(5)
1481 .alignment_start(100)
1482 .paired(true)
1483 .first_segment(true)
1484 .mate_unmapped(true)
1485 .build();
1486 DecodedRecord::new(record, r1_key)
1487 };
1488
1489 let r2_key = GroupKey::single(-1, 0, 0, 0, 0, name_hash);
1491 let r2 = {
1492 let record = RecordBuilder::new()
1493 .name("read1")
1494 .sequence("TGCA")
1495 .qualities(&[30, 30, 30, 30])
1496 .paired(true)
1497 .first_segment(false)
1498 .unmapped(true)
1499 .build();
1500 DecodedRecord::new(record, r2_key)
1501 };
1502
1503 assert_ne!(r1_key.position_key(), r2_key.position_key());
1505
1506 let groups = grouper.add_records(vec![r1, r2]).unwrap();
1507 assert!(groups.is_empty());
1509
1510 let final_group = grouper.finish().unwrap().expect("should emit final group");
1511 assert_eq!(final_group.records.len(), 2, "R1 and R2 should be in the same group");
1512 assert_eq!(final_group.group_key.ref_id1, 5, "Group key should use R1's position");
1513 }
1514
1515 #[test]
1516 fn test_record_position_grouper_does_not_coalesce_different_name_hash() {
1517 let mut grouper = RecordPositionGrouper::new();
1519
1520 let r1_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
1521 let r2_key = GroupKey::single(0, 200, 0, 0, 0, 22222);
1522
1523 let records = vec![
1524 make_decoded(r1_key, false, false, None),
1525 make_decoded(r2_key, false, false, None),
1526 ];
1527
1528 let groups = grouper.add_records(records).unwrap();
1529 assert_eq!(groups.len(), 1);
1531 assert_eq!(groups[0].records.len(), 1);
1532 }
1533
1534 #[test]
1535 fn test_record_position_grouper_mc_validation_skips_unmapped_mate() {
1536 let mut grouper = RecordPositionGrouper::new();
1538 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1539 let record = RecordBuilder::new()
1540 .name("read1")
1541 .sequence("ACGT")
1542 .qualities(&[30, 30, 30, 30])
1543 .cigar("4M")
1544 .reference_sequence_id(0)
1545 .alignment_start(100)
1546 .paired(true)
1547 .first_segment(true)
1548 .mate_unmapped(true)
1549 .build();
1550 let decoded = DecodedRecord::new(record, key);
1551
1552 let result = grouper.add_records(vec![decoded]);
1554 assert!(result.is_ok());
1555 assert!(!grouper.mc_validated); }
1557
1558 #[test]
1559 fn test_record_position_grouper_mc_validation_fails_without_mc() {
1560 let mut grouper = RecordPositionGrouper::new();
1561 let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1563 let decoded = make_decoded(key, true, true, None); let result = grouper.add_records(vec![decoded]);
1566 assert!(result.is_err());
1567 let err_msg = result.unwrap_err().to_string();
1568 assert!(err_msg.contains("MC tags"), "Error should mention MC tags: {err_msg}");
1569 }
1570
1571 #[test]
1572 fn test_record_position_grouper_mc_validation_passes_with_mc() {
1573 let mut grouper = RecordPositionGrouper::new();
1574 let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1575 let decoded = make_decoded(key, true, true, Some("4M"));
1576
1577 let result = grouper.add_records(vec![decoded]);
1578 assert!(result.is_ok());
1579 }
1580
1581 #[test]
1582 fn test_record_position_grouper_mc_validation_skips_unpaired() {
1583 let mut grouper = RecordPositionGrouper::new();
1585 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1586 let decoded = make_decoded(key, false, false, None); let result = grouper.add_records(vec![decoded]);
1589 assert!(result.is_ok());
1590 assert!(!grouper.mc_validated); }
1592
1593 #[test]
1594 fn test_record_position_grouper_default_impl() {
1595 let grouper = RecordPositionGrouper::default();
1596 assert!(!grouper.has_pending());
1597 }
1598
1599 #[test]
1604 fn test_build_templates_empty() {
1605 let result = build_templates_from_records(vec![]).unwrap();
1606 assert!(result.is_empty());
1607 }
1608
1609 #[test]
1610 fn test_build_templates_single_record() {
1611 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1612 let record = RecordBuilder::new()
1613 .name("read1")
1614 .sequence("ACGT")
1615 .qualities(&[30, 30, 30, 30])
1616 .cigar("4M")
1617 .reference_sequence_id(0)
1618 .alignment_start(100)
1619 .mapping_quality(60)
1620 .paired(true)
1621 .first_segment(true)
1622 .build();
1623 let decoded = DecodedRecord::new(record, key);
1624
1625 let templates = build_templates_from_records(vec![decoded]).unwrap();
1626 assert_eq!(templates.len(), 1);
1627 }
1628
1629 #[test]
1630 fn test_build_templates_paired_same_name_hash() {
1631 let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1633 let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1634
1635 let r1 = DecodedRecord::new(
1636 RecordBuilder::new()
1637 .name("read1")
1638 .sequence("ACGT")
1639 .qualities(&[30, 30, 30, 30])
1640 .cigar("4M")
1641 .reference_sequence_id(0)
1642 .alignment_start(100)
1643 .paired(true)
1644 .first_segment(true)
1645 .build(),
1646 r1_key,
1647 );
1648 let r2 = DecodedRecord::new(
1649 RecordBuilder::new()
1650 .name("read1")
1651 .sequence("TGCA")
1652 .qualities(&[30, 30, 30, 30])
1653 .cigar("4M")
1654 .reference_sequence_id(0)
1655 .alignment_start(200)
1656 .paired(true)
1657 .reverse_complement(true)
1658 .build(),
1659 r2_key,
1660 );
1661
1662 let templates = build_templates_from_records(vec![r1, r2]).unwrap();
1663 assert_eq!(templates.len(), 1);
1664 }
1665
1666 #[test]
1667 fn test_build_templates_multiple_qnames() {
1668 let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
1670 let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
1671 let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
1672
1673 let records: Vec<DecodedRecord> = vec![
1674 DecodedRecord::new(
1675 RecordBuilder::new()
1676 .name("readA")
1677 .sequence("ACGT")
1678 .qualities(&[30, 30, 30, 30])
1679 .cigar("4M")
1680 .reference_sequence_id(0)
1681 .alignment_start(100)
1682 .build(),
1683 key1,
1684 ),
1685 DecodedRecord::new(
1686 RecordBuilder::new()
1687 .name("readB")
1688 .sequence("ACGT")
1689 .qualities(&[30, 30, 30, 30])
1690 .cigar("4M")
1691 .reference_sequence_id(0)
1692 .alignment_start(100)
1693 .build(),
1694 key2,
1695 ),
1696 DecodedRecord::new(
1697 RecordBuilder::new()
1698 .name("readC")
1699 .sequence("ACGT")
1700 .qualities(&[30, 30, 30, 30])
1701 .cigar("4M")
1702 .reference_sequence_id(0)
1703 .alignment_start(100)
1704 .build(),
1705 key3,
1706 ),
1707 ];
1708
1709 let templates = build_templates_from_records(records).unwrap();
1710 assert_eq!(templates.len(), 3);
1711 }
1712
1713 #[test]
1714 fn test_build_templates_from_raw_bytes() {
1715 use crate::sort::bam_fields;
1716 use crate::unified_pipeline::{DecodedRecord, GroupKey};
1717
1718 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1719 let raw = bam_fields::make_bam_bytes(
1720 0, 100, bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT, b"read1", &[bam_fields::encode_op(0, 4)], 4, 0, 200, &[], );
1730 let decoded = DecodedRecord::from_raw_bytes(raw, key);
1731
1732 let templates = build_templates_from_records(vec![decoded]).unwrap();
1733 assert_eq!(templates.len(), 1);
1734 }
1735
1736 #[test]
1737 fn test_build_templates_from_raw_bytes_paired() {
1738 use crate::sort::bam_fields;
1739 use crate::unified_pipeline::{DecodedRecord, GroupKey};
1740
1741 let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1742 let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1743
1744 let r1 = bam_fields::make_bam_bytes(
1745 0,
1746 100,
1747 bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT,
1748 b"read1",
1749 &[bam_fields::encode_op(0, 4)],
1750 4,
1751 0,
1752 200,
1753 &[],
1754 );
1755 let r2 = bam_fields::make_bam_bytes(
1756 0,
1757 200,
1758 bam_fields::flags::PAIRED
1759 | bam_fields::flags::LAST_SEGMENT
1760 | bam_fields::flags::REVERSE,
1761 b"read1",
1762 &[bam_fields::encode_op(0, 4)],
1763 4,
1764 0,
1765 100,
1766 &[],
1767 );
1768
1769 let records =
1770 vec![DecodedRecord::from_raw_bytes(r1, key1), DecodedRecord::from_raw_bytes(r2, key2)];
1771
1772 let templates = build_templates_from_records(records).unwrap();
1773 assert_eq!(templates.len(), 1);
1774 }
1775
1776 #[test]
1781 fn test_raw_position_group_batch_weight() {
1782 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1783 let records = vec![
1784 make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None),
1785 make_decoded(GroupKey::single(0, 100, 0, 0, 0, 22222), false, false, None),
1786 make_decoded(GroupKey::single(0, 100, 0, 0, 0, 33333), false, false, None),
1787 make_decoded(GroupKey::single(0, 100, 0, 0, 0, 44444), false, false, None),
1788 ];
1789 let group = RawPositionGroup { group_key: key, records };
1790
1791 assert_eq!(group.batch_weight(), 2);
1793 }
1794
1795 #[test]
1796 fn test_raw_position_group_batch_weight_single() {
1797 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1798 let records =
1799 vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
1800 let group = RawPositionGroup { group_key: key, records };
1801
1802 assert_eq!(group.batch_weight(), 1);
1804 }
1805
1806 #[test]
1807 fn test_raw_position_group_memory_estimate() {
1808 let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1809 let records =
1810 vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
1811 let group = RawPositionGroup { group_key: key, records };
1812
1813 assert!(group.estimate_heap_size() > 0);
1815 }
1816}