use std::io::{self, Read};
use std::mem;
mod parser;
use self::parser::{
Parser, YAML_DOCUMENT_END_EVENT, YAML_DOCUMENT_START_EVENT, YAML_MAPPING_START_EVENT,
YAML_SCALAR_EVENT, YAML_SEQUENCE_START_EVENT, YAML_STREAM_END_EVENT,
};
/// Splits a YAML byte stream into its individual documents.
///
/// Iterating yields one [`Document`] per YAML document, holding the raw text
/// captured between that document's start and end events.
pub(super) struct Chunker<R: Read> {
    /// Event parser that reads through a [`ChunkReader`], which records the consumed bytes.
    parser: Parser<ChunkReader<R>>,
    /// A finished document that has not been handed out yet.
    last_document: Option<Document>,
    /// Kind of the first scalar/collection node seen in the current document, if any.
    current_document_kind: Option<DocumentKind>,
    /// When `true`, iteration is finished and `next` returns `None`.
    stream_ended: bool,
}

impl<R: Read> Chunker<R> {
    /// Creates a chunker that pulls YAML text from `reader`.
    pub(super) fn new(reader: R) -> Self {
        let parser = Parser::new(ChunkReader::new(reader));
        Chunker {
            parser,
            last_document: None,
            current_document_kind: None,
            stream_ended: false,
        }
    }
}
impl<R> Iterator for Chunker<R>
where
    R: Read,
{
    type Item = io::Result<Document>;

    /// Returns the next complete document from the stream.
    ///
    /// A document is handed out only once the parser has moved past it: when
    /// the following document starts or when the stream ends. Its content is the
    /// raw text captured from its start event up to its end event.
    ///
    /// # Errors
    ///
    /// Yields `Some(Err(_))` with [`io::ErrorKind::InvalidData`] when the parser
    /// reports an error, or when a document's captured bytes are not valid UTF-8.
    /// Errors are terminal: every later call returns `None`. The same holds
    /// after the end of the stream has been reached.
    fn next(&mut self) -> Option<Self::Item> {
        if self.stream_ended {
            return None;
        }
        loop {
            let event = match self.parser.next_event() {
                Ok(event) => event,
                Err(err) => {
                    // Fuse instead of polling a failed parser again. Doing so could
                    // re-report the same error or never make progress.
                    self.stream_ended = true;
                    return Some(Err(io::Error::new(io::ErrorKind::InvalidData, err)));
                }
            };
            match event.event_type() {
                YAML_DOCUMENT_START_EVENT => {
                    // Discard any bytes between the previous document's end and
                    // this document's start, so the capture begins at this document.
                    let start = event.start_offset();
                    self.parser.reader_mut().trim_to_offset(start);
                    self.current_document_kind = None;
                    if let Some(doc) = self.last_document.take() {
                        return Some(Ok(doc));
                    }
                }
                YAML_SCALAR_EVENT => {
                    // Only the first scalar/collection event of a document decides its kind.
                    self.current_document_kind
                        .get_or_insert(DocumentKind::Scalar);
                }
                YAML_SEQUENCE_START_EVENT | YAML_MAPPING_START_EVENT => {
                    self.current_document_kind
                        .get_or_insert(DocumentKind::Collection);
                }
                YAML_DOCUMENT_END_EVENT => {
                    let end = event.end_offset();
                    let chunk = self.parser.reader_mut().take_to_offset(end);
                    let kind = self.current_document_kind.take();
                    let content = match String::from_utf8(chunk) {
                        Ok(content) => content,
                        Err(err) => {
                            // Report rather than panic. Nothing here guarantees that
                            // the captured byte range is valid UTF-8.
                            self.stream_ended = true;
                            return Some(Err(io::Error::new(io::ErrorKind::InvalidData, err)));
                        }
                    };
                    self.last_document = Some(Document { content, kind });
                }
                YAML_STREAM_END_EVENT => {
                    self.stream_ended = true;
                    return self.last_document.take().map(Ok);
                }
                _ => {}
            }
        }
    }
}
/// One YAML document cut out of the input stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct Document {
    /// Raw source text of the document as it was read from the input.
    content: String,
    /// Kind of the document's root node, if it was determined.
    kind: Option<DocumentKind>,
}

