#[cfg(feature = "read-initializer")]
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use rmp::Marker;
use rmpv::{Integer, Value};
use byteorder::{BigEndian, ByteOrder};
use futures::future::FusedFuture;
use futures::io::ErrorKind;
use futures::io::Result as IoResult;
use futures::prelude::*;
use crate::MsgPackOption;
/// Trait-object type for a `Send` future that resolves to `IoResult<T>` and may
/// borrow data for `'a`. The `*_dyn` methods in this file return it behind `Pin<Box<_>>`.
type DynIoResultFuture<'a, T> = dyn Future<Output = IoResult<T>> + Send + 'a;
/// Rebuilds a value that carries a reader of type `R` around a reader of type `R2`.
pub(crate) trait WrapReader<R, R2> {
/// The rebuilt value, now carrying an `R2`.
type Output;
/// Consumes `self`, passes its reader through `wrap` and returns the rebuilt value.
fn wrap(self, wrap: impl FnOnce(R) -> R2) -> Self::Output;
}
/// A decoded MessagePack value header.
///
/// Scalar variants carry the decoded value together with the reader. The
/// container, binary, string and extension variants carry a follow-up object
/// that still owns the reader and is used to read, skip or collect the payload.
#[must_use]
#[derive(Debug)]
pub enum ValueFuture<R> {
/// A nil value, plus the reader.
Nil(R),
/// A boolean, plus the reader.
Boolean(bool, R),
/// An integer (any of the fixed or sized signed/unsigned encodings), plus the reader.
Integer(Integer, R),
/// A 32-bit float, plus the reader.
F32(f32, R),
/// A 64-bit float, plus the reader.
F64(f64, R),
/// An array whose elements have not been read yet.
Array(ArrayFuture<R>),
/// A map whose entries have not been read yet.
Map(MapFuture<R>),
/// A binary payload that has not been read yet.
Bin(BinFuture<R>),
/// A string payload that has not been read yet.
String(StringFuture<R>),
/// An extension value whose type tag is known but whose payload has not been read yet.
Ext(ExtFuture<R>),
}
impl<R, R2> WrapReader<R, R2> for ValueFuture<R> {
    type Output = ValueFuture<R2>;

    /// Rebuilds the same variant around the reader produced by `wrap`.
    ///
    /// Scalar variants apply `wrap` to their reader directly; the remaining
    /// variants delegate to the `WrapReader` impl of the object they hold.
    fn wrap(self, wrap: impl FnOnce(R) -> R2) -> Self::Output {
        match self {
            Self::Nil(reader) => ValueFuture::Nil(wrap(reader)),
            Self::Boolean(value, reader) => ValueFuture::Boolean(value, wrap(reader)),
            Self::Integer(value, reader) => ValueFuture::Integer(value, wrap(reader)),
            Self::F32(value, reader) => ValueFuture::F32(value, wrap(reader)),
            Self::F64(value, reader) => ValueFuture::F64(value, wrap(reader)),
            Self::Array(inner) => ValueFuture::Array(inner.wrap(wrap)),
            Self::Map(inner) => ValueFuture::Map(inner.wrap(wrap)),
            Self::Bin(inner) => ValueFuture::Bin(inner.wrap(wrap)),
            Self::String(inner) => ValueFuture::String(inner.wrap(wrap)),
            Self::Ext(inner) => ValueFuture::Ext(inner.wrap(wrap)),
        }
    }
}
impl<R> ValueFuture<R> {
#[must_use]
pub fn into_bool(self) -> Option<(bool, R)> {
if let ValueFuture::Boolean(val, r) = self {
Some((val, r))
} else {
None
}
}
#[must_use]
pub fn into_bin(self) -> Option<BinFuture<R>> {
if let ValueFuture::Bin(bin) = self {
Some(bin)
} else {
None
}
}
#[must_use]
pub fn into_string(self) -> Option<StringFuture<R>> {
if let ValueFuture::String(s) = self {
Some(s)
} else {
None
}
}
#[must_use]
pub fn into_ext(self) -> Option<ExtFuture<R>> {
if let ValueFuture::Ext(ext) = self {
Some(ext)
} else {
None
}
}
#[must_use]
pub fn into_array(self) -> Option<ArrayFuture<R>> {
if let ValueFuture::Array(array) = self {
Some(array)
} else {
None
}
}
#[must_use]
pub fn into_map(self) -> Option<MapFuture<R>> {
if let ValueFuture::Map(map) = self {
Some(map)
} else {
None
}
}
#[must_use]
pub fn into_u64(self) -> Option<(u64, R)> {
if let ValueFuture::Integer(val, r) = self {
val.as_u64().map(|val| (val, r))
} else {
None
}
}
#[must_use]
pub fn into_f32(self) -> Option<(f32, R)> {
if let ValueFuture::F32(val, r) = self {
Some((val, r))
} else {
None
}
}
#[must_use]
pub fn into_f64(self) -> Option<(f64, R)> {
if let ValueFuture::F64(val, r) = self {
Some((val, r))
} else {
None
}
}
pub async fn skip(self) -> IoResult<R>
where
R: AsyncRead + Send + Unpin,
{
Ok(match self {
ValueFuture::Nil(r) => r,
ValueFuture::Boolean(_b, r) => r,
ValueFuture::Integer(_i, r) => r,
ValueFuture::F32(_f, r) => r,
ValueFuture::F64(_f, r) => r,
ValueFuture::Array(a) => a.skip().await?,
ValueFuture::Map(m) => m.skip().await?,
ValueFuture::Bin(m) => m.skip().await?,
ValueFuture::String(s) => s.skip().await?,
ValueFuture::Ext(e) => e.skip().await?,
})
}
pub async fn into_value(self) -> IoResult<(Value, R)>
where
R: AsyncRead + Send + Unpin,
{
Ok(match self {
ValueFuture::Nil(r) => (Value::Nil, r),
ValueFuture::Boolean(b, r) => (Value::Boolean(b), r),
ValueFuture::Integer(i, r) => (Value::Integer(i), r),
ValueFuture::F32(f, r) => (Value::F32(f), r),
ValueFuture::F64(f, r) => (Value::F64(f), r),
ValueFuture::Array(a) => a.into_value().await?,
ValueFuture::Map(m) => m.into_value().await?,
ValueFuture::Bin(m) => m.into_value().await?,
ValueFuture::String(s) => s.into_value().await?,
ValueFuture::Ext(e) => e.into_value().await?,
})
}
}
/// Future that reads a single marker byte from a reader and resolves to the
/// decoded `Marker` together with the reader.
pub(crate) struct MarkerFuture<R> {
/// The reader; `None` once the future has resolved (successfully or with an error).
reader: Option<R>,
}
impl<R> MarkerFuture<R> {
    /// Creates a future that will read one marker byte from `reader`.
    pub fn new(reader: R) -> Self {
        MarkerFuture {
            reader: Some(reader),
        }
    }

