use alloc::vec::Vec;
use core::future::Future;
use serde::{de, ser};
use crate::core::{f16_to_f64, Header};
use crate::de::{Error, DEFAULT_RECURSION_LIMIT};
const CHUNK: usize = 4096;
pub trait AsyncRead {
fn read_exact(
&mut self,
buf: &mut [u8],
) -> impl Future<Output = Result<(), crate::io::Error>> + Send;
}
pub trait AsyncWrite {
fn write_all(
&mut self,
data: &[u8],
) -> impl Future<Output = Result<(), crate::io::Error>> + Send;
fn flush(&mut self) -> impl Future<Output = Result<(), crate::io::Error>> + Send;
}
impl<T: AsyncRead + Send + ?Sized> AsyncRead for &mut T {
#[inline]
async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), crate::io::Error> {
(**self).read_exact(buf).await
}
}
impl<T: AsyncWrite + Send + ?Sized> AsyncWrite for &mut T {
#[inline]
async fn write_all(&mut self, data: &[u8]) -> Result<(), crate::io::Error> {
(**self).write_all(data).await
}
#[inline]
async fn flush(&mut self) -> Result<(), crate::io::Error> {
(**self).flush().await
}
}
#[cfg(feature = "futures")]
pub mod futures {
use core::{future::poll_fn, pin::Pin};
use serde::{de, ser};
use super::{AsyncRead, AsyncWrite};
use crate::de::Error;
struct Reader<'a, R: ?Sized>(&'a mut R);
impl<R> AsyncRead for Reader<'_, R>
where
R: futures_io::AsyncRead + Unpin + Send + ?Sized,
{
async fn read_exact(&mut self, mut buf: &mut [u8]) -> Result<(), crate::io::Error> {
while !buf.is_empty() {
let n = poll_fn(|cx| Pin::new(&mut *self.0).poll_read(cx, buf)).await?;
if n == 0 {
return Err(crate::io::ErrorKind::UnexpectedEof.into());
}
let rest = core::mem::take(&mut buf);
let (_, rest) = rest.split_at_mut(n);
buf = rest;
}
Ok(())
}
}
struct Writer<'a, W: ?Sized>(&'a mut W);
impl<W> AsyncWrite for Writer<'_, W>
where
W: futures_io::AsyncWrite + Unpin + Send + ?Sized,
{
async fn write_all(&mut self, mut data: &[u8]) -> Result<(), crate::io::Error> {
while !data.is_empty() {
let n = poll_fn(|cx| Pin::new(&mut *self.0).poll_write(cx, data)).await?;
if n == 0 {
return Err(crate::io::ErrorKind::WriteZero.into());
}
data = &data[n..];
}
Ok(())
}
async fn flush(&mut self) -> Result<(), crate::io::Error> {
poll_fn(|cx| Pin::new(&mut *self.0).poll_flush(cx)).await
}
}
pub async fn read_item<R>(reader: &mut R) -> Result<alloc::vec::Vec<u8>, Error>
where
R: futures_io::AsyncRead + Unpin + Send + ?Sized,
{
let mut reader = Reader(reader);
super::read_item(&mut reader).await
}
pub async fn read_value<T, R>(reader: &mut R) -> Result<T, Error>
where
T: de::DeserializeOwned,
R: futures_io::AsyncRead + Unpin + Send + ?Sized,
{
let mut reader = Reader(reader);
super::read_value(&mut reader).await
}
pub async fn write_item<W>(writer: &mut W, item: &[u8]) -> Result<(), Error>
where
W: futures_io::AsyncWrite + Unpin + Send + ?Sized,
{
let mut writer = Writer(writer);
super::write_item(&mut writer, item).await
}
pub async fn write_value<T, W>(writer: &mut W, value: &T) -> Result<(), crate::ser::Error>
where
T: ?Sized + ser::Serialize,
W: futures_io::AsyncWrite + Unpin + Send + ?Sized,
{
let mut writer = Writer(writer);
super::write_value(&mut writer, value).await
}
}
#[cfg(feature = "tokio")]
pub mod tokio {
use serde::{de, ser};
use super::{AsyncRead, AsyncWrite};
use crate::de::Error;
struct Reader<'a, R: ?Sized>(&'a mut R);
impl<R> AsyncRead for Reader<'_, R>
where
R: ::tokio::io::AsyncRead + Unpin + Send + ?Sized,
{
async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), crate::io::Error> {
use ::tokio::io::AsyncReadExt as _;
self.0.read_exact(buf).await.map(|_| ())
}
}
struct Writer<'a, W: ?Sized>(&'a mut W);
impl<W> AsyncWrite for Writer<'_, W>
where
W: ::tokio::io::AsyncWrite + Unpin + Send + ?Sized,
{
async fn write_all(&mut self, data: &[u8]) -> Result<(), crate::io::Error> {
use ::tokio::io::AsyncWriteExt as _;
self.0.write_all(data).await
}
async fn flush(&mut self) -> Result<(), crate::io::Error> {
use ::tokio::io::AsyncWriteExt as _;
self.0.flush().await
}
}
pub async fn read_item<R>(reader: &mut R) -> Result<alloc::vec::Vec<u8>, Error>
where
R: ::tokio::io::AsyncRead + Unpin + Send + ?Sized,
{
let mut reader = Reader(reader);
super::read_item(&mut reader).await
}
pub async fn read_value<T, R>(reader: &mut R) -> Result<T, Error>
where
T: de::DeserializeOwned,
R: ::tokio::io::AsyncRead + Unpin + Send + ?Sized,
{
let mut reader = Reader(reader);
super::read_value(&mut reader).await
}
pub async fn write_item<W>(writer: &mut W, item: &[u8]) -> Result<(), Error>
where
W: ::tokio::io::AsyncWrite + Unpin + Send + ?Sized,
{
let mut writer = Writer(writer);
super::write_item(&mut writer, item).await
}
pub async fn write_value<T, W>(writer: &mut W, value: &T) -> Result<(), crate::ser::Error>
where
T: ?Sized + ser::Serialize,
W: ::tokio::io::AsyncWrite + Unpin + Send + ?Sized,
{
let mut writer = Writer(writer);
super::write_value(&mut writer, value).await
}
}
pub async fn read_item<R: AsyncRead + ?Sized>(reader: &mut R) -> Result<Vec<u8>, Error> {
let mut out = Vec::new();
let mut offset = 0;
read_item_inner(reader, &mut out, &mut offset, DEFAULT_RECURSION_LIMIT).await?;
Ok(out)
}
pub async fn read_value<T, R>(reader: &mut R) -> Result<T, Error>
where
T: de::DeserializeOwned,
R: AsyncRead + ?Sized,
{
let item = read_item(reader).await?;
crate::from_slice(&item)
}
pub async fn write_item<W: AsyncWrite + ?Sized>(writer: &mut W, item: &[u8]) -> Result<(), Error> {
crate::validate(item)?;
writer.write_all(item).await.map_err(Error::Io)?;
writer.flush().await.map_err(Error::Io)
}
pub async fn write_value<T, W>(writer: &mut W, value: &T) -> Result<(), crate::ser::Error>
where
T: ?Sized + ser::Serialize,
W: AsyncWrite + ?Sized,
{
let item = crate::to_vec(value)?;
writer
.write_all(&item)
.await
.map_err(crate::ser::Error::Io)?;
writer.flush().await.map_err(crate::ser::Error::Io)
}
#[derive(Copy, Clone)]
enum Arg {
This(u8),
Next1(u8),
Next2(u16),
Next4(u32),
Next8(u64),
Indefinite,
}
fn int_arg(arg: Arg) -> Option<u64> {
match arg {
Arg::This(x) => Some(x as u64),
Arg::Next1(x) => Some(x as u64),
Arg::Next2(x) => Some(x as u64),
Arg::Next4(x) => Some(x as u64),
Arg::Next8(x) => Some(x),
Arg::Indefinite => None,
}
}
#[cfg(target_pointer_width = "64")]
fn len_arg(arg: Arg, _start: usize) -> Result<Option<usize>, Error> {
Ok(int_arg(arg).map(|x| x as usize))
}
#[cfg(not(target_pointer_width = "64"))]
fn len_arg(arg: Arg, start: usize) -> Result<Option<usize>, Error> {
match int_arg(arg) {
Some(x) => usize::try_from(x)
.map(Some)
.map_err(|_| Error::Syntax(start)),
None => Ok(None),
}
}
async fn read_exact_record<R: AsyncRead + ?Sized>(
reader: &mut R,
out: &mut Vec<u8>,
offset: &mut usize,
buf: &mut [u8],
) -> Result<(), Error> {
reader.read_exact(buf).await.map_err(Error::Io)?;
*offset += buf.len();
out.extend_from_slice(buf);
Ok(())
}
async fn pull_header<R: AsyncRead + ?Sized>(
reader: &mut R,
out: &mut Vec<u8>,
offset: &mut usize,
) -> Result<(Header, usize), Error> {
let start = *offset;
let mut prefix = [0u8; 1];
read_exact_record(reader, out, offset, &mut prefix).await?;
let major = prefix[0] >> 5;
let minor = prefix[0] & 0b00011111;
let arg = match minor {
x @ 0..=23 => Arg::This(x),
24 => {
let mut b = [0u8; 1];
read_exact_record(reader, out, offset, &mut b).await?;
Arg::Next1(b[0])
}
25 => {
let mut b = [0u8; 2];
read_exact_record(reader, out, offset, &mut b).await?;
Arg::Next2(u16::from_be_bytes(b))
}
26 => {
let mut b = [0u8; 4];
read_exact_record(reader, out, offset, &mut b).await?;
Arg::Next4(u32::from_be_bytes(b))
}
27 => {
let mut b = [0u8; 8];
read_exact_record(reader, out, offset, &mut b).await?;
Arg::Next8(u64::from_be_bytes(b))
}
31 => Arg::Indefinite,
_ => return Err(Error::Syntax(start)),
};
let header = match major {
0 => Header::Positive(int_arg(arg).ok_or(Error::Syntax(start))?),
1 => Header::Negative(int_arg(arg).ok_or(Error::Syntax(start))?),
2 => Header::Bytes(len_arg(arg, start)?),
3 => Header::Text(len_arg(arg, start)?),
4 => Header::Array(len_arg(arg, start)?),
5 => Header::Map(len_arg(arg, start)?),
6 => Header::Tag(int_arg(arg).ok_or(Error::Syntax(start))?),
_ => match arg {
Arg::This(x) => Header::Simple(x),
Arg::Next1(x) if x >= 32 => Header::Simple(x),
Arg::Next1(..) => return Err(Error::Syntax(start)),
Arg::Next2(x) => Header::Float(f16_to_f64(x)),
Arg::Next4(x) => Header::Float(f32::from_bits(x) as f64),
Arg::Next8(x) => Header::Float(f64::from_bits(x)),
Arg::Indefinite => Header::Break,
},
};
Ok((header, start))
}
async fn read_body<R: AsyncRead + ?Sized>(
reader: &mut R,
out: &mut Vec<u8>,
offset: &mut usize,
mut remaining: usize,
) -> Result<(), Error> {
let mut buffer = [0u8; CHUNK];
while remaining > 0 {
let n = remaining.min(buffer.len());
reader
.read_exact(&mut buffer[..n])
.await
.map_err(Error::Io)?;
*offset += n;
out.extend_from_slice(&buffer[..n]);
remaining -= n;
}
Ok(())
}
async fn read_text_body<R: AsyncRead + ?Sized>(
reader: &mut R,
out: &mut Vec<u8>,
offset: &mut usize,
len: usize,
) -> Result<(), Error> {
let body_offset = *offset;
let start = out.len();
read_body(reader, out, offset, len).await?;
core::str::from_utf8(&out[start..]).map_err(|_| Error::Syntax(body_offset))?;
Ok(())
}
enum Frame {
Array(usize),
Map { pairs: usize, value: bool },
IndefArray,
IndefMap { value: bool },
}
async fn read_item_inner<R: AsyncRead + ?Sized>(
reader: &mut R,
out: &mut Vec<u8>,
offset: &mut usize,
limit: usize,
) -> Result<(), Error> {
let mut stack = Vec::with_capacity(8);
stack.push(Frame::Array(1));
while !stack.is_empty() {
match stack.last().expect("non-empty") {
Frame::Array(0)
| Frame::Map {
pairs: 0,
value: false,
} => {
stack.pop();
continue;
}
_ => {}
}
let (header, start) = pull_header(reader, out, offset).await?;
if header == Header::Break {
match stack.last().expect("non-empty") {
Frame::IndefArray | Frame::IndefMap { value: false } => {
stack.pop();
continue;
}
_ => return Err(Error::Syntax(start)),
}
}
match stack.last_mut().expect("non-empty") {
Frame::Array(n) => *n -= 1,
Frame::Map { pairs, value } => {
if *value {
*pairs -= 1;
}
*value = !*value;
}
Frame::IndefArray => {}
Frame::IndefMap { value } => *value = !*value,
}
match header {
Header::Positive(..)
| Header::Negative(..)
| Header::Float(..)
| Header::Simple(..) => {}
Header::Break => unreachable!("handled above"),
Header::Bytes(Some(len)) => read_body(reader, out, offset, len).await?,
Header::Text(Some(len)) => read_text_body(reader, out, offset, len).await?,
Header::Bytes(None) => read_indef_string(reader, out, offset, false).await?,
Header::Text(None) => read_indef_string(reader, out, offset, true).await?,
Header::Tag(..) => push(&mut stack, Frame::Array(1), limit)?,
Header::Array(Some(len)) => push(&mut stack, Frame::Array(len), limit)?,
Header::Array(None) => push(&mut stack, Frame::IndefArray, limit)?,
Header::Map(Some(pairs)) => push(
&mut stack,
Frame::Map {
pairs,
value: false,
},
limit,
)?,
Header::Map(None) => push(&mut stack, Frame::IndefMap { value: false }, limit)?,
}
}
Ok(())
}
fn push(stack: &mut Vec<Frame>, frame: Frame, limit: usize) -> Result<(), Error> {
if stack.len() >= limit {
return Err(Error::RecursionLimitExceeded);
}
stack.push(frame);
Ok(())
}
async fn read_indef_string<R: AsyncRead + ?Sized>(
reader: &mut R,
out: &mut Vec<u8>,
offset: &mut usize,
text: bool,
) -> Result<(), Error> {
loop {
let (header, start) = pull_header(reader, out, offset).await?;
match header {
Header::Break => return Ok(()),
Header::Text(Some(len)) if text => read_text_body(reader, out, offset, len).await?,
Header::Bytes(Some(len)) if !text => read_body(reader, out, offset, len).await?,
_ => return Err(Error::Syntax(start)),
}
}
}