// async_bincode/stream.rs

// Generates the `AsyncBincodeStream` type, its hidden `InternalAsyncWriter` helper, and all of
// their trait impls for one asynchronous I/O trait family.
//
// Parameters:
// - `$read_trait`: path of the async read trait. It is implemented for `InternalAsyncWriter` by
//   forwarding `poll_read` to the wrapped stream.
// - `$write_trait`: path of the async write trait. It is only referenced in the generated docs.
// - `$buf_type`: type behind the `&mut` buffer argument of `$read_trait`'s `poll_read`.
// - `$read_ret_type`: success type carried by the `std::io::Result` that `poll_read` returns.
//
// The expansion names `AsyncBincodeReader`, `AsyncBincodeWriter` and `SyncDestination` without
// a path, so all three must be in scope wherever the macro is invoked. `AsyncDestination` and
// (in one return type) `SyncDestination` are additionally referenced through `crate::`.
macro_rules! make_stream {
    ($read_trait:path, $write_trait:path, $buf_type:ty, $read_ret_type:ty) => {
        /// A wrapper around an asynchronous stream that receives and sends bincode-encoded values.
        ///
        /// To use, provide a stream that implements both
        #[doc=concat!("[`", stringify!($read_trait), "`] and [`", stringify!($write_trait), "`],")]
        /// and then use [`futures_sink::Sink`] to send values and [`futures_core::Stream`] to
        /// receive them.
        ///
        /// The type parameters are: `S`, the underlying stream; `R`, the type of the values
        /// that are received; `W`, the type of the values that are sent; and `D`, the
        /// destination marker ([`crate::SyncDestination`] or [`crate::AsyncDestination`]).
        ///
        /// Note that an `AsyncBincodeStream` must be of the type [`crate::AsyncDestination`] in
        /// order to be compatible with an [`AsyncBincodeReader`] on the remote end (recall that it
        /// requires the serialized size prefixed to the serialized data). The default is
        /// [`crate::SyncDestination`], but these can be easily toggled between using
        /// [`AsyncBincodeStream::for_async`].
        #[derive(Debug)]
        pub struct AsyncBincodeStream<S, R, W, D> {
            // Reader layered on top of the writer, which in turn owns the underlying stream.
            stream: AsyncBincodeReader<InternalAsyncWriter<S, W, D>, R>,
        }

        // Newtype around `AsyncBincodeWriter` that additionally implements the read trait (see
        // the impl further down), so that an `AsyncBincodeReader` can be stacked on top of it.
        #[doc(hidden)]
        pub struct InternalAsyncWriter<S, T, D>(AsyncBincodeWriter<S, T, D>);

        impl<S: std::fmt::Debug, T, D> std::fmt::Debug for InternalAsyncWriter<S, T, D> {
            /// Formats the wrapper exactly like the underlying stream it ultimately holds.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Debug::fmt(self.0.get_ref(), f)
            }
        }

        impl<S, R, W> Default for AsyncBincodeStream<S, R, W, SyncDestination>
        where
            S: Default,
        {
            /// Wraps a default-constructed underlying stream.
            fn default() -> Self {
                Self::from(S::default())
            }
        }

        impl<S, R, W, D> AsyncBincodeStream<S, R, W, D> {
            /// Gets a reference to the underlying stream.
            ///
            /// It is inadvisable to directly read from or write to the underlying stream.
            pub fn get_ref(&self) -> &S {
                self.stream.get_ref().0.get_ref()
            }

            /// Gets a mutable reference to the underlying stream.
            ///
            /// It is inadvisable to directly read from or write to the underlying stream.
            pub fn get_mut(&mut self) -> &mut S {
                self.stream.get_mut().0.get_mut()
            }

            /// Unwraps this `AsyncBincodeStream`, returning the underlying stream.
            ///
            /// Note that any leftover serialized data that has not yet been sent, or received data that
            /// has not yet been deserialized, is lost.
            pub fn into_inner(self) -> S {
                self.stream.into_inner().0.into_inner()
            }
        }

        impl<S, R, W> From<S> for AsyncBincodeStream<S, R, W, SyncDestination> {
            /// Wraps `stream` in a fresh writer and reader, using [`crate::SyncDestination`].
            fn from(stream: S) -> Self {
                AsyncBincodeStream {
                    stream: AsyncBincodeReader::from(InternalAsyncWriter(
                        AsyncBincodeWriter::from(stream),
                    )),
                }
            }
        }

        impl<S, R, W, D> AsyncBincodeStream<S, R, W, D> {
            /// Make this stream include the serialized data's size before each serialized value.
            ///
            /// This is necessary for compatibility with a remote [`AsyncBincodeReader`].
            ///
            /// The conversion goes through [`AsyncBincodeStream::into_inner`], so the caveat
            /// documented there about leftover data being lost applies here as well.
            pub fn for_async(self) -> AsyncBincodeStream<S, R, W, crate::AsyncDestination> {
                let stream = self.into_inner();
                AsyncBincodeStream {
                    stream: AsyncBincodeReader::from(InternalAsyncWriter(
                        AsyncBincodeWriter::from(stream).for_async(),
                    )),
                }
            }

            /// Make this stream only send bincode-encoded values.
            ///
            /// This is necessary for compatibility with stock `bincode` receivers.
            ///
            /// The conversion goes through [`AsyncBincodeStream::into_inner`], so the caveat
            /// documented there about leftover data being lost applies here as well.
            pub fn for_sync(self) -> AsyncBincodeStream<S, R, W, crate::SyncDestination> {
                AsyncBincodeStream::from(self.into_inner())
            }
        }

        impl<S, T, D> $read_trait for InternalAsyncWriter<S, T, D>
        where
            S: $read_trait + Unpin,
        {
            /// Forwards the read straight to the underlying stream, bypassing the writer.
            fn poll_read(
                self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
                buf: &mut $buf_type,
            ) -> std::task::Poll<std::io::Result<$read_ret_type>> {
                // First `get_mut` unwraps the `Pin`; the second one auto-derefs to the wrapped
                // `AsyncBincodeWriter` and yields `&mut S`, which is then re-pinned.
                std::pin::Pin::new(self.get_mut().get_mut()).poll_read(cx, buf)
            }
        }

        impl<S, T, D> std::ops::Deref for InternalAsyncWriter<S, T, D> {
            type Target = AsyncBincodeWriter<S, T, D>;
            /// Exposes the wrapped writer.
            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }
        impl<S, T, D> std::ops::DerefMut for InternalAsyncWriter<S, T, D> {
            /// Exposes the wrapped writer mutably.
            fn deref_mut(&mut self) -> &mut Self::Target {
                &mut self.0
            }
        }

        impl<S, R, W, D> futures_core::Stream for AsyncBincodeStream<S, R, W, D>
        where
            S: Unpin,
            AsyncBincodeReader<InternalAsyncWriter<S, W, D>, R>:
                futures_core::Stream<Item = Result<R, bincode::error::DecodeError>>,
        {
            type Item = Result<R, bincode::error::DecodeError>;
            /// Receiving is delegated to the inner reader.
            fn poll_next(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Option<Self::Item>> {
                std::pin::Pin::new(&mut self.stream).poll_next(cx)
            }
        }

        // Every `Sink` method below reaches the writer the same way: `self.stream.get_mut()`
        // yields `&mut InternalAsyncWriter`, the first `*` removes that reference, and the second
        // `*` goes through `DerefMut` to the wrapped `AsyncBincodeWriter`, which is re-pinned.
        impl<S, R, W, D> futures_sink::Sink<W> for AsyncBincodeStream<S, R, W, D>
        where
            S: Unpin,
            AsyncBincodeWriter<S, W, D>: futures_sink::Sink<W, Error = bincode::error::EncodeError>,
        {
            type Error = bincode::error::EncodeError;

            /// Delegated to the inner writer.
            fn poll_ready(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Result<(), Self::Error>> {
                std::pin::Pin::new(&mut **self.stream.get_mut()).poll_ready(cx)
            }

            /// Delegated to the inner writer.
            fn start_send(mut self: std::pin::Pin<&mut Self>, item: W) -> Result<(), Self::Error> {
                std::pin::Pin::new(&mut **self.stream.get_mut()).start_send(item)
            }

            /// Delegated to the inner writer.
            fn poll_flush(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Result<(), Self::Error>> {
                std::pin::Pin::new(&mut **self.stream.get_mut()).poll_flush(cx)
            }

            /// Delegated to the inner writer.
            fn poll_close(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Result<(), Self::Error>> {
                std::pin::Pin::new(&mut **self.stream.get_mut()).poll_close(cx)
            }
        }
    };
}