/// Read-ahead wrapper that lets a forward-only reader be rewound within an
/// in-memory prefix of its stream.
pub mod seekable_reader {
    use std::fs;
    use std::io;

    /// Combines [`io::Read`] and [`io::Seek`] into one trait, so a single
    /// bound (or trait object) can require both.
    pub trait SeekRead: io::Read + io::Seek {}

    impl SeekRead for fs::File {}

    // Every `SeekableReader` is `Read + Seek` (see the impls below) whatever it
    // wraps, e.g. `SeekableReader<fs::File>` or
    // `SeekableReader<flate2::read::MultiGzDecoder<fs::File>>`.
    impl<R: io::Read> SeekRead for SeekableReader<R> {}

    /// Wraps a reader that only implements `Read` and eagerly copies the start
    /// of its stream into memory.
    ///
    /// Until `read` has handed out a byte from beyond that prefix, seeking to
    /// any position in `0..=buffered_bytes` is supported; afterwards every seek
    /// fails.
    pub struct SeekableReader<R> {
        /// The wrapped reader; after construction it is positioned exactly
        /// `buffered_bytes` bytes into the stream.
        inner: R,
        /// Bytes read ahead from `inner`.
        buffer: Vec<u8>,
        /// Number of bytes held in `buffer` (always `buffer.len()`).
        buffered_bytes: usize,
        /// Current read position within `buffer`, in `0..=buffered_bytes`.
        pos: usize,
        /// Cleared once `read` has returned bytes taken directly from `inner`;
        /// from then on the buffered prefix can no longer be revisited.
        seekable: bool,
    }

    /// Size of each read-ahead chunk requested from the wrapped reader.
    const BUFFER_SIZE: usize = 8192;

    impl<R: io::Read> SeekableReader<R> {
        /// Wraps `reader`, reading ahead from it in `BUFFER_SIZE` chunks.
        ///
        /// With `lines_to_buffer == None` the entire stream is buffered. With
        /// `Some(n)`, read-ahead stops after the first chunk that brings the
        /// number of `\n` bytes seen above `n + 1` (the extra line presumably
        /// accounts for a header row — TODO confirm), or at end of stream.
        ///
        /// # Panics
        ///
        /// Panics if `reader` reports an I/O error other than
        /// [`io::ErrorKind::Interrupted`], which is retried.
        pub fn from_unbuffered_reader(reader: R, lines_to_buffer: Option<usize>) -> Self {
            let mut inner = reader;
            let mut buffer = Vec::<u8>::with_capacity(BUFFER_SIZE);
            let mut lines = 0;
            loop {
                let start = buffer.len();
                // Grow by one zeroed chunk and let the reader fill that tail.
                buffer.resize(start + BUFFER_SIZE, 0);
                let read = loop {
                    match inner.read(&mut buffer[start..]) {
                        Ok(n) => break n,
                        Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                        Err(e) => panic!("failed to read input into the seek buffer: {}", e),
                    }
                };
                // Drop the unfilled tail so `buffer` only holds stream data.
                buffer.truncate(start + read);
                if read == 0 {
                    break; // end of stream
                }
                lines += buffer[start..].iter().filter(|&&b| b == b'\n').count();
                if let Some(limit) = lines_to_buffer {
                    if lines > limit.saturating_add(1) {
                        break;
                    }
                }
            }
            let buffered_bytes = buffer.len();
            SeekableReader {
                inner,
                buffer,
                buffered_bytes,
                pos: 0,
                seekable: true,
            }
        }
    }

    impl<R: io::Read> io::Read for SeekableReader<R> {
        /// Serves bytes from the buffer while any remain at the current
        /// position, then delegates to the wrapped reader.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            if buf.is_empty() {
                return Ok(0);
            }
            if self.pos < self.buffered_bytes {
                let available = &self.buffer[self.pos..self.buffered_bytes];
                let n = available.len().min(buf.len());
                buf[..n].copy_from_slice(&available[..n]);
                self.pos += n;
                return Ok(n);
            }
            let n = self.inner.read(buf)?;
            // Rewinding only becomes impossible once bytes past the buffer are
            // actually consumed; an EOF (`Ok(0)`) or an error consumes nothing.
            if n > 0 {
                self.seekable = false;
            }
            Ok(n)
        }
    }

    impl<R: io::Read> io::Seek for SeekableReader<R> {
        /// Moves within the buffered prefix; valid targets are
        /// `0..=buffered_bytes`.
        ///
        /// `SeekFrom::End` is unsupported because the stream length is not
        /// tracked, and every seek fails once `read` has returned bytes from
        /// beyond the buffer.
        fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
            if !self.seekable {
                return Err(outside_buffer_error());
            }
            // i128 holds every usize/u64/i64 combination without overflow.
            let target: i128 = match pos {
                io::SeekFrom::Start(offset) => i128::from(offset),
                io::SeekFrom::Current(delta) => self.pos as i128 + i128::from(delta),
                io::SeekFrom::End(_) => return Err(outside_buffer_error()),
            };
            // `buffered_bytes` itself is a valid target: it is exactly where
            // `inner` currently stands, so later reads continue seamlessly.
            if target < 0 || target > self.buffered_bytes as i128 {
                return Err(outside_buffer_error());
            }
            self.pos = target as usize;
            Ok(target as u64)
        }
    }

    /// Error returned for any seek the buffered prefix cannot satisfy.
    fn outside_buffer_error() -> io::Error {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "Seeking outside of buffer, please report to https://github.com/domoritz/arrow-tools/issues/new".to_string(),
        )
    }
}
116
// Unit tests are declared out of line: `mod test;` loads them from a separate
// `test.rs` (or `test/mod.rs`) file, compiled only when building tests.
#[cfg(test)]
mod test;