aldrin_core/
channel.rs

1//! Channels-based transports.
2//!
3//! Channels-based transports can be used to efficiently connect client and broker within the same
4//! process.
5//!
//! The transports come in two flavors, [`Bounded`] and [`Unbounded`]. [`Bounded`] applies
//! back-pressure to the sender when its internal FIFO is full, whereas sending on [`Unbounded`]
//! never has to wait.
9//!
10//! # Examples
11//!
12//! ```
13//! use aldrin_broker::Broker;
14//! use aldrin::Client;
15//! use aldrin_core::channel;
16//!
17//! #[tokio::main]
18//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
19//!     // Create a broker:
20//!     let broker = Broker::new();
21//!     let mut broker_handle = broker.handle().clone();
22//!     let broker_join = tokio::spawn(broker.run());
23//!
24//!     // Connect a client with the Bounded transport:
25//!     let (t1, t2) = channel::bounded(16);
26//!     let (connection1, client1) = tokio::join!(broker_handle.connect(t1), Client::connect(t2));
27//!     let connection1 = connection1?;
28//!     let client1 = client1?;
29//!     tokio::spawn(connection1.run());
30//!     let client1_handle = client1.handle().clone();
31//!     let client1_join = tokio::spawn(client1.run());
32//!
33//!     // Connect a client with the Unbounded transport:
34//!     let (t1, t2) = channel::unbounded();
35//!     let (connection2, client2) = tokio::join!(broker_handle.connect(t1), Client::connect(t2));
36//!     let connection2 = connection2?;
37//!     let client2 = client2?;
38//!     tokio::spawn(connection2.run());
39//!     let client2_handle = client2.handle().clone();
40//!     let client2_join = tokio::spawn(client2.run());
41//!
42//!     // Shut everything down again:
43//!     broker_handle.shutdown_idle().await;
44//!     client1_handle.shutdown();
45//!     client1_join.await??;
46//!     client2_handle.shutdown();
47//!     client2_join.await??;
48//!     broker_join.await?;
49//!
50//!     Ok(())
51//! }
52//! ```
53
54use crate::message::Message;
55use crate::transport::AsyncTransport;
56use futures_channel::mpsc;
57use futures_core::stream::Stream;
58use std::pin::Pin;
59use std::task::{Context, Poll};
60use thiserror::Error;
61
62/// Creates a pair of bounded channel transports.
63///
64/// Both transports have a separate fifo for receiving [`Message`s](Message). If either fifo is
65/// full, this will cause backpressure to the sender.
66pub fn bounded(fifo_size: usize) -> (Bounded, Bounded) {
67    let (sender1, receiver1) = mpsc::channel(fifo_size);
68    let (sender2, receiver2) = mpsc::channel(fifo_size);
69
70    (
71        Bounded::new(receiver1, sender2),
72        Bounded::new(receiver2, sender1),
73    )
74}
75
76/// A bounded channels-based transport for connecting a broker and a client in the same process.
77///
78/// Bounded transports have an internal fifo for receiving [`Message`s](Message). If this runs full,
79/// backpressure will be applied to the sender.
80#[derive(Debug)]
81pub struct Bounded {
82    receiver: mpsc::Receiver<Message>,
83    sender: mpsc::Sender<Message>,
84}
85
86impl Bounded {
87    fn new(receiver: mpsc::Receiver<Message>, sender: mpsc::Sender<Message>) -> Self {
88        Self { receiver, sender }
89    }
90}
91
92/// Error type when using channels as a transport.
93#[derive(Error, Debug, Copy, Clone, PartialEq, Eq)]
94#[error("disconnected")]
95pub struct Disconnected;
96
97impl AsyncTransport for Bounded {
98    type Error = Disconnected;
99
100    fn receive_poll(
101        mut self: Pin<&mut Self>,
102        cx: &mut Context,
103    ) -> Poll<Result<Message, Disconnected>> {
104        match Pin::new(&mut self.receiver).poll_next(cx) {
105            Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
106            Poll::Ready(None) => Poll::Ready(Err(Disconnected)),
107            Poll::Pending => Poll::Pending,
108        }
109    }
110
111    fn send_poll_ready(
112        mut self: Pin<&mut Self>,
113        cx: &mut Context,
114    ) -> Poll<Result<(), Disconnected>> {
115        match self.sender.poll_ready(cx) {
116            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
117            Poll::Ready(Err(_)) => Poll::Ready(Err(Disconnected)),
118            Poll::Pending => Poll::Pending,
119        }
120    }
121
122    fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Disconnected> {
123        self.sender.start_send(msg).map_err(|_| Disconnected)
124    }
125
126    fn send_poll_flush(
127        mut self: Pin<&mut Self>,
128        cx: &mut Context,
129    ) -> Poll<Result<(), Disconnected>> {
130        match self.sender.poll_ready(cx) {
131            Poll::Ready(_) => Poll::Ready(Ok(())),
132            Poll::Pending => Poll::Pending,
133        }
134    }
135}
136
137/// Creates a pair of unbounded channel transports.
138pub fn unbounded() -> (Unbounded, Unbounded) {
139    let (sender1, receiver1) = mpsc::unbounded();
140    let (sender2, receiver2) = mpsc::unbounded();
141
142    (
143        Unbounded::new(receiver1, sender2),
144        Unbounded::new(receiver2, sender1),
145    )
146}
147
148/// An unbounded channels-based transport for connecting a broker and a client in the same process.
149#[derive(Debug)]
150pub struct Unbounded {
151    receiver: mpsc::UnboundedReceiver<Message>,
152    sender: mpsc::UnboundedSender<Message>,
153}
154
155impl Unbounded {
156    fn new(
157        receiver: mpsc::UnboundedReceiver<Message>,
158        sender: mpsc::UnboundedSender<Message>,
159    ) -> Self {
160        Self { receiver, sender }
161    }
162}
163
164impl AsyncTransport for Unbounded {
165    type Error = Disconnected;
166
167    fn receive_poll(
168        mut self: Pin<&mut Self>,
169        cx: &mut Context,
170    ) -> Poll<Result<Message, Disconnected>> {
171        match Pin::new(&mut self.receiver).poll_next(cx) {
172            Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
173            Poll::Ready(None) => Poll::Ready(Err(Disconnected)),
174            Poll::Pending => Poll::Pending,
175        }
176    }
177
178    fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Disconnected>> {
179        match self.sender.poll_ready(cx) {
180            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
181            Poll::Ready(Err(_)) => Poll::Ready(Err(Disconnected)),
182            Poll::Pending => Poll::Pending,
183        }
184    }
185
186    fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Disconnected> {
187        self.sender.start_send(msg).map_err(|_| Disconnected)
188    }
189
190    fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Disconnected>> {
191        match self.sender.poll_ready(cx) {
192            Poll::Ready(_) => Poll::Ready(Ok(())),
193            Poll::Pending => Poll::Pending,
194        }
195    }
196}