    /// Returns the reader without reading from it.
    ///
    /// # Panics
    ///
    /// Panics if the future has already resolved, because the reader has been
    /// moved out (or dropped on error) by then.
    pub fn into_inner(self) -> R {
        self.reader
            .expect("MarkerFuture::into_inner called after the future resolved")
    }
}
impl<R: AsyncRead + Unpin> Future for MarkerFuture<R> {
    type Output = IoResult<(Marker, R)>;

    /// Attempts to read the single marker byte.
    ///
    /// Resolves to the marker and the reader on success, to `UnexpectedEof`
    /// when the reader reports zero bytes, or to the reader's own error. In
    /// both error cases the reader is dropped.
    ///
    /// NOTE: once the future has resolved, further polls return `Pending`
    /// without registering a waker; use `FusedFuture::is_terminated` to avoid
    /// polling again.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let mut reader = match this.reader.take() {
            Some(reader) => reader,
            None => return Poll::Pending,
        };
        let mut byte = [0u8];
        match AsyncRead::poll_read(Pin::new(&mut reader), cx, &mut byte) {
            Poll::Pending => {
                // Not ready yet: put the reader back so the next poll can retry.
                this.reader = Some(reader);
                Poll::Pending
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(0)) => Poll::Ready(Err(ErrorKind::UnexpectedEof.into())),
            Poll::Ready(Ok(1)) => Poll::Ready(Ok((Marker::from_u8(byte[0]), reader))),
            // The buffer is one byte long, so no other count can be reported.
            Poll::Ready(Ok(_)) => unreachable!(),
        }
    }
}
impl<R: AsyncRead + Unpin> FusedFuture for MarkerFuture<R> {
/// Returns `true` once the reader has been moved out, i.e. after the future resolved.
fn is_terminated(&self) -> bool {
self.reader.is_none()
}
}
/// Entry point for decoding one MessagePack value from a reader.
///
/// Despite the name this type is not itself a `Future`; call `decode`, `skip`
/// or `into_value` to obtain one.
#[must_use]
pub struct MsgPackFuture<R> {
/// The reader the next value is decoded from.
reader: R,
}
impl<R: AsyncRead + Unpin> MsgPackFuture<R> {
    /// Wraps `reader`; decoding starts by reading a marker byte from it.
    pub fn new(reader: R) -> Self {
        MsgPackFuture { reader }
    }

    /// Returns the wrapped reader without reading anything from it.
    pub fn into_inner(self) -> R {
        self.reader
    }

    /// Reads exactly one byte.
    async fn read_1(&mut self) -> IoResult<u8> {
        let mut val = [0u8; 1];
        self.reader.read_exact(&mut val[..]).await?;
        Ok(val[0])
    }

    /// Reads exactly two bytes.
    ///
    /// The buffer is zero-initialised unconditionally. Producing an
    /// uninitialised `[u8; N]` through `MaybeUninit::uninit().assume_init()`
    /// is undefined behaviour, and zeroing a few bytes costs nothing.
    async fn read_2(&mut self) -> IoResult<[u8; 2]> {
        let mut val = [0u8; 2];
        self.reader.read_exact(&mut val[..]).await?;
        Ok(val)
    }

