use std::collections::VecDeque;
/// Splits a stream of byte chunks into lines terminated by `\n`.
///
/// Chunks are queued with [`LineReader::push`]. A line that lies entirely
/// inside the front chunk is returned as a borrow of that chunk; a line that
/// straddles chunk boundaries is assembled in an internal scratch buffer.
/// Returned lines include their trailing `\n`.
#[derive(Debug)]
pub struct LineReader<T: AsRef<[u8]>> {
    /// Queued chunks. Chunks backing the most recently returned line stay
    /// queued until the next read so the returned slice remains valid.
    queue: VecDeque<T>,
    /// Offset of the first unread byte, counted from the start of the front
    /// chunk. Right after a line was returned it may point at or past the
    /// end of the front chunk; `drop_previous_line` normalises it.
    read_pos: usize,
    /// Index of the first queued chunk that may still contain an unread `\n`.
    search_pos: usize,
    /// Scratch space for lines (or drained data) spanning several chunks.
    buf: Vec<u8>,
}

impl<T: AsRef<[u8]>> Default for LineReader<T> {
    /// Equivalent to [`LineReader::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T: AsRef<[u8]>> LineReader<T> {
    /// Creates a reader with no queued data.
    pub fn new() -> LineReader<T> {
        Self {
            queue: VecDeque::new(),
            read_pos: 0,
            search_pos: 0,
            buf: Vec::new(),
        }
    }

    /// Appends a chunk to the end of the queue.
    ///
    /// Empty chunks carry no data and are discarded. Keeping them queued
    /// would break the "front chunk has unread bytes" invariant the read
    /// path relies on (panicking in the multi-chunk copy, or making
    /// `line_or_drain` return an empty slice forever).
    pub fn push(&mut self, b: T) {
        if b.as_ref().is_empty() {
            return;
        }
        self.queue.push_back(b);
    }

    /// Releases every chunk that was fully consumed by the previously
    /// returned line and resets the scratch buffer.
    ///
    /// Afterwards `read_pos` is either 0 or strictly inside the front chunk.
    fn drop_previous_line(&mut self) {
        while self.read_pos > 0 {
            let front_len = match self.queue.front() {
                Some(front) => front.as_ref().len(),
                None => {
                    // Should be unreachable: `read_pos` never exceeds the
                    // queued byte count. Reset instead of spinning forever.
                    self.read_pos = 0;
                    self.search_pos = 0;
                    break;
                }
            };
            if self.read_pos < front_len {
                break;
            }
            self.read_pos -= front_len;
            self.queue.pop_front();
            // Chunk indices shift down by one with every popped chunk.
            self.search_pos = self.search_pos.saturating_sub(1);
        }
        self.buf.clear();
    }

    /// Returns the next `\n`-terminated line, or everything that is left if
    /// no complete line is queued.
    ///
    /// Shorthand for `line_with_drain(true)`.
    // The attribute is kept from the original; presumably it silences
    // warnings in builds that never call this wrapper.
    #[allow(unused)]
    pub fn line_or_drain(&mut self) -> Option<&[u8]> {
        self.line_with_drain(true)
    }

    /// Returns the next `\n`-terminated line, or `None` if no complete line
    /// is queued yet.
    ///
    /// Shorthand for `line_with_drain(false)`.
    // The attribute is kept from the original; presumably it silences
    // warnings in builds that never call this wrapper.
    #[allow(unused)]
    pub fn line(&mut self) -> Option<&[u8]> {
        self.line_with_drain(false)
    }

