xt 0.21.0

Translate between serialized data formats
Documentation
//! Support for splitting YAML 1.2 streams into their constituent documents.
//!
//! This is an awful hack to provide some level of streaming input support atop [`serde_yaml`],
//! which as of writing must buffer all input before parsing it (the convenience methods that parse
//! from readers secretly buffer for you). [`Chunker`] snoops on the reader's bytes as the
//! underlying parser from `serde_yaml` processes them, and chunks them up based on document start
//! and end offsets from the parser's event stream. Passing those chunks to `serde_yaml` for a
//! second pass should yield the same result as parsing the entire original stream.
//!
//! I'd love to have the time and energy someday to implement something more proper.

use std::io::{self, Read};
use std::mem;

mod parser;

use crate::input::cast_read_offset_usize;

use self::parser::{
	Parser, YAML_DOCUMENT_END_EVENT, YAML_DOCUMENT_START_EVENT, YAML_MAPPING_START_EVENT,
	YAML_SCALAR_EVENT, YAML_SEQUENCE_START_EVENT, YAML_STREAM_END_EVENT,
};

/// An iterator over individual raw documents in a UTF-8-encoded YAML stream.
///
/// Every item is either one complete `Document` or an [`io::Error`] describing why the stream
/// could not be parsed any further.
pub(super) struct Chunker<R: Read> {
	/// Event parser whose input passes through a `ChunkReader`, so that the bytes it consumes
	/// can later be carved into per-document chunks.
	parser: Parser<ChunkReader<R>>,
	/// The most recently completed document, held back until the parser has moved past it.
	last_document: Option<Document>,
	/// Kind of the document currently being parsed, recorded from its first content event.
	current_document_kind: Option<DocumentKind>,
	/// Set once iteration is finished; `next` returns `None` from then on.
	stream_ended: bool,
}

impl<R: Read> Chunker<R> {
	/// Builds a chunker that splits the YAML stream produced by `reader` into documents.
	///
	/// The stream must be UTF-8 and free of byte order marks, even though YAML 1.2 itself permits
	/// several text encodings as well as BOMs at the start of the stream or of individual
	/// documents. Consider using the [`encoding`](super::encoding) module to re-encode
	/// non-UTF-8 streams first.
	pub(super) fn new(reader: R) -> Self {
		// The parser reads through a `ChunkReader` so every byte it consumes is also captured.
		let parser = Parser::new(ChunkReader::new(reader));
		Self {
			parser,
			last_document: None,
			current_document_kind: None,
			stream_ended: false,
		}
	}
}

impl<R> Iterator for Chunker<R>
where
	R: Read,
{
	type Item = io::Result<Document>;

	/// Advances the parser until the next complete document can be emitted.
	///
	/// Returns `Some(Ok(doc))` for each document in the stream, `Some(Err(_))` with
	/// [`io::ErrorKind::InvalidData`] if the parser reports an error, and `None` once the stream
	/// has ended. A parser error is terminal: every call after it returns `None`.
	fn next(&mut self) -> Option<Self::Item> {
		if self.stream_ended {
			return None;
		}

		loop {
			let event = match self.parser.next_event() {
				Ok(event) => event,
				Err(err) => {
					// Fuse the iterator on failure so that callers who keep polling never
					// re-enter a parser that has already failed.
					// NOTE(review): presumably the parser cannot make progress after an error;
					// confirm against `Parser::next_event`.
					self.stream_ended = true;
					return Some(Err(io::Error::new(io::ErrorKind::InvalidData, err)));
				}
			};

			// Note that while we chunk on DOCUMENT_END events, we don't emit the chunk until the
			// next DOCUMENT_START or STREAM_END. The parser sometimes sees valid documents in
			// non-YAML inputs, and only fails when it looks for the start of the next document.
			// This is bad when the chunker's output determines whether an arbitrary input is valid
			// YAML (e.g. xt's format detection).
			match event.event_type() {
				YAML_DOCUMENT_START_EVENT => {
					// Discard captured bytes that precede this document so that the next chunk
					// begins exactly at the document's start offset.
					let offset = event.start_offset();
					self.parser.reader_mut().trim_to_offset(offset);
					self.current_document_kind = None;
					// The parser got past the previous document, so it is now safe to emit.
					if let Some(doc) = self.last_document.take() {
						return Some(Ok(doc));
					}
				}
				YAML_SCALAR_EVENT => {
					// Only the first content event of a document determines its kind.
					self.current_document_kind
						.get_or_insert(DocumentKind::Scalar);
				}
				YAML_SEQUENCE_START_EVENT | YAML_MAPPING_START_EVENT => {
					// Only the first content event of a document determines its kind.
					self.current_document_kind
						.get_or_insert(DocumentKind::Collection);
				}
				YAML_DOCUMENT_END_EVENT => {
					// Cut the chunk now, but hold it back until the parser moves on (see above).
					let chunk = self.parser.reader_mut().take_to_offset(event.end_offset());
					self.last_document = Some(Document {
						kind: self.current_document_kind.take(),
						content: String::from_utf8(chunk)
							.expect("libyaml with YAML_UTF8_ENCODING should yield UTF-8 chunks"),
					});
				}
				YAML_STREAM_END_EVENT => {
					self.stream_ended = true;
					return self.last_document.take().map(Ok);
				}
				_ => {}
			}
		}
	}
}

