1use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use bytes::BytesMut;
13use tokio::io::{AsyncRead, AsyncWrite};
14use tokio_util::codec::{Decoder, Encoder};
15use wrpc_transport::{Decode, Deferred as _, Encode};
16
/// A placeholder stream type on which every operation fails.
///
/// Each I/O method implemented for this type in this file immediately returns
/// an error carrying the message `"should not be called"`, and indexing it
/// fails the same way. It is used as the stream type parameter of [`pack`] and
/// [`unpack`].
#[derive(Debug)]
pub struct NoopStream;
19
20impl AsyncRead for NoopStream {
21 fn poll_read(
22 self: Pin<&mut Self>,
23 _: &mut Context<'_>,
24 _: &mut tokio::io::ReadBuf<'_>,
25 ) -> Poll<std::io::Result<()>> {
26 Poll::Ready(Err(std::io::Error::new(
27 std::io::ErrorKind::Other,
28 "should not be called",
29 )))
30 }
31}
32
33impl AsyncWrite for NoopStream {
34 fn poll_write(
35 self: Pin<&mut Self>,
36 _: &mut Context<'_>,
37 _: &[u8],
38 ) -> Poll<std::io::Result<usize>> {
39 Poll::Ready(Err(std::io::Error::new(
40 std::io::ErrorKind::Other,
41 "should not be called",
42 )))
43 }
44
45 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
46 Poll::Ready(Err(std::io::Error::new(
47 std::io::ErrorKind::Other,
48 "should not be called",
49 )))
50 }
51
52 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
53 Poll::Ready(Err(std::io::Error::new(
54 std::io::ErrorKind::Other,
55 "should not be called",
56 )))
57 }
58}
59
60impl wrpc_transport::Index<Self> for NoopStream {
61 fn index(&self, _: &[usize]) -> anyhow::Result<Self> {
62 anyhow::bail!("should not be called")
63 }
64}
65
66pub fn pack<T: Encode<NoopStream>>(
74 v: T,
75 dst: &mut BytesMut,
76) -> Result<(), <T::Encoder as Encoder<T>>::Error> {
77 let mut enc = T::Encoder::default();
78 enc.encode(v, dst)?;
79 if enc.take_deferred().is_some() {
80 return Err(std::io::Error::new(
81 std::io::ErrorKind::InvalidData,
82 "value contains pending asynchronous values and cannot be packed",
83 )
84 .into());
85 }
86 Ok(())
87}
88
89pub fn unpack<T: Decode<NoopStream>>(
98 buf: &mut BytesMut,
99) -> Result<T, <T::Decoder as Decoder>::Error> {
100 let mut dec = T::Decoder::default();
101 let v = dec.decode(buf)?;
102 let v = v.ok_or_else(|| {
103 std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "buffer is incomplete")
104 })?;
105 if dec.take_deferred().is_some() {
106 return Err(std::io::Error::new(
107 std::io::ErrorKind::InvalidData,
108 "buffer contains pending asynchronous values and cannot be unpacked",
109 )
110 .into());
111 }
112 Ok(v)
113}