av_format/buffer/
accreader.rs1use crate::buffer::Buffered;
8use std::cmp;
9use std::io;
10use std::io::{BufRead, Read, Result, Seek, SeekFrom};
11use std::iter;
12use std::iter::Iterator;
13
14pub struct AccReader<R> {
16 inner: R,
17 buf: Vec<u8>,
18 pos: usize,
19 end: usize,
20 index: usize,
22}
23
24impl<R: Read + Seek> AccReader<R> {
25 pub fn new(inner: R) -> AccReader<R> {
27 AccReader::with_capacity(4096, inner)
28 }
29
30 pub fn with_capacity(cap: usize, inner: R) -> AccReader<R> {
33 AccReader {
34 inner,
35 buf: iter::repeat(0).take(cap).collect::<Vec<_>>(),
36 pos: 0,
37 end: 0,
38 index: 0,
39 }
40 }
41
42 pub fn get_ref(&self) -> &R {
44 &self.inner
45 }
46
47 pub fn get_mut(&mut self) -> &mut R {
49 &mut self.inner
50 }
51
52 pub fn into_inner(self) -> R {
56 self.inner
57 }
58
59 pub fn reset_buffer_position(&mut self) {
63 log::trace!(
64 "resetting buffer at pos: {} capacity: {}",
65 self.pos,
66 self.end
67 );
68 if self.end - self.pos > 0 {
69 log::trace!("copying {} to beginning of buffer", self.end - self.pos);
70 self.buf.copy_within(self.pos..self.end, 0);
71 }
72 self.end -= self.pos;
73 self.pos = 0;
74 }
75
76 pub fn current_slice(&self) -> &[u8] {
78 log::trace!("current slice pos: {}, cap: {}", self.pos, self.end);
79 &self.buf[self.pos..self.end]
80 }
81
82 pub fn capacity(&self) -> usize {
84 self.end - self.pos
85 }
86}
87
88impl<R: Read + Seek + Send + Sync> Buffered for AccReader<R> {
89 fn data(&self) -> &[u8] {
90 &self.buf[self.pos..self.end]
91 }
92 fn grow(&mut self, len: usize) {
93 let l = self.buf.len() + len;
94 self.buf.resize(l, 0);
95 }
96}
97
98impl<R: Read + Seek> Read for AccReader<R> {
99 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
100 log::trace!(
101 "read pos: {} cap: {} buflen: {}",
102 self.pos,
103 self.end,
104 buf.len()
105 );
106 if buf.len() < self.end - self.pos {
107 match (&self.buf[self.pos..(self.pos + buf.len())]).read(buf) {
108 Ok(len) => {
109 self.consume(len);
110 Ok(len)
111 }
112 Err(e) => Err(e),
113 }
114 } else {
115 if buf.len() > self.buf.len() {
119 match (&self.buf[self.pos..self.end]).read(buf) {
120 Ok(len) => {
121 let total_len = self.inner.read(&mut buf[(self.end - self.pos)..])? + len;
122
123 self.consume(total_len);
124 self.reset_buffer_position();
125
126 Ok(total_len)
127 }
128 Err(e) => Err(e),
129 }
130 } else {
131 let nread = {
132 let mut rem = self.fill_buf()?;
133 rem.read(buf)?
134 };
135 self.consume(nread);
136 Ok(nread)
137 }
138 }
139 }
140}
141
142impl<R: Read + Seek> BufRead for AccReader<R> {
143 fn fill_buf(&mut self) -> io::Result<&[u8]> {
144 if self.pos != 0 || self.end != self.buf.len() {
146 self.reset_buffer_position();
147 log::trace!("buffer reset ended");
148 let read = self.inner.read(&mut self.buf[self.end..])?;
149 self.end += read;
150 log::trace!(
151 "new pos: {} and cap: {} -> current: {:?}",
152 self.pos,
153 self.end,
154 &self.buf[self.pos..self.end]
155 );
156 }
157 Ok(&self.buf[self.pos..self.end])
158 }
159
160 fn consume(&mut self, amt: usize) {
161 log::trace!("consumed {} bytes", amt);
162 self.pos = cmp::min(self.pos + amt, self.end);
163 self.index += amt;
164 }
165}
166
167impl<R: Read + Seek> Seek for AccReader<R> {
168 fn seek(&mut self, mut pos: SeekFrom) -> Result<u64> {
169 match pos {
170 SeekFrom::Start(sz) => {
171 let mv = sz as usize;
172 if mv >= self.index && mv < self.index + self.end - self.pos {
173 self.pos += mv - self.index;
174 self.index = mv;
175
176 return Ok(mv as u64);
177 }
178 }
179 SeekFrom::End(_) => {}
180 SeekFrom::Current(sz) => {
181 let remaining = self.end - self.pos;
182
183 if sz >= 0 {
184 if sz as usize <= remaining {
185 self.index += sz as usize;
186 self.pos += sz as usize;
187 return Ok(self.index as u64);
188 } else {
189 pos = SeekFrom::Current(sz - remaining as i64);
190 }
191 }
192 }
193 };
194
195 match self.inner.seek(pos) {
196 Ok(sz) => {
197 self.index = sz as usize;
198 self.pos = 0;
199 self.end = 0;
200 self.fill_buf()?;
201 Ok(sz)
202 }
203 Err(e) => Err(e),
204 }
205 }
206}
207#[cfg(test)]
217mod tests {
218 use super::*;
219 use crate::buffer::Buffered;
220 use std::io::{BufRead, Cursor};
221 use std::ops::Range;
222
223 fn assert_read_acc(bytes: &[u8], capacity: usize, ranges: &[Range<usize>]) {
224 let c = Cursor::new(bytes);
225 let mut vec = vec![0u8; bytes.len()];
226 let mut acc = AccReader::with_capacity(capacity, c);
227
228 for r in ranges {
229 acc.read_exact(&mut vec[r.clone()]).unwrap();
230 }
231
232 assert_eq!(bytes, &vec);
233 }
234
235 #[test]
236 fn same_capacity_full_read() {
237 let buf = (0u8..).take(20).collect::<Vec<u8>>();
238
239 #[allow(clippy::single_range_in_vec_init)]
241 assert_read_acc(&buf, 20, &[0..buf.len()]);
242 }
243
244 #[test]
245 fn split_read_1() {
246 let buf = (0u8..).take(31).collect::<Vec<u8>>();
247
248 assert_read_acc(&buf, 20, &[0..10, 10..buf.len()]);
249 }
250
251 #[test]
252 fn split_read_2() {
253 let buf = (0u8..).take(31).collect::<Vec<u8>>();
254
255 assert_read_acc(&buf, 20, &[0..3, 3..buf.len()]);
256 }
257
258 #[test]
259 fn seek_within_capacity() {
260 let buf = (0u8..).take(30).collect::<Vec<u8>>();
261 let c = Cursor::new(&buf[..]);
262
263 let mut acc = AccReader::with_capacity(15, c);
264
265 assert_eq!(5, acc.seek(SeekFrom::Current(5)).unwrap());
266 assert_eq!(10, acc.seek(SeekFrom::Current(5)).unwrap());
267 assert_eq!(15, acc.seek(SeekFrom::Current(5)).unwrap());
268 }
269
270 #[test]
271 fn seek_across_capacity() {
272 let buf = (0u8..).take(30).collect::<Vec<u8>>();
273 let c = Cursor::new(&buf[..]);
274
275 let mut acc = AccReader::with_capacity(15, c);
276
277 assert_eq!(5, acc.seek(SeekFrom::Current(5)).unwrap());
278 assert_eq!(20, acc.seek(SeekFrom::Current(15)).unwrap());
279 assert_eq!(5, acc.seek(SeekFrom::Start(5)).unwrap());
280 }
281
282 #[test]
283 fn seek_and_read() {
284 let len = 30;
285 let buf = (0u8..).take(len).collect::<Vec<u8>>();
286 let c = Cursor::new(&buf[..]);
287
288 let mut acc = AccReader::with_capacity(5, c);
289
290 assert_eq!(0, acc.stream_position().unwrap());
291
292 for i in 0..30 {
293 assert_eq!(i, read_byte(&mut acc).unwrap() as u64);
294 assert_eq!(i + 1, acc.stream_position().unwrap());
295 }
296 }
297
298 fn read_byte<R: Read + Seek>(acc: &mut AccReader<R>) -> io::Result<u8> {
299 let mut byte = [0];
300 acc.read_exact(&mut byte)?;
301 Ok(byte[0])
302 }
303
304 #[test]
305 fn reader_test() {
306 let buf = b"AAAA\nAAAB\nAAACAAADAAAEAAAF\ndabcdEEEE";
307 let c = Cursor::new(&buf[..]);
308
309 let acc = AccReader::with_capacity(20, c);
310
311 assert_eq!(4, acc.lines().count());
312 }
313
314 #[test]
315 fn grow() {
316 let buf = b"abcdefghilmnopqrst";
317 let c = Cursor::new(&buf[..]);
318
319 let mut acc = AccReader::with_capacity(4, c);
320 acc.fill_buf().unwrap();
321 assert_eq!(b"abcd", acc.data());
322 acc.consume(2);
323 assert_eq!(b"cd", acc.data());
324 acc.grow(4);
325 assert_eq!(b"cd", acc.data());
326 acc.fill_buf().unwrap();
327 assert_eq!(b"cdefghil", acc.data());
328 }
329}