1mod buffered;
2
3use crate::message::Message;
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::ops::DerefMut;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::{fmt, mem};
10
11pub use buffered::Buffered;
12
13pub type BoxedTransport<'a, E> = Pin<Box<dyn AsyncTransport<Error = E> + std::marker::Send + 'a>>;
15
16impl<E> fmt::Debug for BoxedTransport<'_, E> {
17 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
18 f.debug_tuple("BoxedTransport").finish()
19 }
20}
21
22pub trait AsyncTransport {
57 type Error;
59
60 fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>>;
62
63 fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
71
72 fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error>;
82
83 fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
87}
88
89impl<T> AsyncTransport for Pin<T>
90where
91 T: DerefMut + Unpin,
92 T::Target: AsyncTransport,
93{
94 type Error = <T::Target as AsyncTransport>::Error;
95
96 fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>> {
97 self.get_mut().as_mut().receive_poll(cx)
98 }
99
100 fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
101 self.get_mut().as_mut().send_poll_ready(cx)
102 }
103
104 fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
105 self.get_mut().as_mut().send_start(msg)
106 }
107
108 fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
109 self.get_mut().as_mut().send_poll_flush(cx)
110 }
111}
112
113impl<T> AsyncTransport for Box<T>
114where
115 T: AsyncTransport + Unpin + ?Sized,
116{
117 type Error = T::Error;
118
119 fn receive_poll(
120 mut self: Pin<&mut Self>,
121 cx: &mut Context,
122 ) -> Poll<Result<Message, Self::Error>> {
123 Pin::new(&mut **self).receive_poll(cx)
124 }
125
126 fn send_poll_ready(
127 mut self: Pin<&mut Self>,
128 cx: &mut Context,
129 ) -> Poll<Result<(), Self::Error>> {
130 Pin::new(&mut **self).send_poll_ready(cx)
131 }
132
133 fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
134 Pin::new(&mut **self).send_start(msg)
135 }
136
137 fn send_poll_flush(
138 mut self: Pin<&mut Self>,
139 cx: &mut Context,
140 ) -> Poll<Result<(), Self::Error>> {
141 Pin::new(&mut **self).send_poll_flush(cx)
142 }
143}
144
145impl<T> AsyncTransport for &mut T
146where
147 T: AsyncTransport + Unpin + ?Sized,
148{
149 type Error = T::Error;
150
151 fn receive_poll(
152 mut self: Pin<&mut Self>,
153 cx: &mut Context,
154 ) -> Poll<Result<Message, Self::Error>> {
155 T::receive_poll(Pin::new(&mut **self), cx)
156 }
157
158 fn send_poll_ready(
159 mut self: Pin<&mut Self>,
160 cx: &mut Context,
161 ) -> Poll<Result<(), Self::Error>> {
162 T::send_poll_ready(Pin::new(&mut **self), cx)
163 }
164
165 fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
166 T::send_start(Pin::new(&mut **self), msg)
167 }
168
169 fn send_poll_flush(
170 mut self: Pin<&mut Self>,
171 cx: &mut Context,
172 ) -> Poll<Result<(), Self::Error>> {
173 T::send_poll_flush(Pin::new(&mut **self), cx)
174 }
175}
176
177pub trait AsyncTransportExt: AsyncTransport {
178 fn receive(&mut self) -> Receive<'_, Self>
179 where
180 Self: Unpin,
181 {
182 Receive(self)
183 }
184
185 fn send(&mut self, msg: impl Into<Message>) -> Send<'_, Self>
186 where
187 Self: Unpin,
188 {
189 Send {
190 t: self,
191 msg: Some(msg.into()),
192 }
193 }
194
195 fn flush(&mut self) -> Flush<'_, Self>
196 where
197 Self: Unpin,
198 {
199 Flush(self)
200 }
201
202 fn send_and_flush(&mut self, msg: impl Into<Message>) -> SendFlush<'_, Self>
203 where
204 Self: Unpin,
205 {
206 SendFlush(SendFlushInner::Send(self.send(msg)))
207 }
208
209 fn receive_poll_unpin(&mut self, cx: &mut Context) -> Poll<Result<Message, Self::Error>>
210 where
211 Self: Unpin,
212 {
213 Pin::new(self).receive_poll(cx)
214 }
215
216 fn send_poll_ready_unpin(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>
217 where
218 Self: Unpin,
219 {
220 Pin::new(self).send_poll_ready(cx)
221 }
222
223 fn send_start_unpin(&mut self, msg: impl Into<Message>) -> Result<(), Self::Error>
224 where
225 Self: Unpin,
226 {
227 Pin::new(self).send_start(msg.into())
228 }
229
230 fn send_poll_flush_unpin(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>
231 where
232 Self: Unpin,
233 {
234 Pin::new(self).send_poll_flush(cx)
235 }
236
237 fn map_err<F, E>(self, f: F) -> MapError<Self, F>
238 where
239 Self: Sized,
240 F: FnMut(Self::Error) -> E,
241 {
242 MapError {
243 transport: self,
244 map_err: f,
245 }
246 }
247
248 fn boxed<'a>(self) -> BoxedTransport<'a, Self::Error>
249 where
250 Self: Sized + std::marker::Send + 'a,
251 {
252 Box::pin(self)
253 }
254
255 fn buffered(self) -> Buffered<Self>
256 where
257 Self: Sized,
258 {
259 Buffered::new(self)
260 }
261}
262
263impl<T> AsyncTransportExt for T where T: AsyncTransport {}
264
265#[derive(Debug)]
266#[must_use = "futures do nothing unless you `.await` or poll them"]
267pub struct Receive<'a, T>(&'a mut T)
268where
269 T: AsyncTransport + Unpin + ?Sized;
270
271impl<T> Future for Receive<'_, T>
272where
273 T: AsyncTransport + Unpin + ?Sized,
274{
275 type Output = Result<Message, T::Error>;
276
277 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
278 self.0.receive_poll_unpin(cx)
279 }
280}
281
282#[derive(Debug)]
283#[must_use = "futures do nothing unless you `.await` or poll them"]
284pub struct Send<'a, T>
285where
286 T: AsyncTransport + Unpin + ?Sized,
287{
288 t: &'a mut T,
289 msg: Option<Message>,
290}
291
292impl<T> Future for Send<'_, T>
293where
294 T: AsyncTransport + Unpin + ?Sized,
295{
296 type Output = Result<(), T::Error>;
297
298 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
299 match self.t.send_poll_ready_unpin(cx) {
300 Poll::Ready(Ok(())) => {
301 let msg = self.msg.take().unwrap();
302 if let Err(e) = self.t.send_start_unpin(msg) {
303 return Poll::Ready(Err(e));
304 }
305 Poll::Ready(Ok(()))
306 }
307
308 Poll::Ready(Err(e)) => {
309 self.msg.take();
310 Poll::Ready(Err(e))
311 }
312
313 Poll::Pending => Poll::Pending,
314 }
315 }
316}
317
318#[derive(Debug)]
319#[must_use = "futures do nothing unless you `.await` or poll them"]
320pub struct Flush<'a, T>(&'a mut T)
321where
322 T: AsyncTransport + Unpin + ?Sized;
323
324impl<T> Future for Flush<'_, T>
325where
326 T: AsyncTransport + Unpin + ?Sized,
327{
328 type Output = Result<(), T::Error>;
329
330 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
331 self.0.send_poll_flush_unpin(cx)
332 }
333}
334
335#[derive(Debug)]
336#[must_use = "futures do nothing unless you `.await` or poll them"]
337pub struct SendFlush<'a, T>(SendFlushInner<'a, T>)
338where
339 T: AsyncTransport + Unpin + ?Sized;
340
341#[derive(Debug)]
342enum SendFlushInner<'a, T>
343where
344 T: AsyncTransport + Unpin + ?Sized,
345{
346 Send(Send<'a, T>),
347 Flush(Flush<'a, T>),
348 None,
349}
350
351impl<T> Future for SendFlush<'_, T>
352where
353 T: AsyncTransport + Unpin + ?Sized,
354{
355 type Output = Result<(), T::Error>;
356
357 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
358 if let SendFlushInner::Send(ref mut send) = self.0 {
359 match Pin::new(send).poll(cx) {
360 Poll::Ready(Ok(())) => {}
361 p => return p,
362 }
363
364 let mut tmp = SendFlushInner::None;
365 mem::swap(&mut tmp, &mut self.0);
366 let t = match tmp {
367 SendFlushInner::Send(s) => s.t,
368 _ => unreachable!(),
369 };
370 self.0 = SendFlushInner::Flush(Flush(t));
371 }
372
373 match self.0 {
374 SendFlushInner::Flush(ref mut flush) => Pin::new(flush).poll(cx),
375 _ => unreachable!(),
376 }
377 }
378}
379
380pin_project! {
381 #[derive(Debug)]
382 pub struct MapError<T, F> {
383 #[pin]
384 transport: T,
385 map_err: F,
386 }
387}
388
389impl<T, F, E> AsyncTransport for MapError<T, F>
390where
391 T: AsyncTransport,
392 F: FnMut(T::Error) -> E,
393{
394 type Error = E;
395
396 fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>> {
397 let this = self.project();
398 this.transport.receive_poll(cx).map_err(this.map_err)
399 }
400
401 fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
402 let this = self.project();
403 this.transport.send_poll_ready(cx).map_err(this.map_err)
404 }
405
406 fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
407 let this = self.project();
408 this.transport.send_start(msg).map_err(this.map_err)
409 }
410
411 fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
412 let this = self.project();
413 this.transport.send_poll_flush(cx).map_err(this.map_err)
414 }
415}