    /// Reads exactly four bytes (zero-initialised buffer, see `read_2`).
    async fn read_4(&mut self) -> IoResult<[u8; 4]> {
        let mut val = [0u8; 4];
        self.reader.read_exact(&mut val[..]).await?;
        Ok(val)
    }

    /// Reads exactly eight bytes (zero-initialised buffer, see `read_2`).
    async fn read_8(&mut self) -> IoResult<[u8; 8]> {
        let mut val = [0u8; 8];
        self.reader.read_exact(&mut val[..]).await?;
        Ok(val)
    }

    /// Reads a `u8`.
    fn read_u8(&mut self) -> impl Future<Output = IoResult<u8>> + '_ {
        self.read_1()
    }

    /// Reads a big-endian `u16`.
    fn read_u16(&mut self) -> impl Future<Output = IoResult<u16>> + '_ {
        self.read_2().map_ok(|val| BigEndian::read_u16(&val))
    }

    /// Reads a big-endian `u32`.
    fn read_u32(&mut self) -> impl Future<Output = IoResult<u32>> + '_ {
        self.read_4().map_ok(|val| BigEndian::read_u32(&val))
    }

    /// Reads a big-endian `u64`.
    fn read_u64(&mut self) -> impl Future<Output = IoResult<u64>> + '_ {
        self.read_8().map_ok(|val| BigEndian::read_u64(&val))
    }

    /// Reads an `i8` (the byte reinterpreted as two's complement).
    fn read_i8(&mut self) -> impl Future<Output = IoResult<i8>> + '_ {
        self.read_1().map_ok(|val| val as i8)
    }

    /// Reads a big-endian `i16`.
    fn read_i16(&mut self) -> impl Future<Output = IoResult<i16>> + '_ {
        self.read_2().map_ok(|val| BigEndian::read_i16(&val))
    }

    /// Reads a big-endian `i32`.
    fn read_i32(&mut self) -> impl Future<Output = IoResult<i32>> + '_ {
        self.read_4().map_ok(|val| BigEndian::read_i32(&val))
    }

    /// Reads a big-endian `i64`.
    fn read_i64(&mut self) -> impl Future<Output = IoResult<i64>> + '_ {
        self.read_8().map_ok(|val| BigEndian::read_i64(&val))
    }

    /// Reads a big-endian `f32`.
    fn read_f32(&mut self) -> impl Future<Output = IoResult<f32>> + '_ {
        self.read_4().map_ok(|val| BigEndian::read_f32(&val))
    }

    /// Reads a big-endian `f64`.
    fn read_f64(&mut self) -> impl Future<Output = IoResult<f64>> + '_ {
        self.read_8().map_ok(|val| BigEndian::read_f64(&val))
    }

    /// Decodes the next value, discards it (including any nested content) and
    /// returns the reader.
    ///
    /// # Errors
    ///
    /// Propagates any error from decoding or from skipping the payload.
    pub async fn skip(self) -> IoResult<R>
    where
        R: Send,
    {
        self.decode().await?.skip().await
    }

    /// Boxed variant of `skip`, used where the future type must be nameable
    /// (the container types call it while walking their elements).
    pub fn skip_dyn<'a>(self) -> Pin<Box<DynIoResultFuture<'a, R>>>
    where
        R: Send + 'a,
    {
        self.skip().boxed()
    }

    /// Reads the marker byte and decodes the value header that follows it.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` if no marker byte is available, `InvalidData`
    /// for the reserved marker, and otherwise any error from the reader.
    pub async fn decode(self) -> IoResult<ValueFuture<R>> {
        let (marker, reader) = MarkerFuture::new(self.reader).await?;
        MsgPackFuture::new(reader)
            .decode_after_marker(marker)
            .await
    }

    /// Decodes the value header for an already-read `marker`.
    ///
    /// Scalars are read completely. For strings, binaries, arrays, maps and
    /// extensions only the length (and extension type) is read; the payload is
    /// left in the reader, which is handed to the returned object.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` for `Marker::Reserved` and otherwise any error
    /// from reading the header bytes.
    pub(crate) async fn decode_after_marker(mut self, marker: Marker) -> IoResult<ValueFuture<R>> {
        // Small constructors for the payload-carrying variants. They are generic
        // over their own `T` because nested items cannot use the impl's `R`.
        fn string<T>(reader: T, len: usize) -> ValueFuture<T> {
            ValueFuture::String(StringFuture(BinFuture { reader, len }))
        }
        fn bin<T>(reader: T, len: usize) -> ValueFuture<T> {
            ValueFuture::Bin(BinFuture { reader, len })
        }
        fn array<T>(reader: T, len: usize) -> ValueFuture<T> {
            ValueFuture::Array(ArrayFuture { reader, len })
        }
        fn map<T>(reader: T, len: usize) -> ValueFuture<T> {
            ValueFuture::Map(MapFuture { reader, len })
        }
        fn ext<T>(reader: T, ty: i8, len: usize) -> ValueFuture<T> {
            ValueFuture::Ext(ExtFuture {
                bin: BinFuture { reader, len },
                ty,
            })
        }

        // Note on `as usize` below: the source is always a `u32` length, which
        // fits in `usize` on 32-bit and 64-bit targets.
        Ok(match marker {
            Marker::FixPos(val) => ValueFuture::Integer(Integer::from(val), self.reader),
            Marker::FixNeg(val) => ValueFuture::Integer(Integer::from(val), self.reader),
            Marker::Null => ValueFuture::Nil(self.reader),
            Marker::True => ValueFuture::Boolean(true, self.reader),
            Marker::False => ValueFuture::Boolean(false, self.reader),
            Marker::U8 => ValueFuture::Integer(Integer::from(self.read_u8().await?), self.reader),
            Marker::U16 => ValueFuture::Integer(Integer::from(self.read_u16().await?), self.reader),
            Marker::U32 => ValueFuture::Integer(Integer::from(self.read_u32().await?), self.reader),
            Marker::U64 => ValueFuture::Integer(Integer::from(self.read_u64().await?), self.reader),
            Marker::I8 => ValueFuture::Integer(Integer::from(self.read_i8().await?), self.reader),
            Marker::I16 => ValueFuture::Integer(Integer::from(self.read_i16().await?), self.reader),
            Marker::I32 => ValueFuture::Integer(Integer::from(self.read_i32().await?), self.reader),
            Marker::I64 => ValueFuture::Integer(Integer::from(self.read_i64().await?), self.reader),
            Marker::F32 => ValueFuture::F32(self.read_f32().await?, self.reader),
            Marker::F64 => ValueFuture::F64(self.read_f64().await?, self.reader),
            Marker::FixStr(len) => string(self.reader, len.into()),
            Marker::Str8 => {
                let len = self.read_u8().await?;
                string(self.reader, len.into())
            }
            Marker::Str16 => {
                let len = self.read_u16().await?;
                string(self.reader, len.into())
            }
            Marker::Str32 => {
                let len = self.read_u32().await?;
                string(self.reader, len as usize)
            }
            Marker::Bin8 => {
                let len = self.read_u8().await?;
                bin(self.reader, len.into())
            }
            Marker::Bin16 => {
                let len = self.read_u16().await?;
                bin(self.reader, len.into())
            }
            Marker::Bin32 => {
                let len = self.read_u32().await?;
                bin(self.reader, len as usize)
            }
            Marker::FixArray(len) => array(self.reader, len.into()),
            Marker::Array16 => {
                let len = self.read_u16().await?;
                array(self.reader, len.into())
            }
            Marker::Array32 => {
                let len = self.read_u32().await?;
                array(self.reader, len as usize)
            }
            Marker::FixMap(len) => map(self.reader, len.into()),
            Marker::Map16 => {
                let len = self.read_u16().await?;
                map(self.reader, len.into())
            }
            Marker::Map32 => {
                let len = self.read_u32().await?;
                map(self.reader, len as usize)
            }
            // Fixed-size extensions: the payload length is implied by the marker
            // and only the type byte follows.
            Marker::FixExt1 => {
                let ty = self.read_i8().await?;
                ext(self.reader, ty, 1)
            }
            Marker::FixExt2 => {
                let ty = self.read_i8().await?;
                ext(self.reader, ty, 2)
            }
            Marker::FixExt4 => {
                let ty = self.read_i8().await?;
                ext(self.reader, ty, 4)
            }
            Marker::FixExt8 => {
                let ty = self.read_i8().await?;
                ext(self.reader, ty, 8)
            }
            Marker::FixExt16 => {
                let ty = self.read_i8().await?;
                ext(self.reader, ty, 16)
            }
            // Variable-size extensions: the length comes first, then the type byte.
            Marker::Ext8 => {
                let len = self.read_u8().await?;
                let ty = self.read_i8().await?;
                ext(self.reader, ty, len.into())
            }
            Marker::Ext16 => {
                let len = self.read_u16().await?;
                let ty = self.read_i8().await?;
                ext(self.reader, ty, len.into())
            }
            Marker::Ext32 => {
                let len = self.read_u32().await?;
                let ty = self.read_i8().await?;
                ext(self.reader, ty, len as usize)
            }
            Marker::Reserved => return Err(ErrorKind::InvalidData.into()),
        })
    }

    /// Decodes the next value completely into an `rmpv::Value` and returns it
    /// together with the reader.
    ///
    /// # Errors
    ///
    /// Propagates any error from decoding or from reading the payload.
    pub async fn into_value(self) -> IoResult<(Value, R)>
    where
        R: Send,
    {
        self.decode().await?.into_value().await
    }

    /// Boxed variant of `into_value`, used where the future type must be
    /// nameable (the container types call it while walking their elements).
    pub fn into_value_dyn<'a>(self) -> Pin<Box<DynIoResultFuture<'a, (Value, R)>>>
    where
        R: Send + 'a,
    {
        self.into_value().boxed()
    }
}
impl<R: AsyncRead + Unpin> AsyncRead for MsgPackFuture<R> {
    /// Forwards the read unchanged to the wrapped reader.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        let this = self.get_mut();
        R::poll_read(Pin::new(&mut this.reader), cx, buf)
    }
}
/// A MessagePack array whose header has been decoded but whose elements are
/// still in the reader.
#[derive(Debug)]
pub struct ArrayFuture<R> {
/// Reader positioned at the next element that has not been handed out.
reader: R,
/// Number of elements that have not been handed out yet.
len: usize,
}
impl<R: AsyncRead + Unpin> AsyncRead for ArrayFuture<R> {
    /// Forwards the read unchanged to the wrapped reader; the element count is
    /// not touched.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        let this = self.get_mut();
        R::poll_read(Pin::new(&mut this.reader), cx, buf)
    }
}
impl<R, R2> WrapReader<R, R2> for ArrayFuture<R> {
    type Output = ArrayFuture<R2>;

