rust_htslib/bam/
buffer.rs1use std::collections::{vec_deque, VecDeque};
7use std::mem;
8use std::str;
9use std::sync::Arc;
10
11use crate::bam;
12use crate::bam::Read;
13use crate::errors::{Error, Result};
14#[derive(Debug)]
20pub struct RecordBuffer {
21 reader: bam::IndexedReader,
22 inner: VecDeque<Arc<bam::Record>>,
23 overflow: Option<Arc<bam::Record>>,
24 cache_cigar: bool,
25 min_refetch_distance: u64,
26 buffer_record: Arc<bam::Record>,
27 start_pos: Option<u64>,
28}
29
30unsafe impl Sync for RecordBuffer {}
31unsafe impl Send for RecordBuffer {}
32
33impl RecordBuffer {
34 pub fn new(bam: bam::IndexedReader, cache_cigar: bool) -> Self {
41 RecordBuffer {
42 reader: bam,
43 inner: VecDeque::new(),
44 overflow: None,
45 cache_cigar,
46 min_refetch_distance: 1,
47 buffer_record: Arc::new(bam::Record::new()),
48 start_pos: Some(0),
49 }
50 }
51
52 pub fn set_min_refetch_distance(&mut self, min_refetch_distance: u64) {
57 self.min_refetch_distance = min_refetch_distance;
58 }
59
60 pub fn start(&self) -> Option<u64> {
62 self.inner.front().map(|rec| rec.pos() as u64)
63 }
64
65 pub fn end(&self) -> Option<u64> {
67 self.inner.back().map(|rec| rec.pos() as u64)
68 }
69
70 pub fn tid(&self) -> Option<i32> {
71 self.inner.back().map(|rec| rec.tid())
72 }
73
74 #[allow(unused_assignments)] pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
80 let mut added = 0;
81 if self.overflow.is_some() {
83 added += 1;
84 self.inner.push_back(self.overflow.take().unwrap());
85 }
86
87 if let Some(tid) = self.reader.header.tid(chrom) {
88 let mut deleted = 0;
89 let window_start = start;
90 if self.inner.is_empty()
91 || window_start.saturating_sub(self.end().unwrap()) >= self.min_refetch_distance
92 || self.tid().unwrap() != tid as i32
93 || self.start().unwrap() > self.start_pos.unwrap()
94 {
95 let end = self.reader.header.target_len(tid).unwrap();
96 self.reader.fetch((tid, window_start, end))?;
97 deleted = self.inner.len();
98 self.inner.clear();
99 } else {
100 let to_remove = self
102 .inner
103 .iter()
104 .take_while(|rec| rec.pos() < window_start as i64)
105 .count();
106 for _ in 0..to_remove {
107 self.inner.pop_front();
108 }
109 deleted = to_remove;
110 }
111
112 loop {
114 match self
115 .reader
116 .read(Arc::get_mut(&mut self.buffer_record).unwrap())
117 {
118 None => break,
119 Some(res) => res?,
120 }
121
122 if self.buffer_record.is_unmapped() {
123 continue;
124 }
125
126 let pos = self.buffer_record.pos();
127
128 if pos < start as i64 {
130 continue;
131 }
132
133 let mut record =
136 mem::replace(&mut self.buffer_record, Arc::new(bam::Record::new()));
137
138 if self.cache_cigar {
139 Arc::get_mut(&mut record).unwrap().cache_cigar();
140 }
141
142 if pos >= end as i64 {
143 self.overflow = Some(record);
144 break;
145 } else {
146 self.inner.push_back(record);
147 added += 1;
148 }
149 }
150 self.start_pos = Some(self.start().unwrap_or(window_start));
151
152 Ok((added, deleted))
153 } else {
154 Err(Error::UnknownSequence {
155 sequence: str::from_utf8(chrom).unwrap().to_owned(),
156 })
157 }
158 }
159
160 pub fn iter(&self) -> vec_deque::Iter<'_, Arc<bam::Record>> {
162 self.inner.iter()
163 }
164
165 pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, Arc<bam::Record>> {
167 self.inner.iter_mut()
168 }
169
170 pub fn len(&self) -> usize {
171 self.inner.len()
172 }
173
174 pub fn is_empty(&self) -> bool {
175 self.len() == 0
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bam;

    /// Fetching the window 1..5 on `CHROMOSOME_I` of the test BAM file must
    /// yield exactly six records, all positioned at 1.
    #[test]
    fn test_buffer() {
        let reader = bam::IndexedReader::from_path("test/test.bam").unwrap();
        let mut buffer = RecordBuffer::new(reader, false);

        buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap();

        let positions: Vec<i64> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(positions, vec![1; 6]);
    }
}