use std::future::Future;
use std::task::{Context, Poll};
use cbor2::async_io::{self, AsyncRead, AsyncWrite};
fn block_on<F: Future>(future: F) -> F::Output {
let waker = std::task::Waker::noop();
let mut cx = Context::from_waker(waker);
let mut future = std::pin::pin!(future);
match future.as_mut().poll(&mut cx) {
Poll::Ready(value) => value,
Poll::Pending => panic!("test future unexpectedly pending"),
}
}
struct Cursor {
data: Vec<u8>,
pos: usize,
}
impl Cursor {
fn new(data: Vec<u8>) -> Self {
Self { data, pos: 0 }
}
}
impl AsyncRead for Cursor {
async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> {
let end = self.pos + buf.len();
if end > self.data.len() {
return Err(std::io::ErrorKind::UnexpectedEof.into());
}
buf.copy_from_slice(&self.data[self.pos..end]);
self.pos = end;
Ok(())
}
}
#[derive(Default)]
struct Sink {
data: Vec<u8>,
flushed: bool,
}
impl AsyncWrite for Sink {
async fn write_all(&mut self, data: &[u8]) -> Result<(), std::io::Error> {
self.data.extend_from_slice(data);
Ok(())
}
async fn flush(&mut self) -> Result<(), std::io::Error> {
self.flushed = true;
Ok(())
}
}
#[test]
fn read_item_stops_at_one_complete_item() {
let first = cbor2::to_vec(&("hi", 7u8)).unwrap();
let second = cbor2::to_vec(&false).unwrap();
let mut stream = first.clone();
stream.extend_from_slice(&second);
let mut reader = Cursor::new(stream);
let item = block_on(async_io::read_item(&mut reader)).unwrap();
assert_eq!(item, first);
assert_eq!(reader.pos, first.len());
let value: (String, u8) = cbor2::from_slice(&item).unwrap();
assert_eq!(value, ("hi".into(), 7));
}
#[test]
fn read_value_deserializes_owned_values() {
let bytes = cbor2::to_vec(&vec!["a".to_string(), "b".to_string()]).unwrap();
let mut reader = Cursor::new(bytes);
let out: Vec<String> = block_on(async_io::read_value(&mut reader)).unwrap();
assert_eq!(out, ["a", "b"]);
}
#[test]
fn read_item_validates_text_and_structure() {
let mut invalid_text = Cursor::new(hex::decode("62fffe").unwrap());
assert!(matches!(
block_on(async_io::read_item(&mut invalid_text)),
Err(cbor2::de::Error::Syntax(1))
));
let mut dangling_key = Cursor::new(hex::decode("bf6161ff").unwrap());
assert!(block_on(async_io::read_item(&mut dangling_key)).is_err());
}
#[test]
fn read_item_walks_nested_indefinite_and_tags() {
let item = hex::decode("85c06178a101027f6261626163ff5f41de41adff83010203").unwrap();
assert!(
cbor2::validate(&item[..]).is_ok(),
"test vector must be one item"
);
let mut stream = item.clone();
stream.extend_from_slice(&cbor2::to_vec(&99u8).unwrap());
let mut reader = Cursor::new(stream);
let got = block_on(async_io::read_item(&mut reader)).unwrap();
assert_eq!(got, item);
assert_eq!(reader.pos, item.len());
}
#[test]
fn read_item_enforces_recursion_limit() {
let mut bytes = vec![0x81u8; 1000];
bytes.push(0x00);
let mut reader = Cursor::new(bytes);
assert!(matches!(
block_on(async_io::read_item(&mut reader)),
Err(cbor2::de::Error::RecursionLimitExceeded)
));
}
#[test]
fn read_item_future_is_send() {
fn assert_send<T: Send>(value: T) -> T {
value
}
let one = cbor2::to_vec(&1u8).unwrap();
let mut reader = Cursor::new(one.clone());
let fut = assert_send(async_io::read_item(&mut reader));
assert_eq!(block_on(fut).unwrap(), one);
}
#[test]
fn futures_are_send_in_generic_context() {
fn assert_send<T: Send>(value: T) -> T {
value
}
fn check_read<R: AsyncRead + Send>(reader: &mut R) -> impl Send + '_ {
assert_send(async_io::read_item(reader))
}
fn check_value<R: AsyncRead + Send>(reader: &mut R) -> impl Send + '_ {
assert_send(async_io::read_value::<u8, R>(reader))
}
fn check_write<W: AsyncWrite + Send>(writer: &mut W) -> impl Send + '_ {
assert_send(async_io::write_value(writer, &1u8))
}
let _ = (
check_read::<Cursor>,
check_value::<Cursor>,
check_write::<Sink>,
);
}
#[test]
fn write_helpers_emit_exactly_one_item() {
let mut sink = Sink::default();
block_on(async_io::write_value(&mut sink, &("ok", 1u8))).unwrap();
assert!(sink.flushed);
assert_eq!(
cbor2::from_slice::<(String, u8)>(&sink.data).unwrap(),
("ok".into(), 1)
);
let mut item = cbor2::to_vec(&1u8).unwrap();
item.extend_from_slice(&cbor2::to_vec(&2u8).unwrap());
assert!(block_on(async_io::write_item(&mut Sink::default(), &item)).is_err());
}
#[cfg(feature = "futures")]
struct FuturesCursor(Cursor);
#[cfg(feature = "futures")]
impl futures_io::AsyncRead for FuturesCursor {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, std::io::Error>> {
let remaining = self.0.data.len().saturating_sub(self.0.pos);
let n = remaining.min(buf.len());
if n == 0 {
return Poll::Ready(Ok(0));
}
let end = self.0.pos + n;
buf[..n].copy_from_slice(&self.0.data[self.0.pos..end]);
self.0.pos = end;
Poll::Ready(Ok(n))
}
}
#[cfg(feature = "futures")]
#[derive(Default)]
struct FuturesSink(Sink);
#[cfg(feature = "futures")]
impl futures_io::AsyncWrite for FuturesSink {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
data: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
self.0.data.extend_from_slice(data);
Poll::Ready(Ok(data.len()))
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.0.flushed = true;
Poll::Ready(Ok(()))
}
fn poll_close(
self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
}
#[cfg(feature = "futures")]
#[test]
fn futures_async_traits_are_supported() {
let first = cbor2::to_vec(&("futures", 7u8)).unwrap();
let mut reader = FuturesCursor(Cursor::new(first.clone()));
let item = block_on(async_io::futures::read_item(&mut reader)).unwrap();
assert_eq!(item, first);
let mut reader = FuturesCursor(Cursor::new(cbor2::to_vec(&"owned").unwrap()));
let value: String = block_on(async_io::futures::read_value(&mut reader)).unwrap();
assert_eq!(value, "owned");
let mut sink = FuturesSink::default();
block_on(async_io::futures::write_value(&mut sink, &("ok", 9u8))).unwrap();
assert!(sink.0.flushed);
assert_eq!(
cbor2::from_slice::<(String, u8)>(&sink.0.data).unwrap(),
("ok".into(), 9)
);
let mut sink = FuturesSink::default();
block_on(async_io::futures::write_item(&mut sink, &first)).unwrap();
assert_eq!(sink.0.data, first);
}
#[cfg(feature = "tokio")]
struct TokioCursor(Cursor);
#[cfg(feature = "tokio")]
impl ::tokio::io::AsyncRead for TokioCursor {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ::tokio::io::ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
let remaining = self.0.data.len().saturating_sub(self.0.pos);
let n = remaining.min(buf.remaining());
let end = self.0.pos + n;
buf.put_slice(&self.0.data[self.0.pos..end]);
self.0.pos = end;
Poll::Ready(Ok(()))
}
}
#[cfg(feature = "tokio")]
#[derive(Default)]
struct TokioSink(Sink);
#[cfg(feature = "tokio")]
impl ::tokio::io::AsyncWrite for TokioSink {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
data: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
self.0.data.extend_from_slice(data);
Poll::Ready(Ok(data.len()))
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
self.0.flushed = true;
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
}
#[cfg(feature = "tokio")]
#[test]
fn tokio_async_traits_are_supported() {
let first = cbor2::to_vec(&("tokio", 7u8)).unwrap();
let mut reader = TokioCursor(Cursor::new(first.clone()));
let item = block_on(async_io::tokio::read_item(&mut reader)).unwrap();
assert_eq!(item, first);
let mut reader = TokioCursor(Cursor::new(cbor2::to_vec(&"owned").unwrap()));
let value: String = block_on(async_io::tokio::read_value(&mut reader)).unwrap();
assert_eq!(value, "owned");
let mut sink = TokioSink::default();
block_on(async_io::tokio::write_value(&mut sink, &("ok", 9u8))).unwrap();
assert!(sink.0.flushed);
assert_eq!(
cbor2::from_slice::<(String, u8)>(&sink.0.data).unwrap(),
("ok".into(), 9)
);
let mut sink = TokioSink::default();
block_on(async_io::tokio::write_item(&mut sink, &first)).unwrap();
assert_eq!(sink.0.data, first);
}