    /// Swaps the reader for `wrap(reader)` and keeps the remaining element count.
    fn wrap(self, wrap: impl FnOnce(R) -> R2) -> Self::Output {
        let ArrayFuture { reader, len } = self;
        ArrayFuture {
            reader: wrap(reader),
            len,
        }
    }
}
impl<R: AsyncRead + Unpin> ArrayFuture<R> {
    /// Number of elements that have not been handed out yet.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when no elements remain.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Hands out a decoder for the next element, or the reader when none remain.
    ///
    /// The returned `MsgPackFuture` owns this array as its reader, so decoding
    /// the element yields the array back for the following call.
    pub fn next(mut self) -> MsgPackOption<MsgPackFuture<Self>, R> {
        if self.len > 0 {
            self.len -= 1;
            MsgPackOption::Some(MsgPackFuture::new(self))
        } else {
            MsgPackOption::End(self.reader)
        }
    }

    /// Borrowing variant of `next`: hands out a decoder over a type-erased
    /// mutable borrow of the reader, or `None` when no elements remain.
    #[must_use]
    pub fn next_dyn(&mut self) -> Option<MsgPackFuture<&mut (dyn AsyncRead + Unpin + Send)>>
    where
        R: Send,
    {
        if self.len > 0 {
            self.len -= 1;
            Some(MsgPackFuture::new(&mut self.reader))
        } else {
            None
        }
    }

