dialectic_tokio_mpsc/
lib.rs

1//! This crate provides a backend implementation for the [`dialectic`] crate using
2//! [`tokio::sync::mpsc`] channels carrying boxed values `Box<dyn Any
3//! + Send>`, which are downcast to their true type (inferred from the session type) on the other
4//!   end of the channel. Select this backend if you're using the Tokio runtime for asynchrony and
5//!   you only need to communicate between tasks in the same process.
6
7#![allow(clippy::type_complexity)]
8#![warn(missing_docs)]
9#![warn(missing_copy_implementations, missing_debug_implementations)]
10#![warn(unused_qualifications, unused_results)]
11#![warn(future_incompatible)]
12#![warn(unused)]
13// Documentation configuration
14#![forbid(broken_intra_doc_links)]
15
16use dialectic::backend::{self, By, Choice, Mut, Ref, Val};
17use std::{any::Any, future::Future, pin::Pin};
18use thiserror::Error;
19use tokio::sync::mpsc;
20pub use tokio::sync::mpsc::error::SendError;
21
22/// Shorthand for a [`Chan`](dialectic::Chan) using a bounded [`Sender`] and [`Receiver`].
23///
24/// # Examples
25///
26/// ```
27/// use dialectic::prelude::*;
28/// use dialectic::types::Done;
29/// use dialectic_tokio_mpsc as mpsc;
30///
31/// let _: (mpsc::Chan<Done>, mpsc::Chan<Done>) =
32///     Done::channel(|| mpsc::channel(1));
33/// ```
34pub type Chan<P> = dialectic::Chan<P, Sender, Receiver>;
35
36/// Shorthand for a [`Chan`](dialectic::Chan) using an unbounded [`UnboundedSender`] and
37/// [`UnboundedReceiver`].
38///
39/// # Examples
40///
41/// ```
42/// use dialectic::prelude::*;
43/// use dialectic::types::Done;
44/// use dialectic_tokio_mpsc as mpsc;
45///
46/// let _: (mpsc::UnboundedChan<Done>, mpsc::UnboundedChan<Done>) =
47///     Done::channel(mpsc::unbounded_channel);
48/// ```
49pub type UnboundedChan<P> = dialectic::Chan<P, UnboundedSender, UnboundedReceiver>;
50
51/// A bounded receiver for dynamically typed values. See [`tokio::sync::mpsc::Receiver`].
52#[derive(Debug)]
53pub struct Receiver(pub mpsc::Receiver<Box<dyn Any + Send>>);
54
55/// A bounded sender for dynamically typed values. See [`tokio::sync::mpsc::Sender`].
56#[derive(Debug, Clone)]
57pub struct Sender(pub mpsc::Sender<Box<dyn Any + Send>>);
58
59/// An unbounded receiver for dynamically typed values. See
60/// [`tokio::sync::mpsc::UnboundedReceiver`].
61#[derive(Debug)]
62pub struct UnboundedReceiver(pub mpsc::UnboundedReceiver<Box<dyn Any + Send>>);
63
64/// An unbounded sender for dynamically typed values. See [`tokio::sync::mpsc::UnboundedSender`].
65#[derive(Debug, Clone)]
66pub struct UnboundedSender(pub mpsc::UnboundedSender<Box<dyn Any + Send>>);
67
68/// Create a bounded mpsc channel for transporting dynamically typed values.
69///
70/// This is a wrapper around `tokio::sync::mpsc::channel::<Box<dyn Any + Send>>`. See
71/// [`tokio::sync::mpsc::channel`].
72///
73/// # Examples
74///
75/// ```
76/// let (tx, rx) = dialectic_tokio_mpsc::channel(1);
77/// ```
78pub fn channel(buffer: usize) -> (Sender, Receiver) {
79    let (tx, rx) = mpsc::channel(buffer);
80    (Sender(tx), Receiver(rx))
81}
82
83/// Create an unbounded mpsc channel for transporting dynamically typed values.
84///
85/// This is a wrapper around `tokio::sync::mpsc::channel::<Box<dyn Any + Send>>`. See
86/// [`tokio::sync::mpsc::unbounded_channel`].
87///
88/// # Examples
89///
90/// ```
91/// let (tx, rx) = dialectic_tokio_mpsc::unbounded_channel();
92/// ```
93pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) {
94    let (tx, rx) = mpsc::unbounded_channel();
95    (UnboundedSender(tx), UnboundedReceiver(rx))
96}
97
98/// An error thrown while receiving from or sending to a dynamically typed [`tokio::sync::mpsc`]
99/// channel.
100#[derive(Debug)]
101pub enum Error {
102    /// Error during receive.
103    Recv(RecvError),
104    /// Error during send.
105    Send(Box<dyn Any + Send>),
106}
107
108impl std::fmt::Display for Error {
109    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            Error::Recv(e) => e.fmt(fmt),
112            Error::Send(_) => write!(fmt, "channel closed"),
113        }
114    }
115}
116
117impl std::error::Error for Error {}
118
119impl From<RecvError> for Error {
120    fn from(err: RecvError) -> Self {
121        Error::Recv(err)
122    }
123}
124
125impl<T: Any + Send> From<SendError<T>> for Error {
126    fn from(SendError(err): SendError<T>) -> Self {
127        Error::Send(Box::new(err))
128    }
129}
130
131/// An error thrown while receiving from a dynamically typed [`tokio::sync::mpsc`] channel.
132#[derive(Debug, Error)]
133pub enum RecvError {
134    /// The channel was explicitly closed, or all senders were dropped, implicitly closing it.
135    #[error("channel closed")]
136    Closed,
137    /// A value received from the channel could not be cast into the correct expected type. This is
138    /// always resultant from the other end of a channel failing to follow the session type of the
139    /// channel.
140    #[error("received value was not of desired type")]
141    DowncastFailed(Box<dyn Any + Send>),
142}
143
144impl backend::Transmitter for Sender {
145    type Error = SendError<Box<dyn Any + Send>>;
146
147    fn send_choice<'async_lifetime, const LENGTH: usize>(
148        &'async_lifetime mut self,
149        choice: Choice<LENGTH>,
150    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>> {
151        <Self as backend::Transmit<Choice<LENGTH>>>::send(self, choice)
152    }
153}
154
155impl<T: Send + Any> backend::Transmit<T> for Sender {
156    fn send<'a, 'async_lifetime>(
157        &'async_lifetime mut self,
158        message: <T as By<'a, Val>>::Type,
159    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
160    where
161        'a: 'async_lifetime,
162    {
163        Box::pin(mpsc::Sender::send(&self.0, Box::new(message)))
164    }
165}
166
167impl<T: Clone + Send + Any> backend::Transmit<T, Ref> for Sender {
168    fn send<'a, 'async_lifetime>(
169        &'async_lifetime mut self,
170        message: <T as By<'a, Ref>>::Type,
171    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
172    where
173        'a: 'async_lifetime,
174    {
175        Box::pin(mpsc::Sender::send(&self.0, Box::new(message.clone())))
176    }
177}
178
179impl<T: Clone + Send + Any> backend::Transmit<T, Mut> for Sender {
180    fn send<'a, 'async_lifetime>(
181        &'async_lifetime mut self,
182        message: <T as By<'a, Mut>>::Type,
183    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
184    where
185        'a: 'async_lifetime,
186    {
187        <Self as backend::Transmit<T, Ref>>::send(self, &*message)
188    }
189}
190
191impl backend::Receiver for Receiver {
192    type Error = RecvError;
193
194    fn recv_choice<'async_lifetime, const LENGTH: usize>(
195        &'async_lifetime mut self,
196    ) -> Pin<Box<dyn Future<Output = Result<Choice<LENGTH>, Self::Error>> + Send + 'async_lifetime>>
197    {
198        <Self as backend::Receive<Choice<LENGTH>>>::recv(self)
199    }
200}
201
202impl<T: Send + Any> backend::Receive<T> for Receiver {
203    fn recv<'async_lifetime>(
204        &'async_lifetime mut self,
205    ) -> Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send + 'async_lifetime>> {
206        Box::pin(async move {
207            match mpsc::Receiver::recv(&mut self.0).await {
208                None => Err(RecvError::Closed),
209                Some(b) => match b.downcast() {
210                    Err(b) => Err(RecvError::DowncastFailed(b)),
211                    Ok(t) => Ok(*t),
212                },
213            }
214        })
215    }
216}
217
218impl backend::Transmitter for UnboundedSender {
219    type Error = SendError<Box<dyn Any + Send>>;
220
221    fn send_choice<'async_lifetime, const LENGTH: usize>(
222        &'async_lifetime mut self,
223        choice: Choice<LENGTH>,
224    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>> {
225        <Self as backend::Transmit<Choice<LENGTH>>>::send(self, choice)
226    }
227}
228
229impl<T: Send + Any> backend::Transmit<T> for UnboundedSender {
230    fn send<'a, 'async_lifetime>(
231        &'async_lifetime mut self,
232        message: <T as By<'a, Val>>::Type,
233    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
234    where
235        'a: 'async_lifetime,
236    {
237        Box::pin(async move { mpsc::UnboundedSender::send(&self.0, Box::new(message)) })
238    }
239}
240
241impl backend::Receiver for UnboundedReceiver {
242    type Error = RecvError;
243
244    fn recv_choice<'async_lifetime, const LENGTH: usize>(
245        &'async_lifetime mut self,
246    ) -> Pin<Box<dyn Future<Output = Result<Choice<LENGTH>, Self::Error>> + Send + 'async_lifetime>>
247    {
248        <Self as backend::Receive<Choice<LENGTH>>>::recv(self)
249    }
250}
251
252impl<T: Send + Any> backend::Receive<T> for UnboundedReceiver {
253    fn recv<'async_lifetime>(
254        &'async_lifetime mut self,
255    ) -> Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send + 'async_lifetime>> {
256        Box::pin(async move {
257            match mpsc::UnboundedReceiver::recv(&mut self.0).await {
258                None => Err(RecvError::Closed),
259                Some(b) => match b.downcast() {
260                    Err(b) => Err(RecvError::DowncastFailed(b)),
261                    Ok(t) => Ok(*t),
262                },
263            }
264        })
265    }
266}