    /// Looks for the next unread `\n`, starting at chunk `search_pos`.
    ///
    /// On success returns `(chunk_index, end)` where `end` is the offset one
    /// past the newline, counted from the start of the front chunk.
    /// `search_pos` is advanced past every chunk that was scanned without
    /// finding a newline, so those chunks are not scanned again.
    fn find_newline(&mut self) -> Option<(usize, usize)> {
        let mut offset = 0;
        for (idx, chunk) in self.queue.iter().enumerate() {
            let chunk: &[u8] = chunk.as_ref();
            if idx >= self.search_pos {
                // Only the front chunk can have an already-consumed prefix.
                let start = if idx == 0 { self.read_pos } else { 0 };
                let hit = chunk.iter().skip(start).position(|&b| b == b'\n');
                if let Some(rel) = hit {
                    self.search_pos = idx;
                    return Some((idx, offset + start + rel + 1));
                }
                self.search_pos = idx + 1;
            }
            offset += chunk.len();
        }
        None
    }

    /// Copies `len` bytes into the scratch buffer, starting `offset` bytes
    /// into the front chunk and continuing through chunk `last_idx`.
    ///
    /// # Panics
    ///
    /// Panics if chunks `0..=last_idx` hold fewer than `len` bytes after
    /// `offset`.
    fn copy_into_internal_buffer(&mut self, last_idx: usize, offset: usize, len: usize) {
        // `reserve` takes the number of *additional* elements beyond the
        // current length, so only the still-missing bytes are requested.
        self.buf.reserve(len.saturating_sub(self.buf.len()));
        for (idx, chunk) in self.queue.iter().enumerate().take(last_idx + 1) {
            let chunk: &[u8] = chunk.as_ref();
            let unread = if idx == 0 { &chunk[offset..] } else { chunk };
            let rem = len - self.buf.len();
            let copy_len = rem.min(unread.len());
            self.buf.extend_from_slice(&unread[..copy_len]);
        }
        assert_eq!(self.buf.len(), len);
    }

    /// Returns the next line, including its trailing `\n`.
    ///
    /// If no complete line is queued the result depends on `drain`:
    /// with `false`, `None` is returned and the data stays queued; with
    /// `true`, all remaining unread bytes are returned as one final slice
    /// (`None` if nothing is left).
    ///
    /// The returned slice borrows from the reader and is invalidated by the
    /// next call.
    pub fn line_with_drain(&mut self, drain: bool) -> Option<&[u8]> {
        self.drop_previous_line();
        assert!(
            self.read_pos == 0
                || self.read_pos < self.queue.front().map_or(0, |f| f.as_ref().len())
        );

        if let Some((idx, end)) = self.find_newline() {
            let start = self.read_pos;
            assert!(end > start);
            assert!(idx < self.queue.len());
            self.read_pos = end;

            if idx == 0 {
                // Whole line lives in the front chunk: hand out a borrow.
                let front: &[u8] = self
                    .queue
                    .front()
                    .expect("find_newline returned an index into a non-empty queue")
                    .as_ref();
                return Some(&front[start..end]);
            }

            // Line straddles chunks: stitch it together in the scratch buffer.
            self.copy_into_internal_buffer(idx, start, end - start);
            return Some(self.buf.as_slice());
        }

        if !drain || self.queue.is_empty() {
            return None;
        }

        if self.queue.len() == 1 {
            // Single chunk left: return its unread tail without copying.
            let rest = &self.queue.front().unwrap().as_ref()[self.read_pos..];
            self.read_pos += rest.len();
            self.search_pos = 1;
            return Some(rest);
        }

        // Several chunks left: concatenate all unread bytes.
        let queued: usize = self.queue.iter().map(|c| c.as_ref().len()).sum();
        self.buf.reserve(queued.saturating_sub(self.read_pos));
        for (idx, chunk) in self.queue.iter().enumerate() {
            let chunk: &[u8] = chunk.as_ref();
            let unread = if idx == 0 {
                &chunk[self.read_pos..]
            } else {
                chunk
            };
            self.buf.extend_from_slice(unread);
        }
        // Mark everything as consumed so the next call drops all chunks.
        self.read_pos += self.buf.len();
        self.search_pos = self.queue.len();
        Some(self.buf.as_slice())
    }

