av_format/buffer/
accreader.rs

1//! `AccReader` is like a `BufReader`, but supports partial consumption.
2//!
3//! Import new data with `fill_buf`, get the current buffer with
4//! `current_slice`, and indicate through the `consume` method how many bytes
5//! were used.
6
7use crate::buffer::Buffered;
8use std::cmp;
9use std::io;
10use std::io::{BufRead, Read, Result, Seek, SeekFrom};
11use std::iter;
12use std::iter::Iterator;
13
/// Partial consumption buffer for any reader.
///
/// Data read from `inner` is accumulated in `buf`; the bytes that have been
/// read but not yet consumed are always `buf[pos..end]`.
pub struct AccReader<R> {
    // Underlying reader the data is pulled from.
    inner: R,
    // Backing storage; its length (not its `Vec` capacity) is the buffer size.
    buf: Vec<u8>,
    // Offset in `buf` of the first unconsumed byte.
    pos: usize,
    // Offset in `buf` one past the last valid byte.
    end: usize,
    // Position in the stream of the first unconsumed byte (`buf[pos]`):
    // `consume` advances it together with `pos`, and `seek` reports it as
    // the current stream position.
    index: usize,
}
23
24impl<R: Read + Seek> AccReader<R> {
25    /// Creates a new `AccReader` instance.
26    pub fn new(inner: R) -> AccReader<R> {
27        AccReader::with_capacity(4096, inner)
28    }
29
30    /// Creates a new `AccReader` instance of a determined capacity
31    /// for a reader.
32    pub fn with_capacity(cap: usize, inner: R) -> AccReader<R> {
33        AccReader {
34            inner,
35            buf: iter::repeat(0).take(cap).collect::<Vec<_>>(),
36            pos: 0,
37            end: 0,
38            index: 0,
39        }
40    }
41
42    /// Gets a reference to the underlying reader.
43    pub fn get_ref(&self) -> &R {
44        &self.inner
45    }
46
47    /// Gets a mutable reference to the underlying reader.
48    pub fn get_mut(&mut self) -> &mut R {
49        &mut self.inner
50    }
51
52    /// Unwraps the `AccReader`, returning the underlying reader.
53    ///
54    /// Note that any leftover data in the internal buffer is lost.
55    pub fn into_inner(self) -> R {
56        self.inner
57    }
58
59    /// Resets the buffer to the current position.
60    ///
61    /// All data before the current position is lost.
62    pub fn reset_buffer_position(&mut self) {
63        log::trace!(
64            "resetting buffer at pos: {} capacity: {}",
65            self.pos,
66            self.end
67        );
68        if self.end - self.pos > 0 {
69            log::trace!("copying {} to beginning of buffer", self.end - self.pos);
70            self.buf.copy_within(self.pos..self.end, 0);
71        }
72        self.end -= self.pos;
73        self.pos = 0;
74    }
75
76    /// Returns buffer data.
77    pub fn current_slice(&self) -> &[u8] {
78        log::trace!("current slice pos: {}, cap: {}", self.pos, self.end);
79        &self.buf[self.pos..self.end]
80    }
81
82    /// Returns buffer capacity.
83    pub fn capacity(&self) -> usize {
84        self.end - self.pos
85    }
86}
87
88impl<R: Read + Seek + Send + Sync> Buffered for AccReader<R> {
89    fn data(&self) -> &[u8] {
90        &self.buf[self.pos..self.end]
91    }
92    fn grow(&mut self, len: usize) {
93        let l = self.buf.len() + len;
94        self.buf.resize(l, 0);
95    }
96}
97
98impl<R: Read + Seek> Read for AccReader<R> {
99    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
100        log::trace!(
101            "read pos: {} cap: {} buflen: {}",
102            self.pos,
103            self.end,
104            buf.len()
105        );
106        if buf.len() < self.end - self.pos {
107            match (&self.buf[self.pos..(self.pos + buf.len())]).read(buf) {
108                Ok(len) => {
109                    self.consume(len);
110                    Ok(len)
111                }
112                Err(e) => Err(e),
113            }
114        } else {
115            // If we don't have any buffered data and we're doing a massive read
116            // (larger than our internal buffer), bypass our internal buffer
117            // entirely.
118            if buf.len() > self.buf.len() {
119                match (&self.buf[self.pos..self.end]).read(buf) {
120                    Ok(len) => {
121                        let total_len = self.inner.read(&mut buf[(self.end - self.pos)..])? + len;
122
123                        self.consume(total_len);
124                        self.reset_buffer_position();
125
126                        Ok(total_len)
127                    }
128                    Err(e) => Err(e),
129                }
130            } else {
131                let nread = {
132                    let mut rem = self.fill_buf()?;
133                    rem.read(buf)?
134                };
135                self.consume(nread);
136                Ok(nread)
137            }
138        }
139    }
140}
141
142impl<R: Read + Seek> BufRead for AccReader<R> {
143    fn fill_buf(&mut self) -> io::Result<&[u8]> {
144        // trace!("fillbuf current: {:?}", str::from_utf8(&self.buf[self.pos..self.end]).unwrap());
145        if self.pos != 0 || self.end != self.buf.len() {
146            self.reset_buffer_position();
147            log::trace!("buffer reset ended");
148            let read = self.inner.read(&mut self.buf[self.end..])?;
149            self.end += read;
150            log::trace!(
151                "new pos: {} and cap: {} -> current: {:?}",
152                self.pos,
153                self.end,
154                &self.buf[self.pos..self.end]
155            );
156        }
157        Ok(&self.buf[self.pos..self.end])
158    }
159
160    fn consume(&mut self, amt: usize) {
161        log::trace!("consumed {} bytes", amt);
162        self.pos = cmp::min(self.pos + amt, self.end);
163        self.index += amt;
164    }
165}
166
167impl<R: Read + Seek> Seek for AccReader<R> {
168    fn seek(&mut self, mut pos: SeekFrom) -> Result<u64> {
169        match pos {
170            SeekFrom::Start(sz) => {
171                let mv = sz as usize;
172                if mv >= self.index && mv < self.index + self.end - self.pos {
173                    self.pos += mv - self.index;
174                    self.index = mv;
175
176                    return Ok(mv as u64);
177                }
178            }
179            SeekFrom::End(_) => {}
180            SeekFrom::Current(sz) => {
181                let remaining = self.end - self.pos;
182
183                if sz >= 0 {
184                    if sz as usize <= remaining {
185                        self.index += sz as usize;
186                        self.pos += sz as usize;
187                        return Ok(self.index as u64);
188                    } else {
189                        pos = SeekFrom::Current(sz - remaining as i64);
190                    }
191                }
192            }
193        };
194
195        match self.inner.seek(pos) {
196            Ok(sz) => {
197                self.index = sz as usize;
198                self.pos = 0;
199                self.end = 0;
200                self.fill_buf()?;
201                Ok(sz)
202            }
203            Err(e) => Err(e),
204        }
205    }
206}
207// impl<R> fmt::Debug for AccReader<R> where R: fmt::Debug {
208// fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
209// fmt.debug_struct("AccReader")
210// .field("reader", &self.inner)
211// .field("buffer", &format_args!("{}/{}", self.end - self.pos, self.buf.len()))
212// .finish()
213// }
214// }
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::Buffered;
    use std::io::{BufRead, Cursor};
    use std::ops::Range;

    /// Reads `bytes` through an `AccReader` of the given `capacity`, issuing
    /// one `read_exact` per range, and checks the reassembled output.
    fn assert_read_acc(bytes: &[u8], capacity: usize, ranges: &[Range<usize>]) {
        let mut acc = AccReader::with_capacity(capacity, Cursor::new(bytes));
        let mut out = vec![0u8; bytes.len()];

        for span in ranges.iter().cloned() {
            acc.read_exact(&mut out[span]).unwrap();
        }

        assert_eq!(bytes, &out);
    }

    /// Reads exactly one byte from the reader.
    fn read_byte<R: Read + Seek>(acc: &mut AccReader<R>) -> io::Result<u8> {
        let mut one = [0u8; 1];
        acc.read_exact(&mut one)?;
        let [value] = one;
        Ok(value)
    }

    #[test]
    fn same_capacity_full_read() {
        let data: Vec<u8> = (0u8..20).collect();

        // This is actually exactly what we want
        #[allow(clippy::single_range_in_vec_init)]
        assert_read_acc(&data, 20, &[0..data.len()]);
    }

    #[test]
    fn split_read_1() {
        let data: Vec<u8> = (0u8..31).collect();

        assert_read_acc(&data, 20, &[0..10, 10..data.len()]);
    }

    #[test]
    fn split_read_2() {
        let data: Vec<u8> = (0u8..31).collect();

        assert_read_acc(&data, 20, &[0..3, 3..data.len()]);
    }

    #[test]
    fn seek_within_capacity() {
        let data: Vec<u8> = (0u8..30).collect();
        let mut acc = AccReader::with_capacity(15, Cursor::new(&data[..]));

        // Three forward hops of five bytes each.
        assert_eq!(5, acc.seek(SeekFrom::Current(5)).unwrap());
        assert_eq!(10, acc.seek(SeekFrom::Current(5)).unwrap());
        assert_eq!(15, acc.seek(SeekFrom::Current(5)).unwrap());
    }

    #[test]
    fn seek_across_capacity() {
        let data: Vec<u8> = (0u8..30).collect();
        let mut acc = AccReader::with_capacity(15, Cursor::new(&data[..]));

        assert_eq!(5, acc.seek(SeekFrom::Current(5)).unwrap());
        // Jumps past what the 15-byte buffer can hold.
        assert_eq!(20, acc.seek(SeekFrom::Current(15)).unwrap());
        assert_eq!(5, acc.seek(SeekFrom::Start(5)).unwrap());
    }

    #[test]
    fn seek_and_read() {
        let len = 30;
        let data: Vec<u8> = (0u8..).take(len).collect();
        let mut acc = AccReader::with_capacity(5, Cursor::new(&data[..]));

        assert_eq!(0, acc.stream_position().unwrap());

        // Each byte equals its offset, and the position follows every read.
        for expected in 0u64..30 {
            assert_eq!(expected, read_byte(&mut acc).unwrap() as u64);
            assert_eq!(expected + 1, acc.stream_position().unwrap());
        }
    }

    #[test]
    fn reader_test() {
        let text = b"AAAA\nAAAB\nAAACAAADAAAEAAAF\ndabcdEEEE";
        let acc = AccReader::with_capacity(20, Cursor::new(&text[..]));

        assert_eq!(4, acc.lines().count());
    }

    #[test]
    fn grow() {
        let text = b"abcdefghilmnopqrst";
        let mut acc = AccReader::with_capacity(4, Cursor::new(&text[..]));

        acc.fill_buf().unwrap();
        assert_eq!(b"abcd", acc.data());

        acc.consume(2);
        assert_eq!(b"cd", acc.data());

        // Growing alone must not change the visible data...
        acc.grow(4);
        assert_eq!(b"cd", acc.data());

        // ...but the next fill can use the extra room.
        acc.fill_buf().unwrap();
        assert_eq!(b"cdefghil", acc.data());
    }
}