aldrin_core/
transport.rs

1mod buffered;
2
3use crate::message::Message;
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::ops::DerefMut;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::{fmt, mem};
10
11pub use buffered::Buffered;
12
13/// Boxed [`AsyncTransport`] type returned by [`AsyncTransportExt::boxed`].
14pub type BoxedTransport<'a, E> = Pin<Box<dyn AsyncTransport<Error = E> + std::marker::Send + 'a>>;
15
16impl<E> fmt::Debug for BoxedTransport<'_, E> {
17    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
18        f.debug_tuple("BoxedTransport").finish()
19    }
20}
21
22/// Bidirectional asynchronous message transport.
23///
24/// This trait represents the core abstraction used throughout Aldrin for communication between
25/// clients and the broker. It is essentially a combination of `Futures`' [`Stream`] trait for
26/// receiving [`Message`s] and the [`Sink`] trait for sending.
27///
28/// Implementations must be reliable and ordered. Reliable means that [`Message`s] must not get
29/// corrupted. Ordered means that [`Message`s] must be delivered in the same order they were
30/// sent.
31///
32/// Typical implementations include:
33///
34/// - TCP/IP across a network.
35/// - Unix domain sockets (`SOCK_STREAM`) between processes on a single machine.
36/// - [Channels] inside a single process.
37///
38/// # Shutdown
39///
40/// Transports shut down only implicitly when dropped or in the case of errors. There is no
41/// explicit shutdown method, because the Aldrin protocol defines the [`Shutdown`] message, after
42/// which users of this trait are expected to drop the transport. Any unexpected shutdown must be
43/// signaled with an [`Error`] by the implementation.
44///
45/// # Errors
46///
47/// All methods may return an [`Error`] at any time. Afterwards, the transport should be considered
48/// unusable. Implementations may panic in any further method calls.
49///
50/// [Channels]: https://docs.rs/futures/latest/futures/channel/mpsc/index.html
51/// [`Error`]: AsyncTransport::Error
52/// [`Message`s]: Message
53/// [`Sink`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
54/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
55/// [`Shutdown`]: Message::Shutdown
56pub trait AsyncTransport {
57    /// Error type when sending or receiving messages.
58    type Error;
59
60    /// Attempts to receive the next message.
61    fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>>;
62
63    /// Prepares the transport for sending a message.
64    ///
65    /// This method must be called before sending a [`Message`] with
66    /// [`send_start`](AsyncTransport::send_start). Only when it returns `Poll::Ready(Ok(()))` is
67    /// the transport ready to start sending a single [`Message`].
68    ///
69    /// The transport may be implicitly flushed, fully or partially, when this method is called.
70    fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
71
72    /// Begins sending a message.
73    ///
74    /// Every call to this method must be preceded by a successful call to
75    /// [`send_poll_ready`](AsyncTransport::send_poll_ready).
76    ///
77    /// Sending a [`Message`] may flush the transport, but does not necessarily do so. Thus, even
78    /// when `Ok(())` is returned, the [`Message`] may not yet be delivered to the remote end of
79    /// the transport. Use [`send_poll_flush`](AsyncTransport::send_poll_flush) to explicitly flush
80    /// the transport.
81    fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error>;
82
83    /// Attempts to flush the transport.
84    ///
85    /// Flushing must deliver _all_ prior [`Message`s](Message) to the remote end of the transport.
86    fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
87}
88
89impl<T> AsyncTransport for Pin<T>
90where
91    T: DerefMut + Unpin,
92    T::Target: AsyncTransport,
93{
94    type Error = <T::Target as AsyncTransport>::Error;
95
96    fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>> {
97        self.get_mut().as_mut().receive_poll(cx)
98    }
99
100    fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
101        self.get_mut().as_mut().send_poll_ready(cx)
102    }
103
104    fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
105        self.get_mut().as_mut().send_start(msg)
106    }
107
108    fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
109        self.get_mut().as_mut().send_poll_flush(cx)
110    }
111}
112
113impl<T> AsyncTransport for Box<T>
114where
115    T: AsyncTransport + Unpin + ?Sized,
116{
117    type Error = T::Error;
118
119    fn receive_poll(
120        mut self: Pin<&mut Self>,
121        cx: &mut Context,
122    ) -> Poll<Result<Message, Self::Error>> {
123        Pin::new(&mut **self).receive_poll(cx)
124    }
125
126    fn send_poll_ready(
127        mut self: Pin<&mut Self>,
128        cx: &mut Context,
129    ) -> Poll<Result<(), Self::Error>> {
130        Pin::new(&mut **self).send_poll_ready(cx)
131    }
132
133    fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
134        Pin::new(&mut **self).send_start(msg)
135    }
136
137    fn send_poll_flush(
138        mut self: Pin<&mut Self>,
139        cx: &mut Context,
140    ) -> Poll<Result<(), Self::Error>> {
141        Pin::new(&mut **self).send_poll_flush(cx)
142    }
143}
144
145impl<T> AsyncTransport for &mut T
146where
147    T: AsyncTransport + Unpin + ?Sized,
148{
149    type Error = T::Error;
150
151    fn receive_poll(
152        mut self: Pin<&mut Self>,
153        cx: &mut Context,
154    ) -> Poll<Result<Message, Self::Error>> {
155        T::receive_poll(Pin::new(&mut **self), cx)
156    }
157
158    fn send_poll_ready(
159        mut self: Pin<&mut Self>,
160        cx: &mut Context,
161    ) -> Poll<Result<(), Self::Error>> {
162        T::send_poll_ready(Pin::new(&mut **self), cx)
163    }
164
165    fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
166        T::send_start(Pin::new(&mut **self), msg)
167    }
168
169    fn send_poll_flush(
170        mut self: Pin<&mut Self>,
171        cx: &mut Context,
172    ) -> Poll<Result<(), Self::Error>> {
173        T::send_poll_flush(Pin::new(&mut **self), cx)
174    }
175}
176
177pub trait AsyncTransportExt: AsyncTransport {
178    fn receive(&mut self) -> Receive<'_, Self>
179    where
180        Self: Unpin,
181    {
182        Receive(self)
183    }
184
185    fn send(&mut self, msg: impl Into<Message>) -> Send<'_, Self>
186    where
187        Self: Unpin,
188    {
189        Send {
190            t: self,
191            msg: Some(msg.into()),
192        }
193    }
194
195    fn flush(&mut self) -> Flush<'_, Self>
196    where
197        Self: Unpin,
198    {
199        Flush(self)
200    }
201
202    fn send_and_flush(&mut self, msg: impl Into<Message>) -> SendFlush<'_, Self>
203    where
204        Self: Unpin,
205    {
206        SendFlush(SendFlushInner::Send(self.send(msg)))
207    }
208
209    fn receive_poll_unpin(&mut self, cx: &mut Context) -> Poll<Result<Message, Self::Error>>
210    where
211        Self: Unpin,
212    {
213        Pin::new(self).receive_poll(cx)
214    }
215
216    fn send_poll_ready_unpin(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>
217    where
218        Self: Unpin,
219    {
220        Pin::new(self).send_poll_ready(cx)
221    }
222
223    fn send_start_unpin(&mut self, msg: impl Into<Message>) -> Result<(), Self::Error>
224    where
225        Self: Unpin,
226    {
227        Pin::new(self).send_start(msg.into())
228    }
229
230    fn send_poll_flush_unpin(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>
231    where
232        Self: Unpin,
233    {
234        Pin::new(self).send_poll_flush(cx)
235    }
236
237    fn map_err<F, E>(self, f: F) -> MapError<Self, F>
238    where
239        Self: Sized,
240        F: FnMut(Self::Error) -> E,
241    {
242        MapError {
243            transport: self,
244            map_err: f,
245        }
246    }
247
248    fn boxed<'a>(self) -> BoxedTransport<'a, Self::Error>
249    where
250        Self: Sized + std::marker::Send + 'a,
251    {
252        Box::pin(self)
253    }
254
255    fn buffered(self) -> Buffered<Self>
256    where
257        Self: Sized,
258    {
259        Buffered::new(self)
260    }
261}
262
263impl<T> AsyncTransportExt for T where T: AsyncTransport {}
264
265#[derive(Debug)]
266#[must_use = "futures do nothing unless you `.await` or poll them"]
267pub struct Receive<'a, T>(&'a mut T)
268where
269    T: AsyncTransport + Unpin + ?Sized;
270
271impl<T> Future for Receive<'_, T>
272where
273    T: AsyncTransport + Unpin + ?Sized,
274{
275    type Output = Result<Message, T::Error>;
276
277    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
278        self.0.receive_poll_unpin(cx)
279    }
280}
281
282#[derive(Debug)]
283#[must_use = "futures do nothing unless you `.await` or poll them"]
284pub struct Send<'a, T>
285where
286    T: AsyncTransport + Unpin + ?Sized,
287{
288    t: &'a mut T,
289    msg: Option<Message>,
290}
291
292impl<T> Future for Send<'_, T>
293where
294    T: AsyncTransport + Unpin + ?Sized,
295{
296    type Output = Result<(), T::Error>;
297
298    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
299        match self.t.send_poll_ready_unpin(cx) {
300            Poll::Ready(Ok(())) => {
301                let msg = self.msg.take().unwrap();
302                if let Err(e) = self.t.send_start_unpin(msg) {
303                    return Poll::Ready(Err(e));
304                }
305                Poll::Ready(Ok(()))
306            }
307
308            Poll::Ready(Err(e)) => {
309                self.msg.take();
310                Poll::Ready(Err(e))
311            }
312
313            Poll::Pending => Poll::Pending,
314        }
315    }
316}
317
318#[derive(Debug)]
319#[must_use = "futures do nothing unless you `.await` or poll them"]
320pub struct Flush<'a, T>(&'a mut T)
321where
322    T: AsyncTransport + Unpin + ?Sized;
323
324impl<T> Future for Flush<'_, T>
325where
326    T: AsyncTransport + Unpin + ?Sized,
327{
328    type Output = Result<(), T::Error>;
329
330    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
331        self.0.send_poll_flush_unpin(cx)
332    }
333}
334
335#[derive(Debug)]
336#[must_use = "futures do nothing unless you `.await` or poll them"]
337pub struct SendFlush<'a, T>(SendFlushInner<'a, T>)
338where
339    T: AsyncTransport + Unpin + ?Sized;
340
341#[derive(Debug)]
342enum SendFlushInner<'a, T>
343where
344    T: AsyncTransport + Unpin + ?Sized,
345{
346    Send(Send<'a, T>),
347    Flush(Flush<'a, T>),
348    None,
349}
350
351impl<T> Future for SendFlush<'_, T>
352where
353    T: AsyncTransport + Unpin + ?Sized,
354{
355    type Output = Result<(), T::Error>;
356
357    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
358        if let SendFlushInner::Send(ref mut send) = self.0 {
359            match Pin::new(send).poll(cx) {
360                Poll::Ready(Ok(())) => {}
361                p => return p,
362            }
363
364            let mut tmp = SendFlushInner::None;
365            mem::swap(&mut tmp, &mut self.0);
366            let t = match tmp {
367                SendFlushInner::Send(s) => s.t,
368                _ => unreachable!(),
369            };
370            self.0 = SendFlushInner::Flush(Flush(t));
371        }
372
373        match self.0 {
374            SendFlushInner::Flush(ref mut flush) => Pin::new(flush).poll(cx),
375            _ => unreachable!(),
376        }
377    }
378}
379
380pin_project! {
381    #[derive(Debug)]
382    pub struct MapError<T, F> {
383        #[pin]
384        transport: T,
385        map_err: F,
386    }
387}
388
389impl<T, F, E> AsyncTransport for MapError<T, F>
390where
391    T: AsyncTransport,
392    F: FnMut(T::Error) -> E,
393{
394    type Error = E;
395
396    fn receive_poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Message, Self::Error>> {
397        let this = self.project();
398        this.transport.receive_poll(cx).map_err(this.map_err)
399    }
400
401    fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
402        let this = self.project();
403        this.transport.send_poll_ready(cx).map_err(this.map_err)
404    }
405
406    fn send_start(self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
407        let this = self.project();
408        this.transport.send_start(msg).map_err(this.map_err)
409    }
410
411    fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
412        let this = self.project();
413        this.transport.send_poll_flush(cx).map_err(this.map_err)
414    }
415}