    /// Hands out a decoder for the final element, giving up the array wrapper.
    ///
    /// Returns `Some` only when exactly one element remains. In every other
    /// case, including when more than one element remains, the reader is
    /// returned as `End` and nothing is consumed.
    pub fn last(self) -> MsgPackOption<MsgPackFuture<R>, R> {
        if self.len == 1 {
            MsgPackOption::Some(MsgPackFuture::new(self.reader))
        } else {
            MsgPackOption::End(self.reader)
        }
    }

    /// Skips all remaining elements and returns the reader.
    ///
    /// # Errors
    ///
    /// Propagates the first error encountered while skipping an element.
    pub async fn skip(mut self) -> IoResult<R>
    where
        R: Send,
    {
        while let Some(element) = self.next_dyn() {
            element.skip_dyn().await?;
        }
        Ok(self.reader)
    }

    /// Decodes all remaining elements into a `Vec<Value>` and returns it with
    /// the reader.
    ///
    /// # Errors
    ///
    /// Propagates the first error encountered while decoding an element.
    pub async fn into_value_vec(mut self) -> IoResult<(Vec<Value>, R)>
    where
        R: Send,
    {
        // The element count comes straight from the input and can be as large
        // as `u32::MAX`. Pre-allocating that much from a header of a few bytes
        // would let a malformed message exhaust memory, so the initial
        // reservation is capped and the vector grows as elements arrive.
        const MAX_PREALLOC: usize = 4096;
        let mut values = Vec::with_capacity(std::cmp::min(self.len, MAX_PREALLOC));
        while let Some(element) = self.next_dyn() {
            let (value, _) = element.into_value_dyn().await?;
            values.push(value);
        }
        Ok((values, self.reader))
    }

    /// Like `into_value_vec`, but wraps the elements in `Value::Array`.
    pub fn into_value(self) -> impl Future<Output = IoResult<(Value, R)>>
    where
        R: Send,
    {
        self.into_value_vec()
            .map_ok(|(values, reader)| (Value::Array(values), reader))
    }
}
/// Wrapper around an `ArrayFuture` whose only operations are reading through it
/// and `finish`, which skips whatever elements remain.
pub struct FinalizedArray<R>(ArrayFuture<R>);
impl<R: AsyncRead + Unpin + 'static> FinalizedArray<R> {
pub async fn finish(self) -> IoResult<R>
where
R: Send,
{
self.0.skip().await
}
}
impl<R: AsyncRead + Unpin> AsyncRead for FinalizedArray<R> {
    /// Forwards the read to the wrapped `ArrayFuture`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        let this = self.get_mut();
        ArrayFuture::poll_read(Pin::new(&mut this.0), cx, buf)
    }
}
/// A MessagePack map whose header has been decoded but whose entries are still
/// in the reader.
#[derive(Debug)]
pub struct MapFuture<R> {
/// Reader positioned at the next entry that has not been handed out.
reader: R,
/// Number of key/value entries that have not been handed out yet.
len: usize,
}
impl<R: AsyncRead + Unpin> AsyncRead for MapFuture<R> {
    /// Forwards the read unchanged to the wrapped reader; the entry count is
    /// not touched.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        let this = self.get_mut();
        R::poll_read(Pin::new(&mut this.reader), cx, buf)
    }
}
impl<R, R2> WrapReader<R, R2> for MapFuture<R> {
    type Output = MapFuture<R2>;