/// The shape of a document's root node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum DocumentKind {
    /// The root node is a scalar value.
    Scalar,
    /// The root node is a sequence or a mapping.
    Collection,
}

impl Document {
    /// Returns the raw source text of the document.
    pub(super) fn content(&self) -> &str {
        &self.content
    }

    /// Returns `true` if the document's root node is a sequence or a mapping.
    ///
    /// Documents whose kind is unknown report `false`.
    pub(super) fn is_collection(&self) -> bool {
        matches!(self.kind, Some(DocumentKind::Collection))
    }
}
/// A [`Read`] adapter that remembers every byte it hands out.
///
/// `captured` holds the bytes read so far that have not yet been trimmed or
/// taken. `captured_start_offset` is the byte offset of `captured[0]` within
/// the stream read from `reader`.
///
/// NOTE(review): callers must pass byte offsets into that same stream, and
/// must never pass an offset smaller than `captured_start_offset`.
struct ChunkReader<R: Read> {
    reader: R,
    captured: Vec<u8>,
    captured_start_offset: u64,
}

impl<R: Read> ChunkReader<R> {
    /// Wraps `reader` with nothing captured, starting at stream offset 0.
    fn new(reader: R) -> Self {
        ChunkReader {
            reader,
            captured: Vec::new(),
            captured_start_offset: 0,
        }
    }

    /// Converts a stream `offset` into a length measured from the start of `captured`.
    fn len_up_to(&self, offset: u64) -> usize {
        usize::try_from(offset - self.captured_start_offset).unwrap()
    }

    /// Drops the captured bytes before `offset`, making `offset` the new start.
    fn trim_to_offset(&mut self, offset: u64) {
        let discard = self.len_up_to(offset);
        self.captured_start_offset = offset;
        self.captured.drain(..discard);
    }

    /// Removes and returns the captured bytes from the current start up to
    /// (but excluding) `offset`.
    ///
    /// Bytes at or after `offset` stay captured, and `offset` becomes the new start.
    fn take_to_offset(&mut self, offset: u64) -> Vec<u8> {
        let keep_from = self.len_up_to(offset);
        // `split_off` copies only the bytes after `offset`. The returned chunk
        // keeps the original allocation.
        let remainder = self.captured.split_off(keep_from);
        self.captured_start_offset = offset;
        mem::replace(&mut self.captured, remainder)
    }
}

impl<R: Read> Read for ChunkReader<R> {
    /// Reads from the wrapped reader and records a copy of the bytes produced.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.reader.read(buf).map(|count| {
            // Slicing panics if the inner reader claims more bytes than `buf` holds.
            self.captured.extend_from_slice(&buf[..count]);
            count
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `reader` through a [`Chunker`] and unwraps the collected documents.
    fn chunk_all<R: Read>(reader: R) -> Vec<Document> {
        Chunker::new(reader)
            .collect::<Result<Vec<_>, io::Error>>()
            .unwrap()
    }

    #[test]
    fn chunker_normal_usage() {
        const INPUT: &str = r"---
test: true
---
12345
---
[list, of strings]
";
        let docs = chunk_all(INPUT.as_bytes());

        let contents: Vec<&str> = docs.iter().map(Document::content).collect();
        assert_eq!(
            contents,
            [
                "---\ntest: true\n",
                "---\n12345\n",
                "---\n[list, of strings]\n",
            ]
        );

        let collections: Vec<bool> = docs.iter().map(Document::is_collection).collect();
        assert_eq!(collections, [true, false, true]);
    }

    #[test]
    fn chunker_unknown_anchor() {
        // An alias to an undefined anchor must not make chunking fail.
        chunk_all("*y".as_bytes());
    }

    #[test]
    #[should_panic]
    fn chunker_misbehaving_reader() {
        let reader = MisbehavingReader("---\nevil: true".as_bytes());
        // Results are deliberately not unwrapped, so the panic has to come from chunking itself.
        let _ = Chunker::new(reader).collect::<Vec<_>>();
    }

    /// Reader that breaks the `Read` contract by claiming one byte more than `buf` can hold.
    struct MisbehavingReader<R: Read>(R);

    impl<R: Read> Read for MisbehavingReader<R> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let claimed = buf.len() + 1;
            self.0.read(buf).map(|_| claimed)
        }
    }
}