    /// Discards all queued data and resets the reader to its initial state.
    pub fn clear(&mut self) {
        self.queue.clear();
        self.read_pos = 0;
        self.search_pos = 0;
        self.buf.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::LineReader;

    /// Builds a reader and pushes every chunk of `chunks` into it, in order.
    fn reader_with(chunks: &[&[u8]]) -> LineReader<Vec<u8>> {
        let mut reader = LineReader::new();
        for chunk in chunks {
            reader.push(chunk.to_vec());
        }
        reader
    }

    /// Asserts that successive `line()` calls return exactly `expected`.
    fn expect_lines(reader: &mut LineReader<Vec<u8>>, expected: &[&[u8]]) {
        for &want in expected {
            assert_eq!(reader.line(), Some(want));
        }
    }

    // All lines live in one chunk.
    #[test]
    fn test_single_buffer() {
        let mut r = reader_with(&[&b"abcd\nefgh\nijkl\n"[..]]);
        expect_lines(&mut r, &[&b"abcd\n"[..], &b"efgh\n"[..], &b"ijkl\n"[..]]);
        assert_eq!(r.line(), None);
    }

    // A bare "\n" is returned as a line of its own.
    #[test]
    fn test_empty_line() {
        let mut r = reader_with(&[&b"abcd\nefgh\n\nijkl\n"[..]]);
        expect_lines(
            &mut r,
            &[&b"abcd\n"[..], &b"efgh\n"[..], &b"\n"[..], &b"ijkl\n"[..]],
        );
        assert_eq!(r.line(), None);
    }

    // One line is split across two chunks.
    #[test]
    fn test_multi_buffer_split() {
        let mut r = reader_with(&[&b"abcd\nef"[..], &b"gh\nijkl\n"[..]]);
        expect_lines(&mut r, &[&b"abcd\n"[..], &b"efgh\n"[..], &b"ijkl\n"[..]]);
        assert_eq!(r.line(), None);
    }

    // One line is split across four chunks.
    #[test]
    fn test_multi_buffer_split_2() {
        let mut r = reader_with(&[
            &b"abcd\ne"[..],
            &b"f"[..],
            &b"g"[..],
            &b"h\nijkl\n"[..],
        ]);
        expect_lines(&mut r, &[&b"abcd\n"[..], &b"efgh\n"[..], &b"ijkl\n"[..]]);
        assert_eq!(r.line(), None);
    }

    // Unterminated tail in the same chunk is only returned when draining.
    #[test]
    fn test_single_buffer_drain() {
        let mut r = reader_with(&[&b"abcd\nefgh\nijkl"[..]]);
        expect_lines(&mut r, &[&b"abcd\n"[..], &b"efgh\n"[..]]);
        assert_eq!(r.line(), None);
        assert_eq!(r.line_or_drain(), Some(&b"ijkl"[..]));
        assert_eq!(r.line_or_drain(), None);
    }

    // Unterminated tail sits in a chunk of its own.
    #[test]
    fn test_single_buffer_drain_multi_line() {
        let mut r = reader_with(&[&b"abcd\nefgh\n"[..], &b"ijkl"[..]]);
        expect_lines(&mut r, &[&b"abcd\n"[..], &b"efgh\n"[..]]);
        assert_eq!(r.line(), None);
        assert_eq!(r.line_or_drain(), Some(&b"ijkl"[..]));
        assert_eq!(r.line_or_drain(), None);
    }

    // Unterminated tail is spread over several chunks.
    #[test]
    fn test_single_buffer_drain_multi_line_2() {
        let mut r = reader_with(&[
            &b"abcd\nefgh\ni"[..],
            &b"j"[..],
            &b"k"[..],
            &b"l"[..],
        ]);
        expect_lines(&mut r, &[&b"abcd\n"[..], &b"efgh\n"[..]]);
        assert_eq!(r.line(), None);
        assert_eq!(r.line_or_drain(), Some(&b"ijkl"[..]));
        assert_eq!(r.line_or_drain(), None);
    }
}