xt 0.15.1

Translate between serialized data formats
Documentation
//! Support for splitting YAML 1.2 streams into their constituent documents.
//!
//! This is an awful hack to provide some level of streaming input support atop
//! `serde_yaml`, which as of this writing requires buffering all input before
//! parsing it (the convenience methods that parse from readers simply do this
//! buffering for you). Using the same underlying parser as `serde_yaml` (a Rust
//! translation of the venerable [libyaml][libyaml]), a [`Chunker`] iterates
//! over the documents in a YAML stream as `String`s, which can be provided one
//! by one to `serde_yaml` for actual deserialization.
//!
//! I sincerely hope that I will someday have the time and energy to implement
//! true streaming support in `serde_yaml` itself (unless, of course, someone
//! beats me to it), and that this implementation will serve as a stepping stone
//! toward that goal.
//!
//! [libyaml]: https://pyyaml.org/wiki/LibYAML

use std::error::Error;
use std::ffi::{c_void, CStr};
use std::fmt::Display;
use std::io::{self, Read};
use std::mem::{self, MaybeUninit};
use std::ops::Deref;
use std::os::raw::c_char;
use std::ptr;

use unsafe_libyaml::{
	yaml_event_delete, yaml_event_t, yaml_mark_t, yaml_parser_delete, yaml_parser_initialize,
	yaml_parser_parse, yaml_parser_set_encoding, yaml_parser_set_input, yaml_parser_t,
	YAML_DOCUMENT_END_EVENT, YAML_DOCUMENT_START_EVENT, YAML_MAPPING_START_EVENT,
	YAML_SCALAR_EVENT, YAML_SEQUENCE_START_EVENT, YAML_STREAM_END_EVENT, YAML_UTF8_ENCODING,
};

/// An iterator over individual raw documents in a UTF-8-encoded YAML stream.
pub(super) struct Chunker<R>
where
	R: Read,
{
	parser: *mut yaml_parser_t,
	read_state: *mut ReadState<R>,
	last_document: Option<Document>,
	current_document_kind: Option<DocumentKind>,
	stream_ended: bool,
}

/// The state for the libyaml input callback.
struct ReadState<R>
where
	R: Read,
{
	reader: ChunkReader<R>,
	buffer: Vec<u8>,
	error: Option<io::Error>,
}

