1macro_rules! make_stream {
2 ($read_trait:path, $write_trait:path, $buf_type:ty, $read_ret_type:ty) => {
3 #[doc=concat!("[`", stringify!($read_trait), "`] and [`", stringify!($write_trait), "`],")]
7 #[derive(Debug)]
16 pub struct AsyncBincodeStream<S, R, W, D> {
17 stream: AsyncBincodeReader<InternalAsyncWriter<S, W, D>, R>,
18 }
19
20 #[doc(hidden)]
21 pub struct InternalAsyncWriter<S, T, D>(AsyncBincodeWriter<S, T, D>);
22
23 impl<S: std::fmt::Debug, T, D> std::fmt::Debug for InternalAsyncWriter<S, T, D> {
24 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
25 self.get_ref().fmt(f)
26 }
27 }
28
29 impl<S, R, W> Default for AsyncBincodeStream<S, R, W, SyncDestination>
30 where
31 S: Default,
32 {
33 fn default() -> Self {
34 Self::from(S::default())
35 }
36 }
37
38 impl<S, R, W, D> AsyncBincodeStream<S, R, W, D> {
39 pub fn get_ref(&self) -> &S {
43 &self.stream.get_ref().0.get_ref()
44 }
45
46 pub fn get_mut(&mut self) -> &mut S {
50 self.stream.get_mut().0.get_mut()
51 }
52
53 pub fn into_inner(self) -> S {
58 self.stream.into_inner().0.into_inner()
59 }
60 }
61
62 impl<S, R, W> From<S> for AsyncBincodeStream<S, R, W, SyncDestination> {
63 fn from(stream: S) -> Self {
64 AsyncBincodeStream {
65 stream: AsyncBincodeReader::from(InternalAsyncWriter(
66 AsyncBincodeWriter::from(stream),
67 )),
68 }
69 }
70 }
71
72 impl<S, R, W, D> AsyncBincodeStream<S, R, W, D> {
73 pub fn for_async(self) -> AsyncBincodeStream<S, R, W, crate::AsyncDestination> {
77 let stream = self.into_inner();
78 AsyncBincodeStream {
79 stream: AsyncBincodeReader::from(InternalAsyncWriter(
80 AsyncBincodeWriter::from(stream).for_async(),
81 )),
82 }
83 }
84
85 pub fn for_sync(self) -> AsyncBincodeStream<S, R, W, crate::SyncDestination> {
89 AsyncBincodeStream::from(self.into_inner())
90 }
91 }
92
93 impl<S, T, D> $read_trait for InternalAsyncWriter<S, T, D>
94 where
95 S: $read_trait + Unpin,
96 {
97 fn poll_read(
98 self: std::pin::Pin<&mut Self>,
99 cx: &mut std::task::Context,
100 buf: &mut $buf_type,
101 ) -> std::task::Poll<std::io::Result<$read_ret_type>> {
102 std::pin::Pin::new(self.get_mut().get_mut()).poll_read(cx, buf)
103 }
104 }
105
106 impl<S, T, D> std::ops::Deref for InternalAsyncWriter<S, T, D> {
107 type Target = AsyncBincodeWriter<S, T, D>;
108 fn deref(&self) -> &Self::Target {
109 &self.0
110 }
111 }
112 impl<S, T, D> std::ops::DerefMut for InternalAsyncWriter<S, T, D> {
113 fn deref_mut(&mut self) -> &mut Self::Target {
114 &mut self.0
115 }
116 }
117
118 impl<S, R, W, D> futures_core::Stream for AsyncBincodeStream<S, R, W, D>
119 where
120 S: Unpin,
121 AsyncBincodeReader<InternalAsyncWriter<S, W, D>, R>:
122 futures_core::Stream<Item = Result<R, bincode::error::DecodeError>>,
123 {
124 type Item = Result<R, bincode::error::DecodeError>;
125 fn poll_next(
126 mut self: std::pin::Pin<&mut Self>,
127 cx: &mut std::task::Context,
128 ) -> std::task::Poll<Option<Self::Item>> {
129 std::pin::Pin::new(&mut self.stream).poll_next(cx)
130 }
131 }
132
133 impl<S, R, W, D> futures_sink::Sink<W> for AsyncBincodeStream<S, R, W, D>
134 where
135 S: Unpin,
136 AsyncBincodeWriter<S, W, D>: futures_sink::Sink<W, Error = bincode::error::EncodeError>,
137 {
138 type Error = bincode::error::EncodeError;
139
140 fn poll_ready(
141 mut self: std::pin::Pin<&mut Self>,
142 cx: &mut std::task::Context,
143 ) -> std::task::Poll<Result<(), Self::Error>> {
144 std::pin::Pin::new(&mut **self.stream.get_mut()).poll_ready(cx)
145 }
146
147 fn start_send(mut self: std::pin::Pin<&mut Self>, item: W) -> Result<(), Self::Error> {
148 std::pin::Pin::new(&mut **self.stream.get_mut()).start_send(item)
149 }
150
151 fn poll_flush(
152 mut self: std::pin::Pin<&mut Self>,
153 cx: &mut std::task::Context,
154 ) -> std::task::Poll<Result<(), Self::Error>> {
155 std::pin::Pin::new(&mut **self.stream.get_mut()).poll_flush(cx)
156 }
157
158 fn poll_close(
159 mut self: std::pin::Pin<&mut Self>,
160 cx: &mut std::task::Context,
161 ) -> std::task::Poll<Result<(), Self::Error>> {
162 std::pin::Pin::new(&mut **self.stream.get_mut()).poll_close(cx)
163 }
164 }
165 };
166}