    /// Swaps the reader for `wrap(reader)` and keeps the remaining entry count.
    fn wrap(self, wrap: impl FnOnce(R) -> R2) -> Self::Output {
        let MapFuture { reader, len } = self;
        MapFuture {
            reader: wrap(reader),
            len,
        }
    }
}
impl<R: AsyncRead + Unpin> MapFuture<R> {
    /// Number of key/value entries that have not been handed out yet.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when no entries remain.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Hands out a decoder for the next key, or the reader when no entries remain.
    ///
    /// The decoder is doubly wrapped: decoding the key yields a
    /// `MsgPackFuture<Self>` for the value, and decoding the value yields this
    /// map back for the following call.
    pub fn next_key(mut self) -> MsgPackOption<MsgPackFuture<MsgPackFuture<Self>>, R> {
        if self.len > 0 {
            self.len -= 1;
            MsgPackOption::Some(MsgPackFuture::new(MsgPackFuture::new(self)))
        } else {
            MsgPackOption::End(self.reader)
        }
    }

    /// Borrowing variant of `next_key`: hands out the doubly wrapped decoder
    /// over a type-erased mutable borrow of the reader, or `None` when no
    /// entries remain.
    #[must_use]
    pub fn next_key_dyn(
        &mut self,
    ) -> Option<MsgPackFuture<MsgPackFuture<&mut (dyn AsyncRead + Unpin + Send)>>>
    where
        R: Send,
    {
        if self.len > 0 {
            self.len -= 1;
            Some(MsgPackFuture::new(MsgPackFuture::new(&mut self.reader)))
        } else {
            None
        }
    }

    /// Skips all remaining entries (key and value) and returns the reader.
    ///
    /// # Errors
    ///
    /// Propagates the first error encountered while skipping a key or value.
    pub async fn skip(mut self) -> IoResult<R>
    where
        R: Send,
    {
        while let Some(key) = self.next_key_dyn() {
            // Skipping the key yields the decoder for the matching value.
            let value = key.skip_dyn().await?;
            value.skip_dyn().await?;
        }
        Ok(self.reader)
    }

    /// Decodes all remaining entries into a `Value::Map` and returns it with
    /// the reader.
    ///
    /// # Errors
    ///
    /// Propagates the first error encountered while decoding a key or value.
    pub async fn into_value(mut self) -> IoResult<(Value, R)>
    where
        R: Send,
    {
        // The entry count comes straight from the input and can be as large as
        // `u32::MAX`. Pre-allocating that much from a header of a few bytes
        // would let a malformed message exhaust memory, so the initial
        // reservation is capped and the vector grows as entries arrive.
        const MAX_PREALLOC: usize = 4096;
        let mut entries = Vec::with_capacity(std::cmp::min(self.len, MAX_PREALLOC));
        while let Some(key) = self.next_key_dyn() {
            let (key, value) = key.into_value_dyn().await?;
            let (value, _next) = value.into_value_dyn().await?;
            entries.push((key, value));
        }
        Ok((Value::Map(entries), self.reader))
    }
}
/// Reads and discards exactly `n` bytes from `reader`, going through a 32-byte
/// scratch buffer.
///
/// # Errors
///
/// Propagates the first error returned by `read_exact`.
async fn reader_skip<R: AsyncRead + Unpin>(reader: &mut R, mut n: usize) -> IoResult<()> {
    let mut scratch = [0u8; 32];
    loop {
        if n == 0 {
            return Ok(());
        }
        let chunk = std::cmp::min(n, scratch.len());
        reader.read_exact(&mut scratch[..chunk]).await?;
        n -= chunk;
    }
}
/// A length-prefixed byte payload whose length is known but whose bytes are
/// still in the reader.
#[derive(Debug)]
pub struct BinFuture<R> {
/// Reader positioned at the next unread payload byte.
reader: R,
/// Number of payload bytes not yet read through this object.
len: usize,
}
impl<R, R2> WrapReader<R, R2> for BinFuture<R> {
    type Output = BinFuture<R2>;