/// A UTF-8 encoded YAML document.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct Document {
	/// The original text of the document.
	content: String,
	/// The kind of content in the document, or `None` if no kind was recorded for it.
	kind: Option<DocumentKind>,
}

/// The type of content contained in a YAML document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum DocumentKind {
	/// The document's content is a scalar.
	Scalar,
	/// The document's content is a collection (sequence or mapping).
	Collection,
}

impl Document {
	/// Returns the original text of the document.
	pub(super) fn content(&self) -> &str {
		&self.content
	}

	/// Returns true if the content of the document is a collection (sequence or mapping).
	///
	/// Documents with a scalar kind, or with no recorded kind at all, yield `false`.
	pub(super) fn is_collection(&self) -> bool {
		self.kind == Some(DocumentKind::Collection)
	}
}

/// A reader that captures bytes read from a source and provides them in chunks.
struct ChunkReader<R: Read> {
	/// The wrapped source that every read is forwarded to.
	reader: R,
	/// Bytes obtained from the source that have not yet been trimmed away or taken as a chunk.
	captured: Vec<u8>,
	/// Reader offset corresponding to the first byte of `captured`.
	captured_start_offset: u64,
}

impl<R: Read> ChunkReader<R> {
	/// Wraps `reader` with an empty capture buffer whose start corresponds to reader offset 0.
	fn new(reader: R) -> Self {
		Self {
			reader,
			captured: Vec::new(),
			captured_start_offset: 0,
		}
	}

	/// Trims from the start of the capture buffer so the next chunk will begin at the specified
	/// reader offset.
	fn trim_to_offset(&mut self, offset: u64) {
		let discard_len = self.captured_len_before(offset);
		self.captured_start_offset = offset;
		drop(self.captured.drain(..discard_len));
	}

	/// Takes the chunk from the start of the capture buffer up to the specified reader offset,
	/// leaving bytes beyond the offset in the capture buffer.
	fn take_to_offset(&mut self, offset: u64) -> Vec<u8> {
		let chunk_len = self.captured_len_before(offset);
		// `split_off` hands back the bytes past the offset; swapping leaves those in the capture
		// buffer and the leading chunk in the local.
		let mut chunk = self.captured.split_off(chunk_len);
		self.captured_start_offset = offset;
		mem::swap(&mut chunk, &mut self.captured);
		chunk
	}

	/// Converts a reader offset into a length measured from the start of the capture buffer.
	fn captured_len_before(&self, offset: u64) -> usize {
		cast_read_offset_usize(offset - self.captured_start_offset)
	}
}

impl<R: Read> Read for ChunkReader<R> {
	/// Forwards the read to the source and appends whatever it produced to the capture buffer.
	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
		// The `read` docs recommend against us reading from `buf`, but also require callers to
		// assume we might do this. Handing the consumer's own buffer to the source lets the
		// consumer drive the frequency and size of source reads, which keeps this wrapper
		// transparent to both sides. Only the prefix the source reports as freshly written is
		// copied into the capture buffer.
		self.reader.read(buf).map(|filled| {
			self.captured.extend_from_slice(&buf[..filled]);
			filled
		})
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	// Splits a three-document stream and checks both the raw text and the detected kind of each
	// resulting chunk.
	#[test]
	fn chunker_normal_usage() {
		const INPUT: &str = r"---
test: true
---
12345
---
[list, of strings]
";

		let docs: Vec<Document> = Chunker::new(INPUT.as_bytes())
			.collect::<io::Result<_>>()
			.unwrap();

		let texts: Vec<&str> = docs.iter().map(Document::content).collect();
		assert_eq!(
			texts,
			[
				"---\ntest: true\n",
				"---\n12345\n",
				"---\n[list, of strings]\n",
			]
		);

		let kinds: Vec<bool> = docs.iter().map(|doc| doc.is_collection()).collect();
		assert_eq!(kinds, [true, false, true]);
	}

	// Tests that a YAML document consisting solely of an unknown anchor doesn't crash the chunker.
	// Fuzzing revealed this bug (via this exact input) in a previous implementation.
	#[test]
	fn chunker_unknown_anchor() {
		const INPUT: &str = "*y";
		let result: io::Result<Vec<Document>> = Chunker::new(INPUT.as_bytes()).collect();
		result.unwrap();
	}

	// A source that over-reports its read length must make the chunker panic rather than carry on
	// with bogus data.
	#[test]
	#[should_panic]
	fn chunker_misbehaving_reader() {
		let source = MisbehavingReader("---\nevil: true".as_bytes());
		let _results: Vec<_> = Chunker::new(source).collect();
	}

	/// A reader that always reports having read 1 byte more than the length of the buffer provided
	/// to [`read`](Read::read), regardless of the actual size of the underlying read.
	struct MisbehavingReader<R: Read>(R);

	impl<R: Read> Read for MisbehavingReader<R> {
		fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
			// Errors from the inner reader still propagate; only successful reads are misreported.
			self.0.read(buf)?;
			Ok(buf.len() + 1)
		}
	}
}