impl<R> Chunker<R>
where
	R: Read,
{
	/// Creates a new chunker for the YAML stream produced by the reader.
	///
	/// YAML 1.2 allows several different text encodings for YAML streams, as
	/// well as the presence of byte order marks at the start of the stream or
	/// individual documents. However, `Chunker` requires a UTF-8 stream without
	/// BOMs. Consider using the [`encoding`](super::encoding) module to
	/// re-encode non-UTF-8 streams.
	pub(super) fn new(reader: R) -> Self {
		let mut parser = Box::new(MaybeUninit::<yaml_parser_t>::uninit());
		// SAFETY: libyaml functions are assumed to work correctly.
		if unsafe { yaml_parser_initialize(parser.as_mut_ptr()) }.fail {
			panic!("out of memory for libyaml parser initialization");
		}

		// NOTE: Nothing after this point is expected to panic or fail, so we
		// should not need to worry about leaking this memory before we have a
		// chance to construct the return value.
		let parser = Box::into_raw(parser).cast::<yaml_parser_t>();
		let read_state = Box::into_raw(Box::new(ReadState {
			reader: ChunkReader::new(reader),
			buffer: vec![],
			error: None,
		}));

		// SAFETY: libyaml functions are assumed to work correctly. As required
		// by `read_handler`, the data pointer points to a `ReadState<R>`.
		unsafe { yaml_parser_set_input(parser, Self::read_handler, read_state.cast::<c_void>()) };

		// SAFETY: libyaml functions are assumed to work correctly.
		unsafe { yaml_parser_set_encoding(parser, YAML_UTF8_ENCODING) };

		Self {
			parser,
			read_state,
			last_document: None,
			current_document_kind: None,
			stream_ended: false,
		}
	}

	/// Implements [`yaml_read_handler_t`](unsafe_libyaml::yaml_read_handler_t).
	///
	/// # Safety
	///
	/// The `data` pointer provided to [`yaml_parser_set_input`] alongside this
	/// function must be a valid pointer to an initialized `ReadState<R>`.
	unsafe fn read_handler(
		read_state: *mut c_void,
		buffer: *mut u8,
		size: u64,
		size_read: *mut u64,
	) -> i32 {
		const READ_SUCCESS: i32 = 1;
		const READ_FAILURE: i32 = 0;

		let read_state = read_state.cast::<ReadState<R>>();

		// `size` represents the size of an in-memory buffer, which cannot
		// possibly exceed usize::MAX.
		#[allow(clippy::cast_possible_truncation)]
		let size = size as usize;

		// Manual review shows that libyaml uses `std::alloc::alloc` to allocate
		// the provided buffer, and performs no explicit initialization of its
		// own. Because `alloc` does not necessarily initialize memory, it would
		// be instant Undefined Behavior to form a Rust slice from this buffer,
		// and even if it weren't it would be unsound to expose this buffer to a
		// safe `Read` implementation. To ensure soundness, we maintain our own
		// initialized buffer for the reader to populate, then copy that buffer
		// ourselves to libyaml.
		//
		// SAFETY: Our caller is responsible for the validity of `read_state`.
		unsafe { (*read_state).buffer.resize(size, 0) };

		// SAFETY: Our caller is responsible for the validity of `read_state`.
		match unsafe { (*read_state).reader.read(&mut (*read_state).buffer[..]) } {
			Ok(len) => {
				// SAFETY: The two buffers come from separate allocations, so
				// they cannot overlap unless the allocator is seriously broken.
				// Our caller is responsible for the validity of `read_state`.
				unsafe { ptr::copy_nonoverlapping((*read_state).buffer.as_ptr(), buffer, len) };

				// Note that libyaml's EOF condition is the same as Rust's: set
				// `size_read` to 0 and return success.
				//
				// SAFETY: libyaml is assumed to initialize the pointer correctly.
				unsafe { *size_read = len as u64 };

				// SAFETY: Our caller is responsible for the validity of `read_state`.
				unsafe { (*read_state).error = None };
				READ_SUCCESS
			}
			Err(err) => {
				// SAFETY: Our caller is responsible for the validity of `read_state`.
				unsafe { (*read_state).error = Some(err) };
				READ_FAILURE
			}
		}
	}
}

impl<R> Iterator for Chunker<R>
where
	R: Read,
{
	type Item = io::Result<Document>;

	fn next(&mut self) -> Option<Self::Item> {
		if self.stream_ended {
			return None;
		}

		loop {
			// SAFETY: `self.parser` and `self.read_state` should have been
			// properly initialized on Chunker construction.
			let event = unsafe {
				match Event::from_parser(self.parser) {
					Ok(event) => event,
					Err(err) => {
						return Some(Err((*self.read_state)
							.error
							.take()
							.unwrap_or_else(|| io::Error::new(io::ErrorKind::InvalidData, err))))
					}
				}
			};

			// Note that while we chunk the document as soon as we receive a
			// DOCUMENT_END event, we don't emit the chunk until the next
			// DOCUMENT_START or STREAM_END event. libyaml can sometimes parse
			// what looks like a valid YAML document from a non-YAML input, only
			// to error out when it looks for the start of the next document.
			// This is especially problematic when the chunker's output is used
			// to determine whether an arbitrary input is valid YAML (e.g. in
			// xt's parser-based format detection).
			match event.type_ {
				YAML_DOCUMENT_START_EVENT => {
					// SAFETY: `self.read_state` should have been properly
					// initialized on Chunker construction.
					unsafe {
						let offset = event.start_mark.index;
						(*self.read_state).reader.trim_to_offset(offset);
					}
					self.current_document_kind = None;
					if let Some(doc) = self.last_document.take() {
						return Some(Ok(doc));
					}
				}
				YAML_SCALAR_EVENT => {
					self.current_document_kind
						.get_or_insert(DocumentKind::Scalar);
				}
				YAML_SEQUENCE_START_EVENT | YAML_MAPPING_START_EVENT => {
					self.current_document_kind
						.get_or_insert(DocumentKind::Collection);
				}
				YAML_DOCUMENT_END_EVENT => {
					// SAFETY: `self.read_state` should have been properly
					// initialized on Chunker construction. libyaml validates
					// that the input is UTF-8 during parsing.
					let content = unsafe {
						let offset = event.end_mark.index;
						String::from_utf8_unchecked(
							(*self.read_state).reader.take_to_offset(offset),
						)
					};
					self.last_document = Some(Document {
						content,
						kind: self.current_document_kind.take().unwrap(),
					});
				}
				YAML_STREAM_END_EVENT => {
					self.stream_ended = true;
					return self.last_document.take().map(Ok);
				}
				_ => {}
			};
		}
	}
}