    /// Swaps the reader for `wrap(reader)` and keeps the remaining byte count.
    fn wrap(self, wrap: impl FnOnce(R) -> R2) -> Self::Output {
        let BinFuture { reader, len } = self;
        BinFuture {
            reader: wrap(reader),
            len,
        }
    }
}
impl<R: AsyncRead + Unpin> BinFuture<R> {
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub async fn read_exact(&mut self, buf: &mut [u8]) -> IoResult<()> {
assert!(buf.len() <= self.len);
self.reader.read_exact(buf).await.map(|_| {
self.len -= buf.len();
})
}
pub async fn read_all(mut self, buf: &mut [u8]) -> IoResult<R> {
self.read_exact(buf).await.map(|_| self.end())
}
pub fn end(self) -> R {
assert_eq!(self.len, 0);
self.reader
}
pub async fn into_vec(mut self) -> IoResult<(Vec<u8>, R)> {
let mut vec = vec![0u8; self.len];
self.reader
.read_exact(&mut vec[..])
.await
.map(|_| (vec, self.reader))
}
pub async fn skip(mut self) -> IoResult<R> {
reader_skip(&mut self.reader, self.len).await?;
Ok(self.reader)
}
pub async fn into_value(self) -> IoResult<(Value, R)> {
self.into_vec().await.map(|(v, r)| (Value::Binary(v), r))
}
}
/// A string payload that has not been read yet; a thin wrapper around the
/// `BinFuture` holding its bytes.
#[derive(Debug)]
pub struct StringFuture<R>(BinFuture<R>);
impl<R, R2> WrapReader<R, R2> for StringFuture<R> {
type Output = StringFuture<R2>;
fn wrap(self, wrap: impl FnOnce(R) -> R2) -> Self::Output {
StringFuture(self.0.wrap(wrap))
}
}
impl<R: AsyncRead + Unpin> StringFuture<R> {
pub async fn into_string(self) -> IoResult<(String, R)> {
self.0.into_vec().await.and_then(|(v, r)| {
String::from_utf8(v)
.map_err(|_| ErrorKind::InvalidData.into())
.map(|s| (s, r))
})
}
pub async fn read_exact(&mut self, buf: &mut [u8]) -> IoResult<()> {
self.0.read_exact(buf).await
}
pub async fn read_all(self, buf: &mut [u8]) -> IoResult<R> {
self.0.read_all(buf).await
}
pub async fn skip(self) -> IoResult<R> {
self.0.skip().await
}
pub async fn into_value(self) -> IoResult<(Value, R)> {
self.into_string()
.await
.map(|(s, r)| (Value::String(s.into()), r))
}
}
/// Exposes the inner `BinFuture` by shared reference.
impl<R> Deref for StringFuture<R> {
type Target = BinFuture<R>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Exposes the inner `BinFuture` by mutable reference.
impl<R> DerefMut for StringFuture<R> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// An extension value whose type tag has been decoded but whose payload is
/// still in the reader.
#[derive(Debug)]
pub struct ExtFuture<R> {
/// The payload bytes, not yet read.
bin: BinFuture<R>,
/// The extension type tag read from the header.
ty: i8,
}
impl<R, R2> WrapReader<R, R2> for ExtFuture<R> {
    type Output = ExtFuture<R2>;

    /// Delegates to the inner `BinFuture` and keeps the extension type tag.
    fn wrap(self, wrap: impl FnOnce(R) -> R2) -> Self::Output {
        let ExtFuture { bin, ty } = self;
        ExtFuture {
            bin: bin.wrap(wrap),
            ty,
        }
    }
}
impl<R: AsyncRead + Unpin> ExtFuture<R> {
pub fn ext_type(&self) -> i8 {
self.ty
}
pub async fn into_vec(self) -> IoResult<(i8, Vec<u8>, R)> {
let ty = self.ty;
self.bin.into_vec().await.map(|(v, r)| (ty, v, r))
}
pub async fn read_exact(&mut self, buf: &mut [u8]) -> IoResult<()> {
self.bin.read_exact(buf).await
}
pub async fn read_all(self, buf: &mut [u8]) -> IoResult<R> {
self.bin.read_all(buf).await
}
pub async fn skip(self) -> IoResult<R> {
self.bin.skip().await
}
pub async fn into_value(self) -> IoResult<(Value, R)> {
self.into_vec()
.await
.map(|(ty, v, r)| (Value::Ext(ty, v), r))
}
}
/// Exposes the payload `BinFuture` by shared reference.
impl<R> Deref for ExtFuture<R> {
type Target = BinFuture<R>;
fn deref(&self) -> &Self::Target {
&self.bin
}
}
/// Exposes the payload `BinFuture` by mutable reference.
impl<R> DerefMut for ExtFuture<R> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.bin
}
}
#[cfg(test)]
mod test {
    use super::*;
    use futures::io::Cursor;
    use rmpv::Value;

    /// Encodes `val` with `rmpv` and returns a cursor over the encoded bytes.
    fn value_to_vec(val: &Value) -> Cursor<Vec<u8>> {
        let mut encoded = Vec::new();
        rmpv::encode::write_value(&mut encoded, val).unwrap();
        Cursor::new(encoded)
    }

    /// Drives `fut` to completion on a fresh single-threaded executor.
    fn run<F: std::future::Future>(fut: F) -> F::Output {
        futures::executor::LocalPool::new().run_until(fut)
    }

    /// Decodes the whole buffer through `MsgPackFuture::into_value`.
    fn decode_whole(cursor: Cursor<Vec<u8>>) -> Value {
        let (value, _reader) = run(MsgPackFuture::new(cursor).into_value()).unwrap();
        value
    }

