1use anyhow::{Result, bail};
8use noodles::sam::alignment::record::data::field::Tag;
9use noodles::sam::alignment::record_buf::RecordBuf;
10use std::io;
11
12use crate::unified_pipeline::{BatchWeight, DecodedRecord, MemoryEstimate};
13
/// Iterator adapter that batches consecutive records sharing the same tag value.
///
/// Each item is a `(key, records)` pair where `key` is the string value of the
/// configured tag. Records that do not carry the tag are skipped. When the
/// underlying iterator (or tag extraction) fails while a group is buffered, the
/// buffered group is yielded first and the error on the following call; the
/// iterator ends after yielding an error.
pub struct MiGroupIterator<I>
where
    I: Iterator<Item = Result<RecordBuf>>,
{
    /// Upstream source of records.
    record_iter: I,
    /// Tag whose value is used as the grouping key.
    tag: Tag,
    /// Key of the group currently being accumulated, if any.
    current_mi: Option<String>,
    /// Records collected so far for the current key.
    current_group: Vec<RecordBuf>,
    /// Error held back until the buffered group has been yielded.
    pending_error: Option<anyhow::Error>,
    /// Set once the iterator has finished (exhausted or after an error).
    done: bool,
}
44
45impl<I> MiGroupIterator<I>
46where
47 I: Iterator<Item = Result<RecordBuf>>,
48{
49 pub fn new(record_iter: I, tag_name: &str) -> Self {
60 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
61 let tag_bytes = tag_name.as_bytes();
62 let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
63
64 MiGroupIterator {
65 record_iter,
66 tag,
67 current_mi: None,
68 current_group: Vec::new(),
69 pending_error: None,
70 done: false,
71 }
72 }
73
74 fn get_mi(&self, record: &RecordBuf) -> Result<Option<String>> {
76 if let Some(tag_value) = record.data().get(&self.tag) {
77 match tag_value {
78 noodles::sam::alignment::record_buf::data::field::Value::String(s) => {
79 Ok(Some(s.to_string()))
80 }
81 _ => {
82 bail!("MI tag must be a string value");
83 }
84 }
85 } else {
86 Ok(None)
87 }
88 }
89}
90
91impl<I> Iterator for MiGroupIterator<I>
92where
93 I: Iterator<Item = Result<RecordBuf>>,
94{
95 type Item = Result<(String, Vec<RecordBuf>)>;
97
98 fn next(&mut self) -> Option<Self::Item> {
105 if self.done {
106 return None;
107 }
108
109 if let Some(e) = self.pending_error.take() {
111 self.done = true;
112 return Some(Err(e));
113 }
114
115 loop {
116 match self.record_iter.next() {
117 None => {
118 self.done = true;
120 if self.current_group.is_empty() {
121 return None;
122 }
123 let mi = self.current_mi.take().unwrap_or_default();
124 let group = std::mem::take(&mut self.current_group);
125 return Some(Ok((mi, group)));
126 }
127 Some(Err(e)) => {
128 if !self.current_group.is_empty() {
130 self.pending_error = Some(e);
131 let mi = self.current_mi.take().unwrap_or_default();
132 let group = std::mem::take(&mut self.current_group);
133 return Some(Ok((mi, group)));
134 }
135 self.done = true;
136 return Some(Err(e));
137 }
138 Some(Ok(record)) => {
139 let mi = match self.get_mi(&record) {
141 Ok(Some(mi)) => mi,
142 Ok(None) => {
143 continue;
145 }
146 Err(e) => {
147 if !self.current_group.is_empty() {
149 self.pending_error = Some(e);
150 let mi = self.current_mi.take().unwrap_or_default();
151 let group = std::mem::take(&mut self.current_group);
152 return Some(Ok((mi, group)));
153 }
154 self.done = true;
155 return Some(Err(e));
156 }
157 };
158
159 if self.current_group.is_empty() {
160 self.current_mi = Some(mi);
162 self.current_group.push(record);
163 } else if self.current_mi.as_ref() == Some(&mi) {
164 self.current_group.push(record);
166 } else {
167 let old_mi = self.current_mi.take().unwrap_or_default();
169 let group = std::mem::take(&mut self.current_group);
170 self.current_mi = Some(mi);
171 self.current_group.push(record);
172 return Some(Ok((old_mi, group)));
173 }
174 }
175 }
176 }
177 }
178}
179
/// Iterator adapter that batches consecutive records by a transformed tag value.
///
/// The string value of the configured tag is passed through `key_transform`,
/// and consecutive records whose transformed keys are equal are yielded
/// together as `(key, records)`. Records without the tag are skipped. An error
/// encountered while a group is buffered is yielded after that group, and the
/// iterator ends after yielding an error.
pub struct MiGroupIteratorWithTransform<I, F>
where
    I: Iterator<Item = Result<RecordBuf>>,
    F: Fn(&str) -> String,
{
    /// Upstream source of records.
    record_iter: I,
    /// Tag whose value feeds the key transform.
    tag: Tag,
    /// Maps the raw tag string to the grouping key.
    key_transform: F,
    /// Transformed key of the group currently being accumulated, if any.
    current_key: Option<String>,
    /// Records collected so far for the current key.
    current_group: Vec<RecordBuf>,
    /// Error held back until the buffered group has been yielded.
    pending_error: Option<anyhow::Error>,
    /// Set once the iterator has finished (exhausted or after an error).
    done: bool,
}
217
218impl<I, F> MiGroupIteratorWithTransform<I, F>
219where
220 I: Iterator<Item = Result<RecordBuf>>,
221 F: Fn(&str) -> String,
222{
223 pub fn new(record_iter: I, tag_name: &str, key_transform: F) -> Self {
235 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
236 let tag_bytes = tag_name.as_bytes();
237 let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
238
239 MiGroupIteratorWithTransform {
240 record_iter,
241 tag,
242 key_transform,
243 current_key: None,
244 current_group: Vec::new(),
245 pending_error: None,
246 done: false,
247 }
248 }
249
250 fn get_key(&self, record: &RecordBuf) -> Result<Option<String>> {
252 if let Some(tag_value) = record.data().get(&self.tag) {
253 match tag_value {
254 noodles::sam::alignment::record_buf::data::field::Value::String(s) => {
255 let raw = s.to_string();
256 Ok(Some((self.key_transform)(&raw)))
257 }
258 _ => {
259 bail!("Tag must be a string value");
260 }
261 }
262 } else {
263 Ok(None)
264 }
265 }
266}
267
268impl<I, F> Iterator for MiGroupIteratorWithTransform<I, F>
269where
270 I: Iterator<Item = Result<RecordBuf>>,
271 F: Fn(&str) -> String,
272{
273 type Item = Result<(String, Vec<RecordBuf>)>;
275
276 fn next(&mut self) -> Option<Self::Item> {
283 if self.done {
284 return None;
285 }
286
287 if let Some(e) = self.pending_error.take() {
289 self.done = true;
290 return Some(Err(e));
291 }
292
293 loop {
294 match self.record_iter.next() {
295 None => {
296 self.done = true;
298 if self.current_group.is_empty() {
299 return None;
300 }
301 let key = self.current_key.take().unwrap_or_default();
302 let group = std::mem::take(&mut self.current_group);
303 return Some(Ok((key, group)));
304 }
305 Some(Err(e)) => {
306 if !self.current_group.is_empty() {
308 self.pending_error = Some(e);
309 let key = self.current_key.take().unwrap_or_default();
310 let group = std::mem::take(&mut self.current_group);
311 return Some(Ok((key, group)));
312 }
313 self.done = true;
314 return Some(Err(e));
315 }
316 Some(Ok(record)) => {
317 let key = match self.get_key(&record) {
319 Ok(Some(key)) => key,
320 Ok(None) => {
321 continue;
323 }
324 Err(e) => {
325 if !self.current_group.is_empty() {
327 self.pending_error = Some(e);
328 let key = self.current_key.take().unwrap_or_default();
329 let group = std::mem::take(&mut self.current_group);
330 return Some(Ok((key, group)));
331 }
332 self.done = true;
333 return Some(Err(e));
334 }
335 };
336
337 if self.current_group.is_empty() {
338 self.current_key = Some(key);
340 self.current_group.push(record);
341 } else if self.current_key.as_ref() == Some(&key) {
342 self.current_group.push(record);
344 } else {
345 let old_key = self.current_key.take().unwrap_or_default();
347 let group = std::mem::take(&mut self.current_group);
348 self.current_key = Some(key);
349 self.current_group.push(record);
350 return Some(Ok((old_key, group)));
351 }
352 }
353 }
354 }
355 }
356}
357
/// A group of parsed records that share one grouping key.
#[derive(Debug, Clone)]
pub struct MiGroup {
    /// Grouping key shared by every record in the group.
    pub mi: String,
    /// Records belonging to this group.
    pub records: Vec<RecordBuf>,
}
373
374impl MiGroup {
375 #[must_use]
377 pub fn new(mi: String, records: Vec<RecordBuf>) -> Self {
378 Self { mi, records }
379 }
380}
381
382impl BatchWeight for MiGroup {
383 fn batch_weight(&self) -> usize {
384 self.records.len()
385 }
386}
387
388impl MemoryEstimate for MiGroup {
389 fn estimate_heap_size(&self) -> usize {
390 let mi_size = self.mi.capacity();
392
393 let records_size: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
395 let records_vec_overhead = self.records.capacity() * std::mem::size_of::<RecordBuf>();
396
397 mi_size + records_size + records_vec_overhead
398 }
399}
400
401impl MemoryEstimate for MiGroupBatch {
402 fn estimate_heap_size(&self) -> usize {
403 let groups_size: usize = self.groups.iter().map(MemoryEstimate::estimate_heap_size).sum();
404 let groups_vec_overhead = self.groups.capacity() * std::mem::size_of::<MiGroup>();
405 groups_size + groups_vec_overhead
406 }
407}
408
/// A batch of [`MiGroup`]s handled together as one unit.
#[derive(Default)]
pub struct MiGroupBatch {
    /// Groups contained in this batch, in insertion order.
    pub groups: Vec<MiGroup>,
}
422
423impl MiGroupBatch {
424 #[must_use]
426 pub fn new() -> Self {
427 Self { groups: Vec::new() }
428 }
429
430 #[must_use]
432 pub fn with_capacity(capacity: usize) -> Self {
433 Self { groups: Vec::with_capacity(capacity) }
434 }
435
436 #[must_use]
438 pub fn len(&self) -> usize {
439 self.groups.len()
440 }
441
442 #[must_use]
444 pub fn is_empty(&self) -> bool {
445 self.groups.is_empty()
446 }
447
448 pub fn clear(&mut self) {
450 self.groups.clear();
451 }
452}
453
454impl BatchWeight for MiGroupBatch {
455 fn batch_weight(&self) -> usize {
456 self.groups.iter().map(|g| g.records.len()).sum()
457 }
458}
459
460use crate::unified_pipeline::Grouper;
465use std::collections::VecDeque;
466
/// Boxed predicate over a parsed record; `MiGrouper` keeps a record only when it returns `true`.
type RecordFilterFn = Box<dyn Fn(&RecordBuf) -> bool + Send + Sync>;
469
/// Boxed function mapping a raw tag string to the key `MiGrouper` groups by.
type MiTransformFn = Box<dyn Fn(&str) -> String + Send + Sync>;
472
/// Incremental grouper that collects consecutive parsed records with the same
/// tag-derived key into [`MiGroup`]s and emits them in [`MiGroupBatch`]es.
///
/// Records whose tag is missing or not a string are grouped under the empty key.
pub struct MiGrouper {
    /// Tag whose string value provides the grouping key.
    tag: Tag,
    /// Number of groups per emitted batch (constructors clamp this to at least 1).
    batch_size: usize,
    /// Key of the group currently being accumulated, if any.
    current_mi: Option<String>,
    /// Records collected so far for the current key.
    current_records: Vec<RecordBuf>,
    /// Completed groups waiting to be emitted as batches.
    pending_groups: VecDeque<MiGroup>,
    /// Set once `finish` has run.
    finished: bool,
    /// Optional predicate; records for which it returns `false` are dropped.
    record_filter: Option<RecordFilterFn>,
    /// Optional transform applied to the raw tag string to obtain the key.
    mi_transform: Option<MiTransformFn>,
}
521
522impl MiGrouper {
523 #[must_use]
533 pub fn new(tag_name: &str, batch_size: usize) -> Self {
534 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
535 let tag_bytes = tag_name.as_bytes();
536 let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
537
538 Self {
539 tag,
540 batch_size: batch_size.max(1),
541 current_mi: None,
542 current_records: Vec::new(),
543 pending_groups: VecDeque::new(),
544 finished: false,
545 record_filter: None,
546 mi_transform: None,
547 }
548 }
549
550 pub fn with_filter_and_transform<F, T>(
566 tag_name: &str,
567 batch_size: usize,
568 record_filter: F,
569 mi_transform: T,
570 ) -> Self
571 where
572 F: Fn(&RecordBuf) -> bool + Send + Sync + 'static,
573 T: Fn(&str) -> String + Send + Sync + 'static,
574 {
575 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
576 let tag_bytes = tag_name.as_bytes();
577 let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
578
579 Self {
580 tag,
581 batch_size: batch_size.max(1),
582 current_mi: None,
583 current_records: Vec::new(),
584 pending_groups: VecDeque::new(),
585 finished: false,
586 record_filter: Some(Box::new(record_filter)),
587 mi_transform: Some(Box::new(mi_transform)),
588 }
589 }
590
591 fn get_mi_tag(&self, record: &RecordBuf) -> Option<String> {
593 record.data().get(&self.tag).and_then(|v| match v {
594 noodles::sam::alignment::record_buf::data::field::Value::String(s) => {
595 let raw = s.to_string();
596 if let Some(ref transform) = self.mi_transform {
598 Some(transform(&raw))
599 } else {
600 Some(raw)
601 }
602 }
603 _ => None,
604 })
605 }
606
607 fn should_keep(&self, record: &RecordBuf) -> bool {
609 match &self.record_filter {
610 Some(filter) => filter(record),
611 None => true,
612 }
613 }
614
615 fn flush_current_group(&mut self) {
617 if let Some(mi) = self.current_mi.take() {
618 if !self.current_records.is_empty() {
619 let records = std::mem::take(&mut self.current_records);
620 self.pending_groups.push_back(MiGroup::new(mi, records));
621 }
622 }
623 }
624
625 fn drain_batches(&mut self) -> Vec<MiGroupBatch> {
627 let mut batches = Vec::new();
628 while self.pending_groups.len() >= self.batch_size {
629 let groups: Vec<MiGroup> = self.pending_groups.drain(..self.batch_size).collect();
630 batches.push(MiGroupBatch { groups });
631 }
632 batches
633 }
634}
635
636impl Grouper for MiGrouper {
637 type Group = MiGroupBatch;
638
639 fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
640 for decoded in records {
641 let record = decoded.into_record().ok_or_else(|| {
642 io::Error::new(io::ErrorKind::InvalidData, "MiGrouper requires parsed records")
643 })?;
644 if !self.should_keep(&record) {
646 continue;
647 }
648
649 let mi = self.get_mi_tag(&record).unwrap_or_default();
651
652 match &self.current_mi {
654 Some(current) if current == &mi => {
655 self.current_records.push(record);
657 }
658 Some(_) => {
659 self.flush_current_group();
661 self.current_mi = Some(mi);
662 self.current_records.push(record);
663 }
664 None => {
665 self.current_mi = Some(mi);
667 self.current_records.push(record);
668 }
669 }
670 }
671
672 Ok(self.drain_batches())
674 }
675
676 fn finish(&mut self) -> io::Result<Option<Self::Group>> {
677 if self.finished {
678 return Ok(None);
679 }
680 self.finished = true;
681
682 self.flush_current_group();
684
685 if self.pending_groups.is_empty() {
687 Ok(None)
688 } else {
689 let groups: Vec<MiGroup> = self.pending_groups.drain(..).collect();
690 Ok(Some(MiGroupBatch { groups }))
691 }
692 }
693
694 fn has_pending(&self) -> bool {
695 !self.pending_groups.is_empty() || self.current_mi.is_some()
696 }
697}
698
/// A group of records, kept as raw byte buffers, that share one grouping key.
#[derive(Debug, Clone)]
pub struct RawMiGroup {
    /// Grouping key shared by every record in the group.
    pub mi: String,
    /// Raw bytes of each record in the group.
    pub records: Vec<Vec<u8>>,
}
711
712impl RawMiGroup {
713 #[must_use]
715 pub fn new(mi: String, records: Vec<Vec<u8>>) -> Self {
716 Self { mi, records }
717 }
718}
719
720impl BatchWeight for RawMiGroup {
721 fn batch_weight(&self) -> usize {
722 self.records.len()
723 }
724}
725
726impl MemoryEstimate for RawMiGroup {
727 fn estimate_heap_size(&self) -> usize {
728 let mi_size = self.mi.capacity();
729 let records_size: usize = self.records.iter().map(std::vec::Vec::capacity).sum();
730 let records_vec_overhead = self.records.capacity() * std::mem::size_of::<Vec<u8>>();
731 mi_size + records_size + records_vec_overhead
732 }
733}
734
/// A batch of [`RawMiGroup`]s handled together as one unit.
#[derive(Default)]
pub struct RawMiGroupBatch {
    /// Groups contained in this batch, in insertion order.
    pub groups: Vec<RawMiGroup>,
}
741
742impl RawMiGroupBatch {
743 #[must_use]
745 pub fn new() -> Self {
746 Self { groups: Vec::new() }
747 }
748}
749
750impl BatchWeight for RawMiGroupBatch {
751 fn batch_weight(&self) -> usize {
752 self.groups.iter().map(|g| g.records.len()).sum()
753 }
754}
755
756impl MemoryEstimate for RawMiGroupBatch {
757 fn estimate_heap_size(&self) -> usize {
758 let groups_size: usize = self.groups.iter().map(MemoryEstimate::estimate_heap_size).sum();
759 let groups_vec_overhead = self.groups.capacity() * std::mem::size_of::<RawMiGroup>();
760 groups_size + groups_vec_overhead
761 }
762}
763
/// Boxed function mapping the raw tag value bytes to the key `RawMiGrouper` groups by.
type RawMiTransformFn = Box<dyn Fn(&[u8]) -> String + Send + Sync>;
766
/// Boxed predicate over a record's raw bytes; `RawMiGrouper` keeps a record only when it returns `true`.
type RawRecordFilterFn = Box<dyn Fn(&[u8]) -> bool + Send + Sync>;
769
/// Incremental grouper that collects consecutive raw-byte records with the same
/// tag-derived key into [`RawMiGroup`]s and emits them in [`RawMiGroupBatch`]es.
///
/// Records for which no string tag value is found are grouped under the empty key.
pub struct RawMiGrouper {
    /// Two-byte name of the tag that provides the grouping key.
    tag: [u8; 2],
    /// Number of groups per emitted batch (constructors clamp this to at least 1).
    batch_size: usize,
    /// Key of the group currently being accumulated, if any.
    current_mi: Option<String>,
    /// Raw records collected so far for the current key.
    current_records: Vec<Vec<u8>>,
    /// Completed groups waiting to be emitted as batches.
    pending_groups: VecDeque<RawMiGroup>,
    /// Set once `finish` has run.
    finished: bool,
    /// Optional transform applied to the raw tag bytes to obtain the key.
    mi_transform: Option<RawMiTransformFn>,
    /// Optional predicate; records for which it returns `false` are dropped.
    record_filter: Option<RawRecordFilterFn>,
}
793
794impl RawMiGrouper {
795 #[must_use]
801 pub fn new(tag_name: &str, batch_size: usize) -> Self {
802 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
803 let tag_bytes = tag_name.as_bytes();
804
805 Self {
806 tag: [tag_bytes[0], tag_bytes[1]],
807 batch_size: batch_size.max(1),
808 current_mi: None,
809 current_records: Vec::new(),
810 pending_groups: VecDeque::new(),
811 finished: false,
812 mi_transform: None,
813 record_filter: None,
814 }
815 }
816
817 pub fn with_filter_and_transform<F, T>(
823 tag_name: &str,
824 batch_size: usize,
825 record_filter: F,
826 mi_transform: T,
827 ) -> Self
828 where
829 F: Fn(&[u8]) -> bool + Send + Sync + 'static,
830 T: Fn(&[u8]) -> String + Send + Sync + 'static,
831 {
832 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
833 let tag_bytes = tag_name.as_bytes();
834
835 Self {
836 tag: [tag_bytes[0], tag_bytes[1]],
837 batch_size: batch_size.max(1),
838 current_mi: None,
839 current_records: Vec::new(),
840 pending_groups: VecDeque::new(),
841 finished: false,
842 mi_transform: Some(Box::new(mi_transform)),
843 record_filter: Some(Box::new(record_filter)),
844 }
845 }
846
847 fn get_mi_tag(&self, bam: &[u8]) -> Option<String> {
849 use crate::sort::bam_fields;
850 let value = bam_fields::find_string_tag_in_record(bam, &self.tag)?;
851 if let Some(ref transform) = self.mi_transform {
852 Some(transform(value))
853 } else {
854 Some(String::from_utf8_lossy(value).into_owned())
855 }
856 }
857
858 fn should_keep(&self, bam: &[u8]) -> bool {
860 match &self.record_filter {
861 Some(filter) => filter(bam),
862 None => true,
863 }
864 }
865
866 fn flush_current_group(&mut self) {
868 if let Some(mi) = self.current_mi.take() {
869 if !self.current_records.is_empty() {
870 let records = std::mem::take(&mut self.current_records);
871 self.pending_groups.push_back(RawMiGroup::new(mi, records));
872 }
873 }
874 }
875
876 fn drain_batches(&mut self) -> Vec<RawMiGroupBatch> {
878 let mut batches = Vec::new();
879 while self.pending_groups.len() >= self.batch_size {
880 let groups: Vec<RawMiGroup> = self.pending_groups.drain(..self.batch_size).collect();
881 batches.push(RawMiGroupBatch { groups });
882 }
883 batches
884 }
885}
886
887impl Grouper for RawMiGrouper {
888 type Group = RawMiGroupBatch;
889
890 fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
891 for decoded in records {
892 let raw = decoded.into_raw_bytes().ok_or_else(|| {
893 io::Error::new(io::ErrorKind::InvalidData, "RawMiGrouper requires raw byte records")
894 })?;
895
896 if !self.should_keep(&raw) {
898 continue;
899 }
900
901 let mi = self.get_mi_tag(&raw).unwrap_or_default();
903
904 match &self.current_mi {
906 Some(current) if current == &mi => {
907 self.current_records.push(raw);
908 }
909 Some(_) => {
910 self.flush_current_group();
911 self.current_mi = Some(mi);
912 self.current_records.push(raw);
913 }
914 None => {
915 self.current_mi = Some(mi);
916 self.current_records.push(raw);
917 }
918 }
919 }
920
921 Ok(self.drain_batches())
922 }
923
924 fn finish(&mut self) -> io::Result<Option<Self::Group>> {
925 if self.finished {
926 return Ok(None);
927 }
928 self.finished = true;
929
930 self.flush_current_group();
931
932 if self.pending_groups.is_empty() {
933 Ok(None)
934 } else {
935 let groups: Vec<RawMiGroup> = self.pending_groups.drain(..).collect();
936 Ok(Some(RawMiGroupBatch { groups }))
937 }
938 }
939
940 fn has_pending(&self) -> bool {
941 !self.pending_groups.is_empty() || self.current_mi.is_some()
942 }
943}
944
/// Iterator adapter that batches consecutive raw-byte records sharing the same key.
///
/// The key is the string value of the configured tag (optionally transformed),
/// and, when a cell tag is configured, a tab followed by that tag's value.
/// Records for which the main tag is not found are skipped. An upstream error
/// encountered while a group is buffered is yielded after that group, and the
/// iterator ends after yielding an error.
#[allow(clippy::type_complexity)]
pub struct RawMiGroupIterator<I>
where
    I: Iterator<Item = Result<Vec<u8>>>,
{
    /// Upstream source of raw records.
    record_iter: I,
    /// Two-byte name of the tag that provides the grouping key.
    tag: [u8; 2],
    /// Optional second tag whose value is appended to the key after a tab.
    cell_tag: Option<[u8; 2]>,
    /// Key of the group currently being accumulated, if any.
    current_mi: Option<String>,
    /// Raw records collected so far for the current key.
    current_group: Vec<Vec<u8>>,
    /// Set once the iterator has finished (exhausted or after an error).
    done: bool,
    /// Error held back until the buffered group has been yielded.
    pending_error: Option<anyhow::Error>,
    /// Optional transform applied to the raw tag bytes to obtain the key.
    mi_transform: Option<Box<dyn Fn(&[u8]) -> String>>,
}
965
966impl<I> RawMiGroupIterator<I>
967where
968 I: Iterator<Item = Result<Vec<u8>>>,
969{
970 pub fn new(record_iter: I, tag_name: &str) -> Self {
976 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
977 let tag_bytes = tag_name.as_bytes();
978 Self {
979 record_iter,
980 tag: [tag_bytes[0], tag_bytes[1]],
981 cell_tag: None,
982 current_mi: None,
983 current_group: Vec::new(),
984 done: false,
985 pending_error: None,
986 mi_transform: None,
987 }
988 }
989
990 pub fn with_transform<F>(record_iter: I, tag_name: &str, mi_transform: F) -> Self
996 where
997 F: Fn(&[u8]) -> String + 'static,
998 {
999 assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
1000 let tag_bytes = tag_name.as_bytes();
1001 Self {
1002 record_iter,
1003 tag: [tag_bytes[0], tag_bytes[1]],
1004 cell_tag: None,
1005 current_mi: None,
1006 current_group: Vec::new(),
1007 done: false,
1008 pending_error: None,
1009 mi_transform: Some(Box::new(mi_transform)),
1010 }
1011 }
1012
1013 #[must_use]
1018 pub fn with_cell_tag(mut self, cell_tag: Option<[u8; 2]>) -> Self {
1019 self.cell_tag = cell_tag;
1020 self
1021 }
1022
1023 fn get_mi(&self, bam: &[u8]) -> Option<String> {
1028 use crate::sort::bam_fields;
1029 let value = bam_fields::find_string_tag_in_record(bam, &self.tag)?;
1030 let mut key = if let Some(ref transform) = self.mi_transform {
1031 transform(value)
1032 } else {
1033 String::from_utf8_lossy(value).into_owned()
1034 };
1035 if let Some(ct) = &self.cell_tag {
1036 key.push('\t');
1037 if let Some(cell_value) = bam_fields::find_string_tag_in_record(bam, ct) {
1038 key.push_str(&String::from_utf8_lossy(cell_value));
1039 }
1040 }
1041 Some(key)
1042 }
1043}
1044
1045impl<I> Iterator for RawMiGroupIterator<I>
1046where
1047 I: Iterator<Item = Result<Vec<u8>>>,
1048{
1049 type Item = Result<(String, Vec<Vec<u8>>)>;
1050
1051 fn next(&mut self) -> Option<Self::Item> {
1052 if self.done {
1053 return None;
1054 }
1055
1056 if let Some(e) = self.pending_error.take() {
1058 self.done = true;
1059 return Some(Err(e));
1060 }
1061
1062 loop {
1063 match self.record_iter.next() {
1064 None => {
1065 self.done = true;
1066 if self.current_group.is_empty() {
1067 return None;
1068 }
1069 let mi = self.current_mi.take().unwrap_or_default();
1070 let group = std::mem::take(&mut self.current_group);
1071 return Some(Ok((mi, group)));
1072 }
1073 Some(Err(e)) => {
1074 if !self.current_group.is_empty() {
1075 self.pending_error = Some(e);
1076 let mi = self.current_mi.take().unwrap_or_default();
1077 let group = std::mem::take(&mut self.current_group);
1078 return Some(Ok((mi, group)));
1079 }
1080 self.done = true;
1081 return Some(Err(e));
1082 }
1083 Some(Ok(raw)) => {
1084 let Some(mi) = self.get_mi(&raw) else {
1085 continue;
1086 };
1087
1088 if self.current_group.is_empty() {
1089 self.current_mi = Some(mi);
1090 self.current_group.push(raw);
1091 } else if self.current_mi.as_ref() == Some(&mi) {
1092 self.current_group.push(raw);
1093 } else {
1094 let old_mi = self.current_mi.take().unwrap_or_default();
1095 let group = std::mem::take(&mut self.current_group);
1096 self.current_mi = Some(mi);
1097 self.current_group.push(raw);
1098 return Some(Ok((old_mi, group)));
1099 }
1100 }
1101 }
1102 }
1103 }
1104}
1105
1106#[cfg(test)]
1107#[allow(clippy::similar_names)]
1108mod tests {
1109 use super::*;
1110 use crate::sam::builder::RecordBuilder;
1111 use crate::umi::extract_mi_base;
1112
1113 fn create_record_with_mi(mi: &str) -> RecordBuf {
1114 RecordBuilder::new()
1115 .sequence("ACGT") .tag("MI", mi)
1117 .build()
1118 }
1119
1120 fn create_record_without_mi() -> RecordBuf {
1121 RecordBuilder::new()
1122 .sequence("ACGT") .build()
1124 }
1125
1126 #[test]
1127 fn test_empty_iterator() {
1128 let records: Vec<Result<RecordBuf>> = vec![];
1129 let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1130 assert!(iter.next().is_none());
1131 }
1132
1133 #[test]
1134 fn test_single_group() {
1135 let records: Vec<Result<RecordBuf>> = vec![
1136 Ok(create_record_with_mi("0")),
1137 Ok(create_record_with_mi("0")),
1138 Ok(create_record_with_mi("0")),
1139 ];
1140 let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1141
1142 let result = iter.next().unwrap().unwrap();
1143 assert_eq!(result.0, "0");
1144 assert_eq!(result.1.len(), 3);
1145
1146 assert!(iter.next().is_none());
1147 }
1148
1149 #[test]
1150 fn test_multiple_groups() {
1151 let records: Vec<Result<RecordBuf>> = vec![
1152 Ok(create_record_with_mi("0")),
1153 Ok(create_record_with_mi("0")),
1154 Ok(create_record_with_mi("1")),
1155 Ok(create_record_with_mi("1")),
1156 Ok(create_record_with_mi("1")),
1157 Ok(create_record_with_mi("2")),
1158 ];
1159 let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1160
1161 let result = iter.next().unwrap().unwrap();
1162 assert_eq!(result.0, "0");
1163 assert_eq!(result.1.len(), 2);
1164
1165 let result = iter.next().unwrap().unwrap();
1166 assert_eq!(result.0, "1");
1167 assert_eq!(result.1.len(), 3);
1168
1169 let result = iter.next().unwrap().unwrap();
1170 assert_eq!(result.0, "2");
1171 assert_eq!(result.1.len(), 1);
1172
1173 assert!(iter.next().is_none());
1174 }
1175
1176 #[test]
1177 fn test_skips_records_without_mi_tag() {
1178 let records: Vec<Result<RecordBuf>> = vec![
1179 Ok(create_record_with_mi("0")),
1180 Ok(create_record_without_mi()),
1181 Ok(create_record_with_mi("0")),
1182 Ok(create_record_without_mi()),
1183 Ok(create_record_with_mi("1")),
1184 ];
1185 let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1186
1187 let result = iter.next().unwrap().unwrap();
1188 assert_eq!(result.0, "0");
1189 assert_eq!(result.1.len(), 2); let result = iter.next().unwrap().unwrap();
1192 assert_eq!(result.0, "1");
1193 assert_eq!(result.1.len(), 1);
1194
1195 assert!(iter.next().is_none());
1196 }
1197
1198 #[test]
1199 fn test_error_propagation() {
1200 let records: Vec<Result<RecordBuf>> = vec![
1201 Ok(create_record_with_mi("0")),
1202 Err(anyhow::anyhow!("test error")),
1203 Ok(create_record_with_mi("1")),
1204 ];
1205 let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1206
1207 let result = iter.next().unwrap().unwrap();
1209 assert_eq!(result.0, "0");
1210 assert_eq!(result.1.len(), 1);
1211
1212 let result = iter.next().unwrap();
1214 assert!(result.is_err());
1215
1216 assert!(iter.next().is_none());
1218 }
1219
1220 #[test]
1221 fn test_custom_tag() {
1222 let record1 = RecordBuilder::new().sequence("ACGT").tag("RX", "ACGT").build();
1223
1224 let record2 = RecordBuilder::new().sequence("ACGT").tag("RX", "ACGT").build();
1225
1226 let records: Vec<Result<RecordBuf>> = vec![Ok(record1), Ok(record2)];
1227 let mut iter = MiGroupIterator::new(records.into_iter(), "RX");
1228
1229 let result = iter.next().unwrap().unwrap();
1230 assert_eq!(result.0, "ACGT");
1231 assert_eq!(result.1.len(), 2);
1232
1233 assert!(iter.next().is_none());
1234 }
1235
1236 #[test]
1237 #[should_panic(expected = "Tag name must be exactly 2 characters")]
1238 fn test_invalid_tag_length() {
1239 let records: Vec<Result<RecordBuf>> = vec![];
1240 let _ = MiGroupIterator::new(records.into_iter(), "M");
1241 }
1242
1243 #[test]
1245 fn test_transform_groups_by_base_mi() {
1246 let records: Vec<Result<RecordBuf>> = vec![
1248 Ok(create_record_with_mi("1/A")),
1249 Ok(create_record_with_mi("1/A")),
1250 Ok(create_record_with_mi("1/B")),
1251 Ok(create_record_with_mi("1/B")),
1252 Ok(create_record_with_mi("2/A")),
1253 Ok(create_record_with_mi("2/B")),
1254 ];
1255 let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1256 extract_mi_base(mi).to_string()
1257 });
1258
1259 let result = iter.next().unwrap().unwrap();
1261 assert_eq!(result.0, "1");
1262 assert_eq!(result.1.len(), 4);
1263
1264 let result = iter.next().unwrap().unwrap();
1266 assert_eq!(result.0, "2");
1267 assert_eq!(result.1.len(), 2);
1268
1269 assert!(iter.next().is_none());
1270 }
1271
1272 #[test]
1273 fn test_transform_empty_iterator() {
1274 let records: Vec<Result<RecordBuf>> = vec![];
1275 let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1276 extract_mi_base(mi).to_string()
1277 });
1278 assert!(iter.next().is_none());
1279 }
1280
1281 #[test]
1282 fn test_transform_single_group() {
1283 let records: Vec<Result<RecordBuf>> = vec![
1284 Ok(create_record_with_mi("0/A")),
1285 Ok(create_record_with_mi("0/B")),
1286 Ok(create_record_with_mi("0/A")),
1287 ];
1288 let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1289 extract_mi_base(mi).to_string()
1290 });
1291
1292 let result = iter.next().unwrap().unwrap();
1293 assert_eq!(result.0, "0");
1294 assert_eq!(result.1.len(), 3);
1295
1296 assert!(iter.next().is_none());
1297 }
1298
1299 #[test]
1300 fn test_transform_error_propagation() {
1301 let records: Vec<Result<RecordBuf>> = vec![
1302 Ok(create_record_with_mi("0/A")),
1303 Err(anyhow::anyhow!("test error")),
1304 Ok(create_record_with_mi("1/B")),
1305 ];
1306 let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1307 extract_mi_base(mi).to_string()
1308 });
1309
1310 let result = iter.next().unwrap().unwrap();
1312 assert_eq!(result.0, "0");
1313 assert_eq!(result.1.len(), 1);
1314
1315 let result = iter.next().unwrap();
1317 assert!(result.is_err());
1318
1319 assert!(iter.next().is_none());
1321 }
1322
1323 #[test]
1324 fn test_transform_custom_function() {
1325 let records: Vec<Result<RecordBuf>> = vec![
1327 Ok(create_record_with_mi("abc")),
1328 Ok(create_record_with_mi("ABC")),
1329 Ok(create_record_with_mi("Abc")),
1330 ];
1331 let mut iter =
1332 MiGroupIteratorWithTransform::new(records.into_iter(), "MI", str::to_uppercase);
1333
1334 let result = iter.next().unwrap().unwrap();
1336 assert_eq!(result.0, "ABC");
1337 assert_eq!(result.1.len(), 3);
1338
1339 assert!(iter.next().is_none());
1340 }
1341
1342 #[allow(clippy::cast_possible_truncation)]
1366 fn make_raw_bam_with_tag(tag_name: &str, tag_value: &str) -> Vec<u8> {
1367 let name = b"read";
1368 let l_read_name: u8 = (name.len() + 1) as u8; let seq_len: u32 = 4; let seq_bytes = seq_len.div_ceil(2) as usize;
1371
1372 let tag_bytes = tag_name.as_bytes();
1374 let aux: Vec<u8> =
1375 [&[tag_bytes[0], tag_bytes[1], b'Z'], tag_value.as_bytes(), &[0u8]].concat();
1376
1377 let total = 32 + l_read_name as usize + seq_bytes + seq_len as usize + aux.len();
1378 let mut buf = vec![0u8; total];
1379
1380 buf[0..4].copy_from_slice(&(-1i32).to_le_bytes());
1382 buf[4..8].copy_from_slice(&(-1i32).to_le_bytes());
1384 buf[8] = l_read_name;
1386 buf[12..14].copy_from_slice(&0u16.to_le_bytes());
1388 buf[16..20].copy_from_slice(&seq_len.to_le_bytes());
1390 buf[20..24].copy_from_slice(&(-1i32).to_le_bytes());
1392 buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
1394
1395 let name_start = 32;
1397 buf[name_start..name_start + name.len()].copy_from_slice(name);
1398 buf[name_start + name.len()] = 0;
1399
1400 let aux_start = 32 + l_read_name as usize + seq_bytes + seq_len as usize;
1404 buf[aux_start..aux_start + aux.len()].copy_from_slice(&aux);
1405
1406 buf
1407 }
1408
1409 #[allow(clippy::cast_possible_truncation)]
1411 fn make_raw_bam_without_tag() -> Vec<u8> {
1412 let name = b"read";
1413 let l_read_name: u8 = (name.len() + 1) as u8;
1414 let seq_len: u32 = 4;
1415 let seq_bytes = seq_len.div_ceil(2) as usize;
1416
1417 let total = 32 + l_read_name as usize + seq_bytes + seq_len as usize;
1418 let mut buf = vec![0u8; total];
1419
1420 buf[0..4].copy_from_slice(&(-1i32).to_le_bytes());
1421 buf[4..8].copy_from_slice(&(-1i32).to_le_bytes());
1422 buf[8] = l_read_name;
1423 buf[12..14].copy_from_slice(&0u16.to_le_bytes());
1424 buf[16..20].copy_from_slice(&seq_len.to_le_bytes());
1425 buf[20..24].copy_from_slice(&(-1i32).to_le_bytes());
1426 buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
1427
1428 let name_start = 32;
1429 buf[name_start..name_start + name.len()].copy_from_slice(name);
1430 buf[name_start + name.len()] = 0;
1431
1432 buf
1433 }
1434
1435 #[test]
1440 fn test_raw_empty_iterator() {
1441 let records: Vec<Result<Vec<u8>>> = vec![];
1442 let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1443 assert!(iter.next().is_none());
1444 }
1445
1446 #[test]
1447 fn test_raw_single_group() {
1448 let records: Vec<Result<Vec<u8>>> = vec![
1449 Ok(make_raw_bam_with_tag("MI", "0")),
1450 Ok(make_raw_bam_with_tag("MI", "0")),
1451 Ok(make_raw_bam_with_tag("MI", "0")),
1452 ];
1453 let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1454
1455 let result = iter.next().unwrap().unwrap();
1456 assert_eq!(result.0, "0");
1457 assert_eq!(result.1.len(), 3);
1458
1459 assert!(iter.next().is_none());
1460 }
1461
1462 #[test]
1463 fn test_raw_multiple_groups() {
1464 let records: Vec<Result<Vec<u8>>> = vec![
1465 Ok(make_raw_bam_with_tag("MI", "0")),
1466 Ok(make_raw_bam_with_tag("MI", "0")),
1467 Ok(make_raw_bam_with_tag("MI", "1")),
1468 Ok(make_raw_bam_with_tag("MI", "1")),
1469 Ok(make_raw_bam_with_tag("MI", "1")),
1470 Ok(make_raw_bam_with_tag("MI", "2")),
1471 ];
1472 let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1473
1474 let result = iter.next().unwrap().unwrap();
1475 assert_eq!(result.0, "0");
1476 assert_eq!(result.1.len(), 2);
1477
1478 let result = iter.next().unwrap().unwrap();
1479 assert_eq!(result.0, "1");
1480 assert_eq!(result.1.len(), 3);
1481
1482 let result = iter.next().unwrap().unwrap();
1483 assert_eq!(result.0, "2");
1484 assert_eq!(result.1.len(), 1);
1485
1486 assert!(iter.next().is_none());
1487 }
1488
1489 #[test]
1490 fn test_raw_skips_records_without_mi_tag() {
1491 let records: Vec<Result<Vec<u8>>> = vec![
1492 Ok(make_raw_bam_with_tag("MI", "0")),
1493 Ok(make_raw_bam_without_tag()),
1494 Ok(make_raw_bam_with_tag("MI", "0")),
1495 Ok(make_raw_bam_without_tag()),
1496 Ok(make_raw_bam_with_tag("MI", "1")),
1497 ];
1498 let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1499
1500 let result = iter.next().unwrap().unwrap();
1501 assert_eq!(result.0, "0");
1502 assert_eq!(result.1.len(), 2); let result = iter.next().unwrap().unwrap();
1505 assert_eq!(result.0, "1");
1506 assert_eq!(result.1.len(), 1);
1507
1508 assert!(iter.next().is_none());
1509 }
1510
1511 #[test]
1512 fn test_raw_error_propagation() {
1513 let records: Vec<Result<Vec<u8>>> = vec![
1514 Ok(make_raw_bam_with_tag("MI", "0")),
1515 Err(anyhow::anyhow!("test error")),
1516 Ok(make_raw_bam_with_tag("MI", "1")),
1517 ];
1518 let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1519
1520 let result = iter.next().unwrap().unwrap();
1522 assert_eq!(result.0, "0");
1523 assert_eq!(result.1.len(), 1);
1524
1525 let err = iter.next().unwrap();
1527 assert!(err.is_err());
1528
1529 assert!(iter.next().is_none());
1530 }
1531
1532 #[test]
1533 fn test_raw_error_with_no_pending_group() {
1534 let records: Vec<Result<Vec<u8>>> =
1535 vec![Err(anyhow::anyhow!("immediate error")), Ok(make_raw_bam_with_tag("MI", "0"))];
1536 let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1537
1538 let result = iter.next().unwrap();
1540 assert!(result.is_err());
1541
1542 assert!(iter.next().is_none());
1544 }
1545
1546 #[test]
1547 fn test_raw_custom_tag() {
1548 let records: Vec<Result<Vec<u8>>> =
1549 vec![Ok(make_raw_bam_with_tag("RX", "ACGT")), Ok(make_raw_bam_with_tag("RX", "ACGT"))];
1550 let mut iter = RawMiGroupIterator::new(records.into_iter(), "RX");
1551
1552 let result = iter.next().unwrap().unwrap();
1553 assert_eq!(result.0, "ACGT");
1554 assert_eq!(result.1.len(), 2);
1555
1556 assert!(iter.next().is_none());
1557 }
1558
1559 #[test]
1560 #[should_panic(expected = "Tag name must be exactly 2 characters")]
1561 fn test_raw_invalid_tag_length() {
1562 let records: Vec<Result<Vec<u8>>> = vec![];
1563 let _ = RawMiGroupIterator::new(records.into_iter(), "M");
1564 }
1565
1566 #[test]
1567 fn test_raw_with_transform() {
1568 let records: Vec<Result<Vec<u8>>> = vec![
1570 Ok(make_raw_bam_with_tag("MI", "1/A")),
1571 Ok(make_raw_bam_with_tag("MI", "1/A")),
1572 Ok(make_raw_bam_with_tag("MI", "1/B")),
1573 Ok(make_raw_bam_with_tag("MI", "1/B")),
1574 Ok(make_raw_bam_with_tag("MI", "2/A")),
1575 Ok(make_raw_bam_with_tag("MI", "2/B")),
1576 ];
1577 let mut iter = RawMiGroupIterator::with_transform(records.into_iter(), "MI", |raw| {
1578 let s = String::from_utf8_lossy(raw);
1579 extract_mi_base(&s).to_string()
1580 });
1581
1582 let result = iter.next().unwrap().unwrap();
1584 assert_eq!(result.0, "1");
1585 assert_eq!(result.1.len(), 4);
1586
1587 let result = iter.next().unwrap().unwrap();
1589 assert_eq!(result.0, "2");
1590 assert_eq!(result.1.len(), 2);
1591
1592 assert!(iter.next().is_none());
1593 }
1594
1595 #[test]
1596 fn test_raw_with_transform_empty() {
1597 let records: Vec<Result<Vec<u8>>> = vec![];
1598 let mut iter = RawMiGroupIterator::with_transform(records.into_iter(), "MI", |raw| {
1599 String::from_utf8_lossy(raw).to_uppercase()
1600 });
1601 assert!(iter.next().is_none());
1602 }
1603
1604 #[test]
1605 #[should_panic(expected = "Tag name must be exactly 2 characters")]
1606 fn test_raw_with_transform_invalid_tag_length() {
1607 let records: Vec<Result<Vec<u8>>> = vec![];
1608 let _ = RawMiGroupIterator::with_transform(records.into_iter(), "ABC", |raw| {
1609 String::from_utf8_lossy(raw).into_owned()
1610 });
1611 }
1612
1613 #[test]
1614 fn test_raw_get_mi_without_transform() {
1615 let iter = RawMiGroupIterator::new(std::iter::empty::<Result<Vec<u8>>>(), "MI");
1616 let bam = make_raw_bam_with_tag("MI", "42");
1617 assert_eq!(iter.get_mi(&bam), Some("42".to_string()));
1618 }
1619
1620 #[test]
1621 fn test_raw_get_mi_with_transform() {
1622 let iter = RawMiGroupIterator::with_transform(
1623 std::iter::empty::<Result<Vec<u8>>>(),
1624 "MI",
1625 |raw| {
1626 let s = String::from_utf8_lossy(raw);
1627 s.to_uppercase()
1628 },
1629 );
1630 let bam = make_raw_bam_with_tag("MI", "abc");
1631 assert_eq!(iter.get_mi(&bam), Some("ABC".to_string()));
1632 }
1633
1634 #[test]
1635 fn test_raw_get_mi_missing_tag() {
1636 let iter = RawMiGroupIterator::new(std::iter::empty::<Result<Vec<u8>>>(), "MI");
1637 let bam = make_raw_bam_without_tag();
1638 assert_eq!(iter.get_mi(&bam), None);
1639 }
1640
1641 #[test]
1642 fn test_raw_get_mi_wrong_tag() {
1643 let iter = RawMiGroupIterator::new(std::iter::empty::<Result<Vec<u8>>>(), "MI");
1644 let bam = make_raw_bam_with_tag("RX", "ACGT");
1645 assert_eq!(iter.get_mi(&bam), None);
1646 }
1647
1648 #[allow(clippy::cast_possible_truncation)]
1650 fn make_raw_bam_with_two_tags(tag1: &str, val1: &str, tag2: &str, val2: &str) -> Vec<u8> {
1651 let name = b"read";
1652 let l_read_name: u8 = (name.len() + 1) as u8;
1653 let seq_len: u32 = 4;
1654 let seq_bytes = seq_len.div_ceil(2) as usize;
1655
1656 let t1 = tag1.as_bytes();
1657 let t2 = tag2.as_bytes();
1658 let aux: Vec<u8> = [
1659 &[t1[0], t1[1], b'Z'],
1660 val1.as_bytes(),
1661 &[0u8],
1662 &[t2[0], t2[1], b'Z'],
1663 val2.as_bytes(),
1664 &[0u8],
1665 ]
1666 .concat();
1667
1668 let total = 32 + l_read_name as usize + seq_bytes + seq_len as usize + aux.len();
1669 let mut buf = vec![0u8; total];
1670
1671 buf[0..4].copy_from_slice(&(-1i32).to_le_bytes());
1672 buf[4..8].copy_from_slice(&(-1i32).to_le_bytes());
1673 buf[8] = l_read_name;
1674 buf[12..14].copy_from_slice(&0u16.to_le_bytes());
1675 buf[16..20].copy_from_slice(&seq_len.to_le_bytes());
1676 buf[20..24].copy_from_slice(&(-1i32).to_le_bytes());
1677 buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
1678
1679 let name_start = 32;
1680 buf[name_start..name_start + name.len()].copy_from_slice(name);
1681 buf[name_start + name.len()] = 0;
1682
1683 let aux_start = 32 + l_read_name as usize + seq_bytes + seq_len as usize;
1684 buf[aux_start..aux_start + aux.len()].copy_from_slice(&aux);
1685
1686 buf
1687 }
1688
1689 #[test]
1690 fn test_raw_cell_tag_composite_grouping() {
1691 let records: Vec<Result<Vec<u8>>> = vec![
1693 Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1694 Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1695 Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "TGCA")),
1696 Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "TGCA")),
1697 ];
1698 let mut iter =
1699 RawMiGroupIterator::new(records.into_iter(), "MI").with_cell_tag(Some([b'C', b'B']));
1700
1701 let result = iter.next().unwrap().unwrap();
1703 assert_eq!(result.0, "1\tACGT");
1704 assert_eq!(result.1.len(), 2);
1705
1706 let result = iter.next().unwrap().unwrap();
1708 assert_eq!(result.0, "1\tTGCA");
1709 assert_eq!(result.1.len(), 2);
1710
1711 assert!(iter.next().is_none());
1712 }
1713
1714 #[test]
1715 fn test_raw_cell_tag_none_groups_by_mi_only() {
1716 let records: Vec<Result<Vec<u8>>> = vec![
1718 Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1719 Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "TGCA")),
1720 ];
1721 let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI").with_cell_tag(None);
1722
1723 let result = iter.next().unwrap().unwrap();
1724 assert_eq!(result.0, "1");
1725 assert_eq!(result.1.len(), 2); assert!(iter.next().is_none());
1728 }
1729
1730 #[test]
1731 fn test_raw_cell_tag_missing_cell_value() {
1732 let records: Vec<Result<Vec<u8>>> = vec![
1734 Ok(make_raw_bam_with_tag("MI", "1")),
1735 Ok(make_raw_bam_with_tag("MI", "1")),
1736 Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1737 ];
1738 let mut iter =
1739 RawMiGroupIterator::new(records.into_iter(), "MI").with_cell_tag(Some([b'C', b'B']));
1740
1741 let result = iter.next().unwrap().unwrap();
1743 assert_eq!(result.0, "1\t");
1744 assert_eq!(result.1.len(), 2);
1745
1746 let result = iter.next().unwrap().unwrap();
1748 assert_eq!(result.0, "1\tACGT");
1749 assert_eq!(result.1.len(), 1);
1750
1751 assert!(iter.next().is_none());
1752 }
1753
1754 #[test]
1755 fn test_raw_cell_tag_with_transform() {
1756 let records: Vec<Result<Vec<u8>>> = vec![
1758 Ok(make_raw_bam_with_two_tags("MI", "1/A", "CB", "ACGT")),
1759 Ok(make_raw_bam_with_two_tags("MI", "1/B", "CB", "ACGT")),
1760 Ok(make_raw_bam_with_two_tags("MI", "1/A", "CB", "TGCA")),
1761 ];
1762 let mut iter = RawMiGroupIterator::with_transform(records.into_iter(), "MI", |raw| {
1763 let s = String::from_utf8_lossy(raw);
1764 extract_mi_base(&s).to_string()
1765 })
1766 .with_cell_tag(Some([b'C', b'B']));
1767
1768 let result = iter.next().unwrap().unwrap();
1771 assert_eq!(result.0, "1\tACGT");
1772 assert_eq!(result.1.len(), 2);
1773
1774 let result = iter.next().unwrap().unwrap();
1776 assert_eq!(result.0, "1\tTGCA");
1777 assert_eq!(result.1.len(), 1);
1778
1779 assert!(iter.next().is_none());
1780 }
1781
1782 fn make_raw_decoded_record(tag_name: &str, tag_value: &str) -> DecodedRecord {
1788 let raw = make_raw_bam_with_tag(tag_name, tag_value);
1789 let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1790 DecodedRecord::from_raw_bytes(raw, key)
1791 }
1792
1793 fn make_raw_decoded_record_no_tag() -> DecodedRecord {
1795 let raw = make_raw_bam_without_tag();
1796 let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1797 DecodedRecord::from_raw_bytes(raw, key)
1798 }
1799
1800 fn make_parsed_decoded_record(mi: &str) -> DecodedRecord {
1802 let record = create_record_with_mi(mi);
1803 let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1804 DecodedRecord::new(record, key)
1805 }
1806
1807 #[test]
1808 fn test_raw_grouper_single_mi_group() {
1809 let mut grouper = RawMiGrouper::new("MI", 10);
1810
1811 let records = vec![
1812 make_raw_decoded_record("MI", "0"),
1813 make_raw_decoded_record("MI", "0"),
1814 make_raw_decoded_record("MI", "0"),
1815 ];
1816
1817 let batches = grouper.add_records(records).unwrap();
1818 assert!(batches.is_empty());
1820 assert!(grouper.has_pending());
1821
1822 let final_batch = grouper.finish().unwrap().unwrap();
1823 assert_eq!(final_batch.groups.len(), 1);
1824 assert_eq!(final_batch.groups[0].mi, "0");
1825 assert_eq!(final_batch.groups[0].records.len(), 3);
1826 }
1827
1828 #[test]
1829 fn test_raw_grouper_multiple_mi_groups() {
1830 let mut grouper = RawMiGrouper::new("MI", 10);
1831
1832 let records = vec![
1833 make_raw_decoded_record("MI", "0"),
1834 make_raw_decoded_record("MI", "0"),
1835 make_raw_decoded_record("MI", "1"),
1836 make_raw_decoded_record("MI", "1"),
1837 make_raw_decoded_record("MI", "1"),
1838 make_raw_decoded_record("MI", "2"),
1839 ];
1840
1841 let batches = grouper.add_records(records).unwrap();
1842 assert!(batches.is_empty()); let final_batch = grouper.finish().unwrap().unwrap();
1845 assert_eq!(final_batch.groups.len(), 3);
1846 assert_eq!(final_batch.groups[0].mi, "0");
1847 assert_eq!(final_batch.groups[0].records.len(), 2);
1848 assert_eq!(final_batch.groups[1].mi, "1");
1849 assert_eq!(final_batch.groups[1].records.len(), 3);
1850 assert_eq!(final_batch.groups[2].mi, "2");
1851 assert_eq!(final_batch.groups[2].records.len(), 1);
1852 }
1853
1854 #[test]
1855 fn test_raw_grouper_batch_size_triggers() {
1856 let mut grouper = RawMiGrouper::new("MI", 2);
1857
1858 let records = vec![
1859 make_raw_decoded_record("MI", "0"),
1860 make_raw_decoded_record("MI", "1"),
1861 make_raw_decoded_record("MI", "2"),
1862 make_raw_decoded_record("MI", "3"),
1863 make_raw_decoded_record("MI", "4"),
1864 ];
1865
1866 let batches = grouper.add_records(records).unwrap();
1867 assert_eq!(batches.len(), 2);
1870 assert_eq!(batches[0].groups.len(), 2);
1871 assert_eq!(batches[0].groups[0].mi, "0");
1872 assert_eq!(batches[0].groups[1].mi, "1");
1873 assert_eq!(batches[1].groups.len(), 2);
1874 assert_eq!(batches[1].groups[0].mi, "2");
1875 assert_eq!(batches[1].groups[1].mi, "3");
1876
1877 assert!(grouper.has_pending());
1879
1880 let final_batch = grouper.finish().unwrap().unwrap();
1881 assert_eq!(final_batch.groups.len(), 1);
1882 assert_eq!(final_batch.groups[0].mi, "4");
1883 }
1884
1885 #[test]
1886 fn test_raw_grouper_groups_records_without_mi_under_empty_key() {
1887 let mut grouper = RawMiGrouper::new("MI", 10);
1888
1889 let records = vec![
1890 make_raw_decoded_record("MI", "0"),
1891 make_raw_decoded_record_no_tag(),
1892 make_raw_decoded_record("MI", "0"),
1893 ];
1894
1895 let batches = grouper.add_records(records).unwrap();
1896 assert!(batches.is_empty());
1897
1898 let final_batch = grouper.finish().unwrap().unwrap();
1900 assert_eq!(final_batch.groups.len(), 3);
1901 assert_eq!(final_batch.groups[0].mi, "0");
1902 assert_eq!(final_batch.groups[0].records.len(), 1);
1903 assert_eq!(final_batch.groups[1].mi, "");
1904 assert_eq!(final_batch.groups[1].records.len(), 1);
1905 assert_eq!(final_batch.groups[2].mi, "0");
1906 assert_eq!(final_batch.groups[2].records.len(), 1);
1907 }
1908
1909 #[test]
1910 fn test_raw_grouper_rejects_parsed_records() {
1911 let mut grouper = RawMiGrouper::new("MI", 10);
1912
1913 let records = vec![make_parsed_decoded_record("0")];
1914 let result = grouper.add_records(records);
1915 assert!(result.is_err());
1916 }
1917
1918 #[test]
1919 fn test_raw_grouper_finish_empty() {
1920 let mut grouper = RawMiGrouper::new("MI", 10);
1921 assert!(!grouper.has_pending());
1922
1923 let final_batch = grouper.finish().unwrap();
1924 assert!(final_batch.is_none());
1925 }
1926
1927 #[test]
1928 fn test_raw_grouper_finish_idempotent() {
1929 let mut grouper = RawMiGrouper::new("MI", 10);
1930
1931 let records = vec![make_raw_decoded_record("MI", "0")];
1932 grouper.add_records(records).unwrap();
1933
1934 let batch1 = grouper.finish().unwrap();
1935 assert!(batch1.is_some());
1936
1937 let batch2 = grouper.finish().unwrap();
1938 assert!(batch2.is_none());
1939 }
1940
1941 #[test]
1942 fn test_raw_grouper_has_pending_states() {
1943 let mut grouper = RawMiGrouper::new("MI", 10);
1944
1945 assert!(!grouper.has_pending());
1947
1948 let records = vec![make_raw_decoded_record("MI", "0")];
1950 grouper.add_records(records).unwrap();
1951 assert!(grouper.has_pending());
1952
1953 let records = vec![make_raw_decoded_record("MI", "1")];
1955 grouper.add_records(records).unwrap();
1956 assert!(grouper.has_pending());
1957 }
1958
1959 #[test]
1960 fn test_raw_grouper_with_filter_and_transform() {
1961 let mut grouper = RawMiGrouper::with_filter_and_transform(
1964 "MI",
1965 10,
1966 |bam: &[u8]| {
1967 let flag = fgumi_raw_bam::flags(bam);
1969 flag & fgumi_raw_bam::flags::SECONDARY == 0 },
1971 |raw: &[u8]| {
1972 let s = String::from_utf8_lossy(raw);
1973 extract_mi_base(&s).to_string()
1974 },
1975 );
1976
1977 let rec_primary = make_raw_bam_with_tag("MI", "1/A");
1979 let mut rec_secondary = make_raw_bam_with_tag("MI", "1/A");
1982 rec_secondary[14..16].copy_from_slice(&0x100u16.to_le_bytes());
1984
1985 let rec_b = make_raw_bam_with_tag("MI", "1/B");
1986
1987 let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1988 let records = vec![
1989 DecodedRecord::from_raw_bytes(rec_primary, key),
1990 DecodedRecord::from_raw_bytes(rec_secondary, key),
1991 DecodedRecord::from_raw_bytes(rec_b, key),
1992 ];
1993
1994 let batches = grouper.add_records(records).unwrap();
1995 assert!(batches.is_empty());
1996
1997 let final_batch = grouper.finish().unwrap().unwrap();
1998 assert_eq!(final_batch.groups.len(), 1);
2000 assert_eq!(final_batch.groups[0].mi, "1");
2001 assert_eq!(final_batch.groups[0].records.len(), 2); }
2003
2004 fn make_decoded_record_for_mi_grouper(mi: &str) -> DecodedRecord {
2010 let record = create_record_with_mi(mi);
2011 let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
2012 DecodedRecord::new(record, key)
2013 }
2014
2015 #[test]
2016 fn test_mi_grouper_single_group() {
2017 let mut grouper = MiGrouper::new("MI", 10);
2018
2019 let records = vec![
2020 make_decoded_record_for_mi_grouper("0"),
2021 make_decoded_record_for_mi_grouper("0"),
2022 make_decoded_record_for_mi_grouper("0"),
2023 ];
2024
2025 let batches = grouper.add_records(records).unwrap();
2026 assert!(batches.is_empty());
2027 assert!(grouper.has_pending());
2028
2029 let final_batch = grouper.finish().unwrap().unwrap();
2030 assert_eq!(final_batch.groups.len(), 1);
2031 assert_eq!(final_batch.groups[0].mi, "0");
2032 assert_eq!(final_batch.groups[0].records.len(), 3);
2033 }
2034
2035 #[test]
2036 fn test_mi_grouper_multiple_groups() {
2037 let mut grouper = MiGrouper::new("MI", 10);
2038
2039 let records = vec![
2040 make_decoded_record_for_mi_grouper("0"),
2041 make_decoded_record_for_mi_grouper("0"),
2042 make_decoded_record_for_mi_grouper("1"),
2043 make_decoded_record_for_mi_grouper("1"),
2044 make_decoded_record_for_mi_grouper("1"),
2045 make_decoded_record_for_mi_grouper("2"),
2046 ];
2047
2048 let batches = grouper.add_records(records).unwrap();
2049 assert!(batches.is_empty());
2050
2051 let final_batch = grouper.finish().unwrap().unwrap();
2052 assert_eq!(final_batch.groups.len(), 3);
2053 assert_eq!(final_batch.groups[0].mi, "0");
2054 assert_eq!(final_batch.groups[0].records.len(), 2);
2055 assert_eq!(final_batch.groups[1].mi, "1");
2056 assert_eq!(final_batch.groups[1].records.len(), 3);
2057 assert_eq!(final_batch.groups[2].mi, "2");
2058 assert_eq!(final_batch.groups[2].records.len(), 1);
2059 }
2060
2061 #[test]
2062 fn test_mi_grouper_batch_size_triggers() {
2063 let mut grouper = MiGrouper::new("MI", 2);
2064
2065 let records = vec![
2066 make_decoded_record_for_mi_grouper("0"),
2067 make_decoded_record_for_mi_grouper("1"),
2068 make_decoded_record_for_mi_grouper("2"),
2069 make_decoded_record_for_mi_grouper("3"),
2070 make_decoded_record_for_mi_grouper("4"),
2071 ];
2072
2073 let batches = grouper.add_records(records).unwrap();
2074 assert_eq!(batches.len(), 2);
2075 assert_eq!(batches[0].groups.len(), 2);
2076 assert_eq!(batches[0].groups[0].mi, "0");
2077 assert_eq!(batches[0].groups[1].mi, "1");
2078 assert_eq!(batches[1].groups.len(), 2);
2079 assert_eq!(batches[1].groups[0].mi, "2");
2080 assert_eq!(batches[1].groups[1].mi, "3");
2081
2082 assert!(grouper.has_pending());
2083
2084 let final_batch = grouper.finish().unwrap().unwrap();
2085 assert_eq!(final_batch.groups.len(), 1);
2086 assert_eq!(final_batch.groups[0].mi, "4");
2087 }
2088
2089 #[test]
2090 fn test_mi_grouper_rejects_raw_records() {
2091 let mut grouper = MiGrouper::new("MI", 10);
2092
2093 let records = vec![make_raw_decoded_record("MI", "0")];
2094 let result = grouper.add_records(records);
2095 assert!(result.is_err());
2096 }
2097
2098 #[test]
2099 fn test_mi_grouper_finish_empty() {
2100 let mut grouper = MiGrouper::new("MI", 10);
2101 assert!(!grouper.has_pending());
2102
2103 let final_batch = grouper.finish().unwrap();
2104 assert!(final_batch.is_none());
2105 }
2106
2107 #[test]
2108 fn test_mi_grouper_finish_idempotent() {
2109 let mut grouper = MiGrouper::new("MI", 10);
2110
2111 let records = vec![make_decoded_record_for_mi_grouper("0")];
2112 grouper.add_records(records).unwrap();
2113
2114 let batch1 = grouper.finish().unwrap();
2115 assert!(batch1.is_some());
2116
2117 let batch2 = grouper.finish().unwrap();
2118 assert!(batch2.is_none());
2119 }
2120
2121 #[test]
2122 fn test_mi_grouper_with_filter_and_transform() {
2123 let mut grouper = MiGrouper::with_filter_and_transform(
2124 "MI",
2125 10,
2126 |_r| true, |mi| extract_mi_base(mi).to_string(),
2128 );
2129
2130 let records = vec![
2131 make_decoded_record_for_mi_grouper("1/A"),
2132 make_decoded_record_for_mi_grouper("1/B"),
2133 make_decoded_record_for_mi_grouper("2/A"),
2134 ];
2135
2136 let batches = grouper.add_records(records).unwrap();
2137 assert!(batches.is_empty());
2138
2139 let final_batch = grouper.finish().unwrap().unwrap();
2140 assert_eq!(final_batch.groups.len(), 2);
2141 assert_eq!(final_batch.groups[0].mi, "1");
2142 assert_eq!(final_batch.groups[0].records.len(), 2);
2143 assert_eq!(final_batch.groups[1].mi, "2");
2144 assert_eq!(final_batch.groups[1].records.len(), 1);
2145 }
2146
2147 #[test]
2148 fn test_mi_grouper_has_pending_states() {
2149 let mut grouper = MiGrouper::new("MI", 10);
2150
2151 assert!(!grouper.has_pending());
2152
2153 let records = vec![make_decoded_record_for_mi_grouper("0")];
2154 grouper.add_records(records).unwrap();
2155 assert!(grouper.has_pending());
2156
2157 let records = vec![make_decoded_record_for_mi_grouper("1")];
2158 grouper.add_records(records).unwrap();
2159 assert!(grouper.has_pending());
2160 }
2161
2162 #[test]
2163 fn test_mi_grouper_record_without_mi_gets_empty_string() {
2164 let mut grouper = MiGrouper::new("MI", 10);
2165
2166 let record = create_record_without_mi();
2167 let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
2168 let records = vec![DecodedRecord::new(record, key)];
2169
2170 let batches = grouper.add_records(records).unwrap();
2171 assert!(batches.is_empty());
2172
2173 let final_batch = grouper.finish().unwrap().unwrap();
2175 assert_eq!(final_batch.groups.len(), 1);
2176 assert_eq!(final_batch.groups[0].mi, "");
2177 assert_eq!(final_batch.groups[0].records.len(), 1);
2178 }
2179
2180 #[test]
2185 fn test_mi_group_batch_new() {
2186 let batch = MiGroupBatch::new();
2187 assert!(batch.is_empty());
2188 assert_eq!(batch.len(), 0);
2189 }
2190
2191 #[test]
2192 fn test_mi_group_batch_with_capacity() {
2193 let batch = MiGroupBatch::with_capacity(16);
2194 assert!(batch.is_empty());
2195 assert_eq!(batch.len(), 0);
2196 }
2197
2198 #[test]
2199 fn test_mi_group_batch_len_and_is_empty() {
2200 let mut batch = MiGroupBatch::new();
2201 assert!(batch.is_empty());
2202
2203 batch.groups.push(MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]));
2204 assert!(!batch.is_empty());
2205 assert_eq!(batch.len(), 1);
2206
2207 batch.groups.push(MiGroup::new(
2208 "1".to_string(),
2209 vec![create_record_with_mi("1"), create_record_with_mi("1")],
2210 ));
2211 assert_eq!(batch.len(), 2);
2212 }
2213
2214 #[test]
2215 fn test_mi_group_batch_clear() {
2216 let mut batch = MiGroupBatch::new();
2217 batch.groups.push(MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]));
2218 assert!(!batch.is_empty());
2219
2220 batch.clear();
2221 assert!(batch.is_empty());
2222 assert_eq!(batch.len(), 0);
2223 }
2224
2225 #[test]
2226 fn test_mi_group_batch_weight() {
2227 let mut batch = MiGroupBatch::new();
2228 assert_eq!(batch.batch_weight(), 0);
2229
2230 batch.groups.push(MiGroup::new(
2231 "0".to_string(),
2232 vec![create_record_with_mi("0"), create_record_with_mi("0")],
2233 ));
2234 assert_eq!(batch.batch_weight(), 2);
2235
2236 batch.groups.push(MiGroup::new("1".to_string(), vec![create_record_with_mi("1")]));
2237 assert_eq!(batch.batch_weight(), 3);
2238 }
2239
2240 #[test]
2241 fn test_mi_group_batch_default() {
2242 let batch = MiGroupBatch::default();
2243 assert!(batch.is_empty());
2244 }
2245
2246 #[test]
2247 fn test_mi_group_batch_memory_estimate() {
2248 let batch = MiGroupBatch::new();
2249 let _ = batch.estimate_heap_size(); let mut batch = MiGroupBatch::new();
2253 batch.groups.push(MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]));
2254 let size = batch.estimate_heap_size();
2255 assert!(size > 0);
2256 }
2257
2258 #[test]
2259 fn test_mi_group_new() {
2260 let group = MiGroup::new("42".to_string(), vec![create_record_with_mi("42")]);
2261 assert_eq!(group.mi, "42");
2262 assert_eq!(group.records.len(), 1);
2263 }
2264
2265 #[test]
2266 fn test_mi_group_batch_weight_single() {
2267 let group = MiGroup::new(
2268 "0".to_string(),
2269 vec![
2270 create_record_with_mi("0"),
2271 create_record_with_mi("0"),
2272 create_record_with_mi("0"),
2273 ],
2274 );
2275 assert_eq!(group.batch_weight(), 3);
2276 }
2277
2278 #[test]
2279 fn test_mi_group_memory_estimate() {
2280 let group = MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]);
2281 let size = group.estimate_heap_size();
2282 assert!(size > 0);
2283 }
2284
2285 #[test]
2290 fn test_raw_mi_group_new() {
2291 let raw = make_raw_bam_with_tag("MI", "42");
2292 let group = RawMiGroup::new("42".to_string(), vec![raw]);
2293 assert_eq!(group.mi, "42");
2294 assert_eq!(group.records.len(), 1);
2295 }
2296
2297 #[test]
2298 fn test_raw_mi_group_batch_weight() {
2299 let group = RawMiGroup::new(
2300 "0".to_string(),
2301 vec![make_raw_bam_with_tag("MI", "0"), make_raw_bam_with_tag("MI", "0")],
2302 );
2303 assert_eq!(group.batch_weight(), 2);
2304 }
2305
2306 #[test]
2307 fn test_raw_mi_group_memory_estimate() {
2308 let group = RawMiGroup::new("0".to_string(), vec![make_raw_bam_with_tag("MI", "0")]);
2309 let size = group.estimate_heap_size();
2310 assert!(size > 0);
2311 }
2312
2313 #[test]
2314 fn test_raw_mi_group_batch_new() {
2315 let batch = RawMiGroupBatch::new();
2316 assert!(batch.groups.is_empty());
2317 }
2318
2319 #[test]
2320 fn test_raw_mi_group_batch_default() {
2321 let batch = RawMiGroupBatch::default();
2322 assert!(batch.groups.is_empty());
2323 }
2324
2325 #[test]
2326 fn test_raw_mi_group_batch_weight_method() {
2327 let mut batch = RawMiGroupBatch::new();
2328 assert_eq!(batch.batch_weight(), 0);
2329
2330 batch.groups.push(RawMiGroup::new(
2331 "0".to_string(),
2332 vec![make_raw_bam_with_tag("MI", "0"), make_raw_bam_with_tag("MI", "0")],
2333 ));
2334 assert_eq!(batch.batch_weight(), 2);
2335
2336 batch.groups.push(RawMiGroup::new("1".to_string(), vec![make_raw_bam_with_tag("MI", "1")]));
2337 assert_eq!(batch.batch_weight(), 3);
2338 }
2339
2340 #[test]
2341 fn test_raw_mi_group_batch_memory_estimate() {
2342 let batch = RawMiGroupBatch::new();
2343 let _ = batch.estimate_heap_size(); let mut batch = RawMiGroupBatch::new();
2346 batch.groups.push(RawMiGroup::new("0".to_string(), vec![make_raw_bam_with_tag("MI", "0")]));
2347 let size = batch.estimate_heap_size();
2348 assert!(size > 0);
2349 }
2350}