impl<R> Drop for Chunker<R>
where
	R: Read,
{
	fn drop(&mut self) {
		// SAFETY: libyaml functions are assumed to work correctly.
		unsafe { yaml_parser_delete(self.parser) };

		// SAFETY: These pointers were obtained from Boxes on Chunker construction.
		unsafe {
			drop(Box::from_raw(self.parser));
			drop(Box::from_raw(self.read_state));
		}
	}
}

/// A UTF-8 encoded YAML document.
pub(super) struct Document {
	content: String,
	kind: DocumentKind,
}

/// The type of content contained in a YAML document.
pub(super) enum DocumentKind {
	Scalar,
	Collection,
}

impl Document {
	/// Returns true if the content of the document is a scalar rather than a
	/// collection (sequence or mapping).
	pub(super) fn is_scalar(&self) -> bool {
		matches!(self.kind, DocumentKind::Scalar)
	}
}

impl Deref for Document {
	type Target = str;

	fn deref(&self) -> &Self::Target {
		&self.content
	}
}

/// A libyaml event.
struct Event(*mut yaml_event_t);

impl Event {
	/// Runs the parser and returns its next event.
	///
	/// # Safety
	///
	/// `parser` must be a valid pointer to an initialized [`yaml_parser_t`].
	unsafe fn from_parser(parser: *mut yaml_parser_t) -> Result<Event, ParserError> {
		let mut event = Box::new(MaybeUninit::<yaml_event_t>::uninit());
		// SAFETY: libyaml functions are assumed to work correctly. Our caller
		// is responsible for the validity of `parser`.
		if unsafe { yaml_parser_parse(parser, event.as_mut_ptr()) }.fail {
			// SAFETY: Our caller is responsible for the validity of `parser`.
			return Err(unsafe { ParserError::from_parser(parser) });
		}
		Ok(Event(Box::into_raw(event).cast::<yaml_event_t>()))
	}
}

impl Deref for Event {
	type Target = yaml_event_t;

	fn deref(&self) -> &Self::Target {
		// SAFETY: This is the only place where we ever convert this pointer to
		// a reference. Because we have `&self`, Rust has already verified that
		// we're following the rules.
		unsafe { &*self.0 }
	}
}

impl Drop for Event {
	fn drop(&mut self) {
		// SAFETY: libyaml functions are assumed to work correctly.
		unsafe { yaml_event_delete(self.0) };

		// SAFETY: This pointer was obtained from a Box on Event construction.
		unsafe { drop(Box::from_raw(self.0)) };
	}
}

/// A libyaml parser error including context.
#[derive(Debug)]
struct ParserError {
	problem: Option<LocatedError>,
	context: Option<LocatedError>,
}