    #[test]
    fn bool() {
        async fn bool_test(buf: Cursor<Vec<u8>>) -> IoResult<bool> {
            let decoded = MsgPackFuture::new(buf).decode().await?;
            let (flag, _reader) = decoded.into_bool().unwrap();
            Ok(flag)
        }

        let val = Value::from(true);
        let cursor = value_to_vec(&val);

        let out = run(bool_test(cursor.clone())).unwrap();
        assert!(out);
        assert_eq!(decode_whole(cursor), val);
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn f32() {
        async fn f32_test(buf: Cursor<Vec<u8>>) -> IoResult<f32> {
            let decoded = MsgPackFuture::new(buf).decode().await?;
            let (number, _reader) = decoded.into_f32().unwrap();
            Ok(number)
        }

        let val = Value::from(25.5f32);
        let cursor = value_to_vec(&val);

        let out = run(f32_test(cursor.clone())).unwrap();
        assert_eq!(out, 25.5);
        assert_eq!(decode_whole(cursor), val);
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn f64() {
        async fn f64_test(buf: Cursor<Vec<u8>>) -> IoResult<f64> {
            let decoded = MsgPackFuture::new(buf).decode().await?;
            let (number, _reader) = decoded.into_f64().unwrap();
            Ok(number)
        }

        let val = Value::from(25.5f64);
        let cursor = value_to_vec(&val);

        let out = run(f64_test(cursor.clone())).unwrap();
        assert_eq!(out, 25.5);
        assert_eq!(decode_whole(cursor), val);
    }

    #[test]
    fn bin() {
        async fn bin_test(buf: Cursor<Vec<u8>>) -> IoResult<Vec<u8>> {
            let decoded = MsgPackFuture::new(buf).decode().await?;
            let (bytes, _reader) = decoded.into_bin().unwrap().into_vec().await?;
            Ok(bytes)
        }

        let val = Value::from(vec![1u8, 2, 3, 4]);
        let cursor = value_to_vec(&val);

        let out = run(bin_test(cursor.clone())).unwrap();
        assert_eq!(out, vec![1u8, 2, 3, 4]);
        assert_eq!(decode_whole(cursor), val);
    }

    #[test]
    fn string() {
        async fn string_test(buf: Cursor<Vec<u8>>) -> IoResult<String> {
            let decoded = MsgPackFuture::new(buf).decode().await?;
            let (text, _reader) = decoded.into_string().unwrap().into_string().await?;
            Ok(text)
        }

        let val = Value::from("Hello world");
        let cursor = value_to_vec(&val);

        let out = run(string_test(cursor.clone())).unwrap();
        assert_eq!(out, "Hello world");
        assert_eq!(decode_whole(cursor), val);
    }

    #[test]
    fn array_int() {
        async fn array_test(buf: Cursor<Vec<u8>>) -> IoResult<Vec<u64>> {
            let mut collected = Vec::new();
            let mut array = MsgPackFuture::new(buf)
                .decode()
                .await?
                .into_array()
                .unwrap();
            // Each decoded element hands the array back for the next round.
            while let MsgPackOption::Some(elem) = array.next() {
                let (item, rest) = elem.decode().await?.into_u64().unwrap();
                collected.push(item);
                array = rest;
            }
            Ok(collected)
        }

        let val = Value::from(vec![Value::from(1), 2.into(), 3.into(), 4.into()]);
        let cursor = value_to_vec(&val);

        let out = run(array_test(cursor.clone())).unwrap();
        assert_eq!(out, vec![1, 2, 3, 4]);
        assert_eq!(decode_whole(cursor), val);
    }

    #[test]
    fn map_int() {
        async fn map_test(buf: Cursor<Vec<u8>>) -> IoResult<Vec<(u64, u64)>> {
            let mut collected = Vec::new();
            let mut map = MsgPackFuture::new(buf).decode().await?.into_map().unwrap();
            // Decoding the key yields the value decoder; decoding the value
            // yields the map back for the next round.
            while let MsgPackOption::Some(entry) = map.next_key() {
                let (key, value_decoder) = entry.decode().await?.into_u64().unwrap();
                let (value, rest) = value_decoder.decode().await?.into_u64().unwrap();
                collected.push((key, value));
                map = rest;
            }
            Ok(collected)
        }

        let val = Value::from(vec![
            (Value::from(1), Value::from(2)),
            (Value::from(3), Value::from(4)),
        ]);
        let cursor = value_to_vec(&val);

        let out = run(map_test(cursor.clone())).unwrap();
        assert_eq!(out, vec![(1, 2), (3, 4)]);
        assert_eq!(decode_whole(cursor), val);
    }

    #[test]
    fn ext() {
        async fn ext_test(buf: Cursor<Vec<u8>>) -> IoResult<(i8, Vec<u8>)> {
            let decoded = MsgPackFuture::new(buf).decode().await?;
            let (ty, payload, _reader) = decoded.into_ext().unwrap().into_vec().await?;
            Ok((ty, payload))
        }

        let val = Value::Ext(42, vec![1u8, 2, 3, 4]);
        let cursor = value_to_vec(&val);

        let out = run(ext_test(cursor.clone())).unwrap();
        assert_eq!(out, (42, vec![1, 2, 3, 4]));
        assert_eq!(decode_whole(cursor), val);
    }
}