1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.

use std::collections::{vec_deque, VecDeque};
use std::mem;
use std::rc::Rc;
use std::str;

use crate::bam;
use crate::bam::Read;
use crate::errors::{Error, Result};
/// A buffer for BAM records. This allows access regions in a sorted BAM file while iterating
/// over it in a single pass.
/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has
/// linear complexity. The buffer makes use of indexed random access. Hence, when fetching a
/// region at the very end of the BAM, everything before is omitted without cost.
#[derive(Debug)]
pub struct RecordBuffer {
    reader: bam::IndexedReader,
    inner: VecDeque<Rc<bam::Record>>,
    overflow: Option<Rc<bam::Record>>,
    cache_cigar: bool,
    min_refetch_distance: u64,
    buffer_record: Rc<bam::Record>,
    start_pos: Option<u64>,
}

unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}

impl RecordBuffer {
    /// Create a new `RecordBuffer`.
    ///
    /// # Arguments
    ///
    /// * `bam` - BAM reader
    /// * `cache_cigar` - whether to call `bam::Record::cache_cigar()` for each record.
    pub fn new(bam: bam::IndexedReader, cache_cigar: bool) -> Self {
        RecordBuffer {
            reader: bam,
            inner: VecDeque::new(),
            overflow: None,
            cache_cigar,
            min_refetch_distance: 1,
            buffer_record: Rc::new(bam::Record::new()),
            start_pos: Some(0),
        }
    }

    /// maximum distance to previous fetch window such that a
    /// new fetch operation is performed. If the distance is smaller, buffer will simply
    /// read through until the start of the new fetch window (probably saving some time
    /// by avoiding the random access).
    pub fn set_min_refetch_distance(&mut self, min_refetch_distance: u64) {
        self.min_refetch_distance = min_refetch_distance;
    }

    /// Return start position of buffer
    pub fn start(&self) -> Option<u64> {
        self.inner.front().map(|rec| rec.pos() as u64)
    }

    /// Return end position of buffer.
    pub fn end(&self) -> Option<u64> {
        self.inner.back().map(|rec| rec.pos() as u64)
    }

    pub fn tid(&self) -> Option<i32> {
        self.inner.back().map(|rec| rec.tid())
    }

    /// Fill buffer at the given interval. If the start coordinate is left of
    /// the previous start coordinate, this will use an additional BAM fetch IO operation.
    /// Coordinates are 0-based, and end is exclusive.
    /// Returns tuple with numbers of added and deleted records since the previous fetch.
    #[allow(unused_assignments)] // TODO this is needed because rustc thinks that deleted is unused
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
        let mut added = 0;
        // move overflow from last fetch into ringbuffer
        if self.overflow.is_some() {
            added += 1;
            self.inner.push_back(self.overflow.take().unwrap());
        }

        if let Some(tid) = self.reader.header.tid(chrom) {
            let mut deleted = 0;
            let window_start = start;
            if self.inner.is_empty()
                || window_start.saturating_sub(self.end().unwrap()) >= self.min_refetch_distance
                || self.tid().unwrap() != tid as i32
                || self.start().unwrap() > self.start_pos.unwrap()
            {
                let end = self.reader.header.target_len(tid).unwrap();
                self.reader.fetch((tid, window_start, end))?;
                deleted = self.inner.len();
                self.inner.clear();
            } else {
                // remove records too far left
                let to_remove = self
                    .inner
                    .iter()
                    .take_while(|rec| rec.pos() < window_start as i64)
                    .count();
                for _ in 0..to_remove {
                    self.inner.pop_front();
                }
                deleted = to_remove;
            }

            // extend to the right
            loop {
                match self
                    .reader
                    .read(Rc::get_mut(&mut self.buffer_record).unwrap())
                {
                    None => break,
                    Some(res) => res?,
                }

                if self.buffer_record.is_unmapped() {
                    continue;
                }

                let pos = self.buffer_record.pos();

                // skip records before the start
                if pos < start as i64 {
                    continue;
                }

                // Record is kept, do not reuse it for next iteration
                // and thus create a new one.
                let mut record = mem::replace(&mut self.buffer_record, Rc::new(bam::Record::new()));

                if self.cache_cigar {
                    Rc::get_mut(&mut record).unwrap().cache_cigar();
                }

                if pos >= end as i64 {
                    self.overflow = Some(record);
                    break;
                } else {
                    self.inner.push_back(record);
                    added += 1;
                }
            }
            self.start_pos = Some(self.start().unwrap_or(window_start));

            Ok((added, deleted))
        } else {
            Err(Error::UnknownSequence {
                sequence: str::from_utf8(chrom).unwrap().to_owned(),
            })
        }
    }

    /// Iterate over records that have been fetched with `fetch`.
    pub fn iter(&self) -> vec_deque::Iter<Rc<bam::Record>> {
        self.inner.iter()
    }

    /// Iterate over mutable references to records that have been fetched with `fetch`.
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<Rc<bam::Record>> {
        self.inner.iter_mut()
    }

    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bam;

    #[test]
    fn test_buffer() {
        let reader = bam::IndexedReader::from_path(&"test/test.bam").unwrap();
        let mut buffer = RecordBuffer::new(reader, false);

        buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap();
        {
            let records: Vec<_> = buffer.iter().collect();
            assert_eq!(records.len(), 6);
            assert_eq!(records[0].pos(), 1);
            assert_eq!(records[1].pos(), 1);
            assert_eq!(records[2].pos(), 1);
            assert_eq!(records[3].pos(), 1);
            assert_eq!(records[4].pos(), 1);
            assert_eq!(records[5].pos(), 1);
        }
    }
}