impl ParserError {
	/// Creates an error from the current state of a parser.
	///
	/// # Safety
	///
	/// `parser` must be a valid pointer to an initialized [`yaml_parser_t`].
	unsafe fn from_parser(parser: *mut yaml_parser_t) -> Self {
		// SAFETY: Our caller is responsible for the validity of `parser`. We
		// validate that the `description` for each `LocatedError::from_parts`
		// call is not null.
		unsafe {
			Self {
				problem: (!(*parser).problem.is_null()).then(|| {
					LocatedError::from_parts(
						(*parser).problem.cast::<c_char>(),
						(*parser).problem_mark,
						Some((*parser).problem_offset),
					)
				}),
				context: (!(*parser).context.is_null()).then(|| {
					LocatedError::from_parts(
						(*parser).context.cast::<c_char>(),
						(*parser).context_mark,
						None,
					)
				}),
			}
		}
	}
}

impl Error for ParserError {}

impl Display for ParserError {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match &self.problem {
			None => f.write_str("unknown libyaml error"),
			Some(problem) => match &self.context {
				None => Display::fmt(problem, f),
				Some(context) => write!(f, "{problem}, {context}"),
			},
		}
	}
}

/// A libyaml parser error with location information.
#[derive(Debug)]
struct LocatedError {
	description: String,
	offset: u64,
	line: u64,
	column: u64,
}

impl LocatedError {
	/// Creates an error from portions of the error state in a parser.
	///
	/// If `override_offset` is not `None`, it will replace the byte index
	/// reported by `mark` when the error is displayed.
	///
	/// # Safety
	///
	/// `description` must be a valid pointer to a valid C string.
	unsafe fn from_parts(
		description: *const c_char,
		mark: yaml_mark_t,
		override_offset: Option<u64>,
	) -> Self {
		Self {
			// SAFETY: Our caller is responsible for the validity of `description`.
			description: unsafe { CStr::from_ptr(description).to_string_lossy().into_owned() },
			line: mark.line + 1,
			column: mark.column + 1,
			offset: match mark.index > 0 {
				true => mark.index,
				false => override_offset.unwrap_or(0),
			},
		}
	}
}

impl Display for LocatedError {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		if self.line == 1 && self.column == 1 {
			write!(
				f,
				"{issue} at position {byte}",
				issue = self.description,
				byte = self.offset
			)
		} else {
			write!(
				f,
				"{issue} at line {line} column {column}",
				issue = self.description,
				line = self.line,
				column = self.column,
			)
		}
	}
}

/// A reader that captures bytes read from a source and provides them in chunks.
struct ChunkReader<R>
where
	R: Read,
{
	reader: R,
	captured: Vec<u8>,
	captured_start_offset: u64,
}

impl<R> ChunkReader<R>
where
	R: Read,
{
	fn new(reader: R) -> Self {
		Self {
			reader,
			captured: vec![],
			captured_start_offset: 0,
		}
	}

	/// Trims from the start of the capture buffer so the next chunk will begin
	/// at the specified reader offset.
	fn trim_to_offset(&mut self, offset: u64) {
		let trim_len = usize::try_from(offset - self.captured_start_offset).unwrap();
		self.captured_start_offset = offset;
		self.captured.drain(..trim_len);
	}

	/// Takes the chunk from the start of the capture buffer up to the specified
	/// reader offset, leaving bytes beyond the offset in the capture buffer.
	fn take_to_offset(&mut self, offset: u64) -> Vec<u8> {
		let take_len = usize::try_from(offset - self.captured_start_offset).unwrap();
		let tail = self.captured.split_off(take_len);
		self.captured_start_offset = offset;
		mem::replace(&mut self.captured, tail)
	}
}

impl<R> Read for ChunkReader<R>
where
	R: Read,
{
	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
		// While the `read` documentation recommends against reading from `buf`,
		// it does not prevent it, and does require callers of `read` to assume
		// we might do this. As consolation, note that we only read back bytes
		// that we know were freshly written, unless of course the source is
		// broken and lies about how many bytes it read.
		let len = self.reader.read(buf)?;
		self.captured.extend_from_slice(&buf[..len]);
		Ok(len)
	}
}