wrpc_pack/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
//! Stopgap solution for packing and unpacking wRPC values to/from singular, flat byte buffers.
//!
//! All APIs in this crate are to be considered unstable and everything may break arbitrarily.
//!
//! This crate will never reach 1.0 and will be deprecated once <https://github.com/bytecodealliance/wrpc/issues/25> is complete
//!
//! This crate is maintained on a best-effort basis.
use core::pin::Pin;
use core::task::{Context, Poll};
use bytes::BytesMut;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder};
use wrpc_transport::{Decode, Deferred as _, Encode};
/// A stream, which fails on each operation, this type should only ever be used in trait bounds
pub struct NoopStream;
impl AsyncRead for NoopStream {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}
}
impl AsyncWrite for NoopStream {
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &[u8],
) -> Poll<std::io::Result<usize>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"should not be called",
)))
}
}
impl wrpc_transport::Index<Self> for NoopStream {
fn index(&self, _: &[usize]) -> anyhow::Result<Self> {
anyhow::bail!("should not be called")
}
}
/// Pack a [`wrpc_transport::Encode`] into a singular byte buffer `dst`.
///
/// This function does not support asynchronous values and will return an error is such a value is
/// passed in.
///
/// This is unstable API, which will be deprecated once feature-complete "packing" functionality is available in [`wrpc_transport`].
/// Track <https://github.com/bytecodealliance/wrpc/issues/25> for updates.
pub fn pack<T: Encode<NoopStream>>(
v: T,
dst: &mut BytesMut,
) -> Result<(), <T::Encoder as Encoder<T>>::Error> {
let mut enc = T::Encoder::default();
enc.encode(v, dst)?;
if enc.take_deferred().is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"value contains pending asynchronous values and cannot be packed",
)
.into());
}
Ok(())
}
/// Unpack a [`wrpc_transport::Decode`] from a byte buffer `dst`.
///
/// This function does not support asynchronous values and will return an error if `buf` contains pending async values.
///
/// If this function returns an error, contents of `buf` are undefined.
///
/// This is unstable API, which will be deprecated once feature-complete "unpacking" functionality is available in [`wrpc_transport`].
/// Track <https://github.com/bytecodealliance/wrpc/issues/25> for updates.
pub fn unpack<T: Decode<NoopStream>>(
buf: &mut BytesMut,
) -> Result<T, <T::Decoder as Decoder>::Error> {
let mut dec = T::Decoder::default();
let v = dec.decode(buf)?;
let v = v.ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "buffer is incomplete")
})?;
if dec.take_deferred().is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"buffer contains pending asynchronous values and cannot be unpacked",
)
.into());
}
Ok(v)
}