1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
// Copyright 2017 Johannes Köster.
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
// This file may not be copied, modified, or distributed
// except according to those terms.

use std::collections::{vec_deque, VecDeque};
use std::mem;

use crate::bcf::errors::Result;
use crate::bcf::{self, Read};

/// A buffer for BCF records. This allows access regions in a sorted BCF file while iterating
/// over it in a single pass.
/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has
/// linear complexity. The buffer does not use any indexed random access. Hence, for getting a
/// region at the very end of the BCF, you will have to wait until all records before have
/// been read.
#[derive(Debug)]
pub struct RecordBuffer {
    reader: bcf::Reader,
    ringbuffer: VecDeque<bcf::Record>,
    ringbuffer2: VecDeque<bcf::Record>,
    overflow: Option<bcf::Record>,
}

unsafe impl Sync for RecordBuffer {}
unsafe impl Send for RecordBuffer {}

impl RecordBuffer {
    /// Create new buffer.
    pub fn new(reader: bcf::Reader) -> Self {
        RecordBuffer {
            reader,
            ringbuffer: VecDeque::new(),
            ringbuffer2: VecDeque::new(),
            overflow: None,
        }
    }

    fn last_rid(&self) -> Option<u32> {
        self.ringbuffer.back().map(|rec| rec.rid().unwrap())
    }

    fn next_rid(&self) -> Option<u32> {
        self.ringbuffer2.back().map(|rec| rec.rid().unwrap())
    }

    fn swap_buffers(&mut self) {
        // swap with buffer for next rid
        mem::swap(&mut self.ringbuffer2, &mut self.ringbuffer);
        // clear second buffer
        self.ringbuffer2.clear();
    }

    fn drain_left(&mut self, rid: u32, window_start: u64) -> usize {
        // remove records too far left or from wrong rid
        // rec.rid() will always yield Some(), because otherwise we won't put the rec into the
        // buffer.
        let to_remove = self
            .ringbuffer
            .iter()
            .take_while(|rec| (rec.pos() as u64) < window_start || rec.rid().unwrap() != rid)
            .count();
        self.ringbuffer.drain(..to_remove);
        to_remove
    }

    /// Fill the buffer with variants in the given window. The start coordinate has to be right of
    /// the start coordinate of any previous `fill` operation.
    /// Coordinates are 0-based, and end is exclusive.
    /// Returns tuple with numbers of added and deleted records compared to previous fetch.
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
        // TODO panic if start is left of previous start or we have moved past the given chrom
        // before.
        let rid = self.reader.header.name2rid(chrom)?;
        let mut added = 0;
        let mut deleted = 0;

        // shrink and swap
        match (self.last_rid(), self.next_rid()) {
            (Some(last_rid), _) => {
                if last_rid != rid {
                    deleted = self.ringbuffer.len();
                    self.swap_buffers();
                    added = self.ringbuffer.len();
                // TODO drain left?
                } else {
                    deleted = self.drain_left(rid, start);
                }
            }
            (_, Some(_)) => {
                // TODO is this really necessary? If there was no fetch before, there is nothing
                // to delete.
                deleted = self.ringbuffer.len();
                self.swap_buffers();
                deleted += self.drain_left(rid, start);
                added = self.ringbuffer.len();
            }
            _ => (),
        }

        if !self.ringbuffer2.is_empty() {
            // We have already read beyond the current rid. Hence we can't extend to the right for
            // this rid.
            return Ok((added, deleted));
        }

        // move overflow from last fill into ringbuffer
        if self.overflow.is_some() {
            let pos = self.overflow.as_ref().unwrap().pos() as u64;
            if pos >= start {
                if pos <= end {
                    self.ringbuffer.push_back(self.overflow.take().unwrap());
                    added += 1;
                } else {
                    return Ok((added, deleted));
                }
            } else {
                // discard overflow
                self.overflow.take();
            }
        }

        // extend to the right
        loop {
            let mut rec = self.reader.empty_record();

            if !self.reader.read(&mut rec)? {
                // EOF
                break;
            }
            let pos = rec.pos() as u64;
            if let Some(rec_rid) = rec.rid() {
                if rec_rid == rid {
                    if pos >= end {
                        // Record is beyond our window. Store it anyways but stop.
                        self.overflow = Some(rec);
                        break;
                    } else if pos >= start {
                        // Record is within our window.
                        self.ringbuffer.push_back(rec);
                        added += 1;
                    } else {
                        // Record is upstream of our window, ignore it
                        continue;
                    }
                } else if rec_rid > rid {
                    // record comes from next rid. Store it in second buffer but stop filling.
                    self.ringbuffer2.push_back(rec);
                    break;
                } else {
                    // Record comes from previous rid. Ignore it.
                    continue;
                }
            } else {
                // skip records without proper rid
                continue;
            }
        }

        Ok((added, deleted))
    }

    /// Iterate over records that have been fetched with `fetch`.
    pub fn iter(&self) -> vec_deque::Iter<'_, bcf::Record> {
        self.ringbuffer.iter()
    }

    /// Iterate over mutable references to records that have been fetched with `fetch`.
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, bcf::Record> {
        self.ringbuffer.iter_mut()
    }

    pub fn len(&self) -> usize {
        self.ringbuffer.len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bcf;
    use itertools::Itertools;

    #[test]
    fn test_buffer() {
        let reader = bcf::Reader::from_path(&"test/test.bcf").unwrap();
        let mut buffer = RecordBuffer::new(reader);

        buffer.fetch(b"1", 100, 10023).unwrap();
        {
            let records = buffer.iter().collect_vec();
            assert_eq!(records.len(), 2);
            assert_eq!(records[0].pos(), 10021);
            assert_eq!(records[1].pos(), 10022);
        }

        buffer.fetch(b"1", 10023, 10024).unwrap();
        {
            let records = buffer.iter().collect_vec();
            assert_eq!(records.len(), 1);
            assert_eq!(records[0].pos(), 10023);
        }
    }
}