use crate::error::TomqError;
use std::io::{ErrorKind, Read};
use toml_edit::Document;
pub(crate) trait JsonIter: Iterator<Item = crate::error::Result<serde_json::Value>> {}
pub(crate) trait TomqIter<T>: Iterator<Item = crate::error::Result<T>> {}
impl<T, E> TomqIter<E> for T where T: Iterator<Item = crate::error::Result<E>> {}
/// Iterator over the TOML documents contained in a reader, where consecutive
/// documents are separated by a caller-supplied separator string.
pub(crate) struct DocumentIterator<R> {
    /// Splits the underlying byte stream on the separator.
    reader: DelimiterTerminationReader<R>,
    /// Controls whether the `Iterator` implementation skips over errors
    /// rather than yielding them.
    ignore_error: bool,
}

impl<R: Read> DocumentIterator<R> {
    /// Builds an iterator over `reader`.
    ///
    /// `separator` is converted to its UTF-8 bytes and used as the delimiter
    /// between documents; `ignore_error` is stored as-is.
    pub(crate) fn new(reader: R, separator: String, ignore_error: bool) -> Self {
        let delimiter = separator.into_bytes();
        let reader = DelimiterTerminationReader::new(reader, delimiter);
        Self {
            reader,
            ignore_error,
        }
    }
}
impl<R: Read> Iterator for DocumentIterator<R> {
    type Item = Result<Document, TomqError>;

    /// Reads the next delimiter-separated chunk and parses it as a TOML
    /// document.
    ///
    /// * Returns `None` once the reader reports `UnexpectedEof`.
    /// * Returns `Some(Err(TomqError::TomlParse(_)))` when a chunk is not valid
    ///   TOML, and `Some(Err(TomqError::Io(_)))` for any other read failure.
    /// * When `ignore_error` is set, per-chunk failures (a TOML parse error,
    ///   or an `InvalidData` error from the reader) are not yielded; the
    ///   iterator asks the reader for the next chunk instead.
    ///
    /// Other I/O errors are always surfaced, even with `ignore_error`:
    /// retrying a read that keeps failing would otherwise loop forever.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let text = match self.reader.read_until_delimiter() {
                Ok(text) => text,
                Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
                // A chunk that could not be decoded only affects that chunk.
                Err(e) if self.ignore_error && e.kind() == ErrorKind::InvalidData => continue,
                Err(e) => return Some(Err(TomqError::Io(e))),
            };
            match text.parse::<Document>() {
                Ok(doc) => return Some(Ok(doc)),
                Err(_) if self.ignore_error => continue,
                Err(e) => return Some(Err(TomqError::TomlParse(e))),
            }
        }
    }
}
/// Splits a byte stream into UTF-8 chunks separated by a fixed delimiter.
pub(crate) struct DelimiterTerminationReader<R> {
    /// Source of bytes.
    reader: R,
    /// Bytes read from `reader` that have not been returned yet.
    buf: Vec<u8>,
    /// Byte sequence that separates two chunks.
    delimiter: Vec<u8>,
}

impl<R: Read> DelimiterTerminationReader<R> {
    /// Creates a reader that splits `reader`'s output on `delimiter`.
    ///
    /// An empty `delimiter` never matches, so the whole stream is returned as
    /// a single chunk.
    pub(crate) fn new(reader: R, delimiter: Vec<u8>) -> Self {
        Self {
            reader,
            buf: Vec::new(),
            delimiter,
        }
    }

    /// Returns the next chunk, i.e. everything up to (not including) the next
    /// delimiter, or up to end of input if no further delimiter exists.
    ///
    /// The result does not depend on how the underlying reader slices its
    /// output into individual `read` calls.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::UnexpectedEof` when the input is exhausted and no
    ///   buffered bytes remain (this is the end-of-stream signal).
    /// * `ErrorKind::InvalidData` when the chunk is not valid UTF-8. The
    ///   offending chunk is consumed, so the next call continues with the
    ///   data that follows it.
    /// * Any other error reported by the underlying reader, except
    ///   `ErrorKind::Interrupted`, which is retried.
    pub(crate) fn read_until_delimiter(&mut self) -> std::io::Result<String> {
        let mut local_buf = [0u8; 4096];
        // Index from which the buffer still has to be scanned for a delimiter.
        let mut search_from = 0;
        loop {
            // Look at what is already buffered before touching the reader:
            // an earlier read may have delivered several chunks at once.
            if let Some(pos) = self.find_delimiter(search_from) {
                let rest = self.buf.split_off(pos + self.delimiter.len());
                let mut chunk = std::mem::replace(&mut self.buf, rest);
                chunk.truncate(pos);
                return Self::into_utf8(chunk);
            }
            // Nothing found so far; a delimiter completed by the next read can
            // start at most `delimiter.len() - 1` bytes before the buffer end.
            search_from = self
                .buf
                .len()
                .saturating_sub(self.delimiter.len().saturating_sub(1));

            match self.reader.read(&mut local_buf) {
                Ok(0) => {
                    if self.buf.is_empty() {
                        return Err(ErrorKind::UnexpectedEof.into());
                    }
                    // Input ended without a trailing delimiter: the remainder
                    // is the final chunk.
                    return Self::into_utf8(std::mem::take(&mut self.buf));
                }
                Ok(n) => self.buf.extend_from_slice(&local_buf[..n]),
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
    }

    /// Finds the first delimiter in `buf` starting at or after `from`.
    fn find_delimiter(&self, from: usize) -> Option<usize> {
        if self.delimiter.is_empty() {
            // `windows(0)` would panic; an empty delimiter never matches.
            return None;
        }
        self.buf
            .get(from..)?
            .windows(self.delimiter.len())
            .position(|window| window == self.delimiter.as_slice())
            .map(|offset| offset + from)
    }

    /// Converts an owned chunk to a `String`, reporting invalid UTF-8 as
    /// `ErrorKind::InvalidData`.
    fn into_utf8(bytes: Vec<u8>) -> std::io::Result<String> {
        String::from_utf8(bytes).map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e))
    }
}