Skip to main content

rust_htslib/bam/
buffer.rs

// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.
5
6use std::collections::{vec_deque, VecDeque};
7use std::mem;
8use std::str;
9use std::sync::Arc;
10
11use crate::bam;
12use crate::bam::Read;
13use crate::errors::{Error, Result};
/// A buffer for BAM records. This allows accessing regions in a sorted BAM file while
/// iterating over it in a single pass.
/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has
/// linear complexity. The buffer makes use of indexed random access. Hence, when fetching a
/// region at the very end of the BAM, everything before is omitted without cost.
#[derive(Debug)]
pub struct RecordBuffer {
    /// Indexed reader that records are fetched and read from.
    reader: bam::IndexedReader,
    /// Buffered records, in the order they were read from `reader`.
    inner: VecDeque<Arc<bam::Record>>,
    /// Record read at or past the `end` of the last fetch window, kept for a later fetch.
    overflow: Option<Arc<bam::Record>>,
    /// Whether `bam::Record::cache_cigar()` is called on each kept record.
    cache_cigar: bool,
    /// Distance threshold deciding between re-seeking and reading on; see
    /// `set_min_refetch_distance`.
    min_refetch_distance: u64,
    /// Scratch record that reads go into; it is swapped for a fresh one whenever the
    /// record just read is kept.
    buffer_record: Arc<bam::Record>,
    /// Start coordinate recorded at the end of the previous `fetch`; consulted when
    /// deciding whether the next `fetch` must re-seek.
    start_pos: Option<u64>,
}

// SAFETY: NOTE(review): no justification is given in this file for these manual
// `Send`/`Sync` impls. `RecordBuffer` owns a `bam::IndexedReader` and `Arc<bam::Record>`
// values; presumably these wrap raw htslib pointers, which would be why the traits are
// asserted by hand. TODO confirm that sharing `&RecordBuffer` across threads (only the
// `&self` accessors: `start`, `end`, `tid`, `iter`, `len`, `is_empty`) and moving the
// reader between threads is sound for the underlying htslib handles.
unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}
32
33impl RecordBuffer {
34    /// Create a new `RecordBuffer`.
35    ///
36    /// # Arguments
37    ///
38    /// * `bam` - BAM reader
39    /// * `cache_cigar` - whether to call `bam::Record::cache_cigar()` for each record.
40    pub fn new(bam: bam::IndexedReader, cache_cigar: bool) -> Self {
41        RecordBuffer {
42            reader: bam,
43            inner: VecDeque::new(),
44            overflow: None,
45            cache_cigar,
46            min_refetch_distance: 1,
47            buffer_record: Arc::new(bam::Record::new()),
48            start_pos: Some(0),
49        }
50    }
51
52    /// maximum distance to previous fetch window such that a
53    /// new fetch operation is performed. If the distance is smaller, buffer will simply
54    /// read through until the start of the new fetch window (probably saving some time
55    /// by avoiding the random access).
56    pub fn set_min_refetch_distance(&mut self, min_refetch_distance: u64) {
57        self.min_refetch_distance = min_refetch_distance;
58    }
59
60    /// Return start position of buffer
61    pub fn start(&self) -> Option<u64> {
62        self.inner.front().map(|rec| rec.pos() as u64)
63    }
64
65    /// Return end position of buffer.
66    pub fn end(&self) -> Option<u64> {
67        self.inner.back().map(|rec| rec.pos() as u64)
68    }
69
70    pub fn tid(&self) -> Option<i32> {
71        self.inner.back().map(|rec| rec.tid())
72    }
73
74    /// Fill buffer at the given interval. If the start coordinate is left of
75    /// the previous start coordinate, this will use an additional BAM fetch IO operation.
76    /// Coordinates are 0-based, and end is exclusive.
77    /// Returns tuple with numbers of added and deleted records since the previous fetch.
78    #[allow(unused_assignments)] // TODO this is needed because rustc thinks that deleted is unused
79    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
80        let mut added = 0;
81        // move overflow from last fetch into ringbuffer
82        if self.overflow.is_some() {
83            added += 1;
84            self.inner.push_back(self.overflow.take().unwrap());
85        }
86
87        if let Some(tid) = self.reader.header.tid(chrom) {
88            let mut deleted = 0;
89            let window_start = start;
90            if self.inner.is_empty()
91                || window_start.saturating_sub(self.end().unwrap()) >= self.min_refetch_distance
92                || self.tid().unwrap() != tid as i32
93                || self.start().unwrap() > self.start_pos.unwrap()
94            {
95                let end = self.reader.header.target_len(tid).unwrap();
96                self.reader.fetch((tid, window_start, end))?;
97                deleted = self.inner.len();
98                self.inner.clear();
99            } else {
100                // remove records too far left
101                let to_remove = self
102                    .inner
103                    .iter()
104                    .take_while(|rec| rec.pos() < window_start as i64)
105                    .count();
106                for _ in 0..to_remove {
107                    self.inner.pop_front();
108                }
109                deleted = to_remove;
110            }
111
112            // extend to the right
113            loop {
114                match self
115                    .reader
116                    .read(Arc::get_mut(&mut self.buffer_record).unwrap())
117                {
118                    None => break,
119                    Some(res) => res?,
120                }
121
122                if self.buffer_record.is_unmapped() {
123                    continue;
124                }
125
126                let pos = self.buffer_record.pos();
127
128                // skip records before the start
129                if pos < start as i64 {
130                    continue;
131                }
132
133                // Record is kept, do not reuse it for next iteration
134                // and thus create a new one.
135                let mut record =
136                    mem::replace(&mut self.buffer_record, Arc::new(bam::Record::new()));
137
138                if self.cache_cigar {
139                    Arc::get_mut(&mut record).unwrap().cache_cigar();
140                }
141
142                if pos >= end as i64 {
143                    self.overflow = Some(record);
144                    break;
145                } else {
146                    self.inner.push_back(record);
147                    added += 1;
148                }
149            }
150            self.start_pos = Some(self.start().unwrap_or(window_start));
151
152            Ok((added, deleted))
153        } else {
154            Err(Error::UnknownSequence {
155                sequence: str::from_utf8(chrom).unwrap().to_owned(),
156            })
157        }
158    }
159
160    /// Iterate over records that have been fetched with `fetch`.
161    pub fn iter(&self) -> vec_deque::Iter<'_, Arc<bam::Record>> {
162        self.inner.iter()
163    }
164
165    /// Iterate over mutable references to records that have been fetched with `fetch`.
166    pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, Arc<bam::Record>> {
167        self.inner.iter_mut()
168    }
169
170    pub fn len(&self) -> usize {
171        self.inner.len()
172    }
173
174    pub fn is_empty(&self) -> bool {
175        self.len() == 0
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bam;

    /// Fetching a small window at the start of `CHROMOSOME_I` buffers exactly six
    /// records, all starting at position 1.
    #[test]
    fn test_buffer() {
        let mut buffer = RecordBuffer::new(
            bam::IndexedReader::from_path("test/test.bam").unwrap(),
            false,
        );
        buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap();

        let positions: Vec<i64> = buffer.iter().map(|record| record.pos()).collect();
        assert_eq!(positions, [1; 6]);
    }
}