//! tomq 0.1.2
//!
//! jq, but from TOML.
use crate::error::TomqError;
use std::io::{ErrorKind, Read};
use toml_edit::Document;

/// Json document iterator alias.
pub(crate) trait JsonIter: Iterator<Item = crate::error::Result<serde_json::Value>> {}
/// Toml iterator alias.
pub(crate) trait TomqIter<T>: Iterator<Item = crate::error::Result<T>> {}

impl<T, E> TomqIter<E> for T where T: Iterator<Item = crate::error::Result<E>> {}

/// Iterator over the TOML documents contained in a byte stream, split on a separator string.
pub(crate) struct DocumentIterator<R> {
    /// When set, errors are skipped instead of being yielded (see the `Iterator` impl for
    /// exactly which ones).
    ignore_error: bool,
    /// Source of raw document text, already split on the separator.
    reader: DelimiterTerminationReader<R>,
}

impl<R: Read> DocumentIterator<R> {
    /// Wraps `reader`, treating every occurrence of `separator` as the boundary between two
    /// documents.
    pub(crate) fn new(reader: R, separator: String, ignore_error: bool) -> Self {
        let delimiter = separator.into_bytes();
        DocumentIterator {
            ignore_error,
            reader: DelimiterTerminationReader::new(reader, delimiter),
        }
    }
}

impl<R: Read> Iterator for DocumentIterator<R> {
    type Item = Result<Document, TomqError>;

    /// Reads and parses the next document from the underlying stream.
    ///
    /// Returns `None` once the stream reports end of input. When `ignore_error` is set,
    /// documents that are not valid TOML, or that the reader rejects as invalid UTF-8, are
    /// skipped and reading continues with the next one. Any other I/O error is always yielded:
    /// it describes the stream rather than a single document, and retrying a persistently
    /// failing reader inside this loop would never terminate.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let result = match self.reader.read_until_delimiter() {
                Ok(text) => text.parse::<Document>().map_err(TomqError::TomlParse),
                // End of input is the normal way for the iteration to finish.
                Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
                Err(e) => Err(TomqError::Io(e)),
            };

            // Only per-document failures are candidates for skipping. A TOML parse error means
            // the document text was already consumed. Skipping `InvalidData` relies on
            // `read_until_delimiter` discarding the undecodable document so the next call
            // makes progress.
            let per_document_error = match &result {
                Err(TomqError::TomlParse(_)) => true,
                Err(TomqError::Io(e)) => e.kind() == ErrorKind::InvalidData,
                _ => false,
            };

            if self.ignore_error && per_document_error {
                continue;
            }
            return Some(result);
        }
    }
}

/// Reader adapter that splits a byte stream into UTF-8 strings at each occurrence of a
/// delimiter byte sequence.
///
/// Bytes read past the end of a returned chunk are kept and served by later calls, so every
/// input byte ends up in exactly one returned chunk (delimiters themselves are dropped).
pub(crate) struct DelimiterTerminationReader<R> {
    reader: R,
    /// Bytes read from `reader` that have not been returned yet. A whole document may be
    /// buffered here. That is acceptable because `toml_edit` cannot parse from a `Read`, so
    /// each document has to be held in memory anyway.
    buf: Vec<u8>,
    /// Byte sequence separating documents. When empty, the input is not split and the
    /// remaining stream is returned as a single chunk.
    delimiter: Vec<u8>,
}

impl<R: Read> DelimiterTerminationReader<R> {
    /// Creates a reader that splits `reader`'s output on `delimiter`.
    pub(crate) fn new(reader: R, delimiter: Vec<u8>) -> Self {
        Self {
            reader,
            buf: Vec::new(),
            delimiter,
        }
    }

    /// Returns the text up to (not including) the next delimiter, consuming the delimiter.
    ///
    /// Data already buffered from earlier reads is searched before the inner reader is touched.
    /// A document that is already complete is therefore returned without waiting for more
    /// input. Consecutive delimiters yield empty strings. When the inner reader reaches end of
    /// input, any non-empty remainder is returned as the final chunk.
    ///
    /// # Errors
    ///
    /// - `ErrorKind::UnexpectedEof` once the input is exhausted and nothing is buffered.
    /// - `ErrorKind::InvalidData` if a chunk is not valid UTF-8. The chunk is discarded, so
    ///   the next call continues with the following one.
    /// - Any other error from the inner reader. `Interrupted` reads are retried.
    pub(crate) fn read_until_delimiter(&mut self) -> std::io::Result<String> {
        const CHUNK_SIZE: usize = 4096;

        if self.delimiter.is_empty() {
            // There is nothing to split on (and `windows(0)` would panic), so treat the whole
            // remaining stream as one document.
            self.reader.read_to_end(&mut self.buf)?;
            return self.take_all();
        }

        let delimiter_len = self.delimiter.len();
        let mut chunk = [0u8; CHUNK_SIZE];
        // Offset in `buf` from which a match may still start. Leftover data from a previous
        // call has not been searched past its first delimiter, so begin at 0.
        let mut search_from = 0;

        loop {
            if let Some(offset) = self.buf[search_from..]
                .windows(delimiter_len)
                .position(|window| window == self.delimiter.as_slice())
            {
                let end = search_from + offset;
                // Detach the document (and its delimiter) first, so a decoding error cannot
                // leave it in the buffer or lose the bytes that follow it.
                let rest = self.buf.split_off(end + delimiter_len);
                let mut document = std::mem::replace(&mut self.buf, rest);
                document.truncate(end);
                return Self::decode(document);
            }

            // Only the last `delimiter_len - 1` bytes can start a match that completes after the
            // next read, so skip everything before them instead of rescanning the whole buffer.
            search_from = self.buf.len().saturating_sub(delimiter_len - 1);

            match self.reader.read(&mut chunk) {
                Ok(0) => return self.take_all(),
                Ok(n) => self.buf.extend_from_slice(&chunk[..n]),
                Err(e) if e.kind() == ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
    }

    /// Drains the whole buffer as the final document. Reports end of input if it is empty.
    fn take_all(&mut self) -> std::io::Result<String> {
        if self.buf.is_empty() {
            return Err(ErrorKind::UnexpectedEof.into());
        }
        Self::decode(std::mem::take(&mut self.buf))
    }

    /// Converts a document's bytes to a `String`, mapping invalid UTF-8 to `InvalidData`.
    fn decode(bytes: Vec<u8>) -> std::io::Result<String> {
        String::from_utf8(bytes).map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e))
    }
}