arrow_tools/
lib.rs

//! # Arrow-tools
//! This crate serves as a general utility library to go along
//! with all of the crates within the arrow-tools suite.
4
5pub mod seekable_reader {
6    use std::fs;
7    use std::io;
8
    /// Marker trait for readers that implement both [`io::Read`] and
    /// [`io::Seek`].
    ///
    /// It declares no methods of its own; it only bundles the two supertraits
    /// under a single name so that they can be required with one bound.
    pub trait SeekRead: io::Read + io::Seek {}
11
12    pub struct SeekableReader<R> {
13        inner: R,        // underlying reader
14        buffer: Vec<u8>, // buffer for the first n lines
15        buffered_bytes: usize,
16        pos: usize,     // current position in the buffer
17        seekable: bool, // whether seek is still possible
18    }
19
20    impl SeekRead for fs::File {}
21    impl SeekRead for SeekableReader<fs::File> {}
22    impl SeekRead for SeekableReader<flate2::read::MultiGzDecoder<fs::File>> {}
23
24    const BUFFER_SIZE: usize = 8192;
25    impl<R: std::io::Read> SeekableReader<R> {
26        pub fn from_unbuffered_reader(reader: R, lines_to_buffer: Option<usize>) -> Self {
27            let mut inner = reader;
28            let mut buffer = Vec::<u8>::with_capacity(BUFFER_SIZE);
29            let mut lines = 0;
30            let mut bytes_read = 0;
31            loop {
32                let bytes_before = bytes_read;
33                buffer.append(&mut vec![0; BUFFER_SIZE - (buffer.len() - bytes_read)]);
34                bytes_read += inner
35                    .read(&mut buffer[bytes_read..bytes_read + BUFFER_SIZE])
36                    .unwrap();
37                lines += buffer[bytes_before..bytes_read]
38                    .iter()
39                    .filter(|&&x| x == 10)
40                    .count();
41                if let Some(lines_to_buffer) = lines_to_buffer {
42                    // +1 because there may be a header
43                    if lines > lines_to_buffer + 1 {
44                        break;
45                    }
46                }
47                if bytes_read - bytes_before == 0 {
48                    break;
49                }
50            }
51            SeekableReader {
52                inner,
53                buffer,
54                buffered_bytes: bytes_read,
55                pos: 0,
56                seekable: true,
57            }
58        }
59    }
60
61    impl<R: std::io::Read> std::io::Read for SeekableReader<R> {
62        fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
63            let buf_len = buf.len();
64            if self.pos <= self.buffered_bytes {
65                if self.buffered_bytes - self.pos < buf_len {
66                    buf[..self.buffered_bytes - self.pos]
67                        .copy_from_slice(&self.buffer[self.pos..self.buffered_bytes]);
68                    let len_read = self.buffered_bytes - self.pos;
69                    self.pos = self.buffered_bytes;
70                    Ok(len_read)
71                } else {
72                    buf.copy_from_slice(&self.buffer[self.pos..self.pos + buf_len]);
73                    self.pos += buf_len;
74                    Ok(buf_len)
75                }
76            } else {
77                self.seekable = false;
78                self.inner.read(buf)
79            }
80        }
81    }
82
83    impl<R: io::Read> io::Seek for SeekableReader<R> {
84        fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
85            let error = Err(io::Error::new(
86                io::ErrorKind::InvalidInput,
87                "Seeking outside of buffer, please report to https://github.com/domoritz/arrow-tools/issues/new".to_string(),
88            ));
89            if self.seekable {
90                match pos {
91                    io::SeekFrom::Start(pos) => {
92                        if pos >= self.buffered_bytes as u64 {
93                            error
94                        } else {
95                            self.pos = pos as usize;
96                            Ok(pos)
97                        }
98                    }
99                    io::SeekFrom::Current(pos) => {
100                        let new_pos = self.pos as i64 + pos;
101                        if 0 <= new_pos && new_pos < self.buffered_bytes as i64 {
102                            self.pos = new_pos as usize;
103                            Ok(new_pos as u64)
104                        } else {
105                            error
106                        }
107                    }
108                    io::SeekFrom::End(_) => error,
109                }
110            } else {
111                error
112            }
113        }
114    }
115}
116
117#[cfg(test)]
118mod test;