1#![allow(clippy::type_complexity)]
8#![warn(missing_docs)]
9#![warn(missing_copy_implementations, missing_debug_implementations)]
10#![warn(unused_qualifications, unused_results)]
11#![warn(future_incompatible)]
12#![warn(unused)]
13#![forbid(broken_intra_doc_links)]
15
16use dialectic::backend::{self, By, Choice, Mut, Ref, Val};
17use std::{any::Any, future::Future, pin::Pin};
18use thiserror::Error;
19use tokio::sync::mpsc;
20pub use tokio::sync::mpsc::error::SendError;
21
22pub type Chan<P> = dialectic::Chan<P, Sender, Receiver>;
35
36pub type UnboundedChan<P> = dialectic::Chan<P, UnboundedSender, UnboundedReceiver>;
50
51#[derive(Debug)]
53pub struct Receiver(pub mpsc::Receiver<Box<dyn Any + Send>>);
54
55#[derive(Debug, Clone)]
57pub struct Sender(pub mpsc::Sender<Box<dyn Any + Send>>);
58
59#[derive(Debug)]
62pub struct UnboundedReceiver(pub mpsc::UnboundedReceiver<Box<dyn Any + Send>>);
63
64#[derive(Debug, Clone)]
66pub struct UnboundedSender(pub mpsc::UnboundedSender<Box<dyn Any + Send>>);
67
68pub fn channel(buffer: usize) -> (Sender, Receiver) {
79 let (tx, rx) = mpsc::channel(buffer);
80 (Sender(tx), Receiver(rx))
81}
82
83pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) {
94 let (tx, rx) = mpsc::unbounded_channel();
95 (UnboundedSender(tx), UnboundedReceiver(rx))
96}
97
98#[derive(Debug)]
101pub enum Error {
102 Recv(RecvError),
104 Send(Box<dyn Any + Send>),
106}
107
108impl std::fmt::Display for Error {
109 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 Error::Recv(e) => e.fmt(fmt),
112 Error::Send(_) => write!(fmt, "channel closed"),
113 }
114 }
115}
116
117impl std::error::Error for Error {}
118
119impl From<RecvError> for Error {
120 fn from(err: RecvError) -> Self {
121 Error::Recv(err)
122 }
123}
124
125impl<T: Any + Send> From<SendError<T>> for Error {
126 fn from(SendError(err): SendError<T>) -> Self {
127 Error::Send(Box::new(err))
128 }
129}
130
131#[derive(Debug, Error)]
133pub enum RecvError {
134 #[error("channel closed")]
136 Closed,
137 #[error("received value was not of desired type")]
141 DowncastFailed(Box<dyn Any + Send>),
142}
143
144impl backend::Transmitter for Sender {
145 type Error = SendError<Box<dyn Any + Send>>;
146
147 fn send_choice<'async_lifetime, const LENGTH: usize>(
148 &'async_lifetime mut self,
149 choice: Choice<LENGTH>,
150 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>> {
151 <Self as backend::Transmit<Choice<LENGTH>>>::send(self, choice)
152 }
153}
154
155impl<T: Send + Any> backend::Transmit<T> for Sender {
156 fn send<'a, 'async_lifetime>(
157 &'async_lifetime mut self,
158 message: <T as By<'a, Val>>::Type,
159 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
160 where
161 'a: 'async_lifetime,
162 {
163 Box::pin(mpsc::Sender::send(&self.0, Box::new(message)))
164 }
165}
166
167impl<T: Clone + Send + Any> backend::Transmit<T, Ref> for Sender {
168 fn send<'a, 'async_lifetime>(
169 &'async_lifetime mut self,
170 message: <T as By<'a, Ref>>::Type,
171 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
172 where
173 'a: 'async_lifetime,
174 {
175 Box::pin(mpsc::Sender::send(&self.0, Box::new(message.clone())))
176 }
177}
178
179impl<T: Clone + Send + Any> backend::Transmit<T, Mut> for Sender {
180 fn send<'a, 'async_lifetime>(
181 &'async_lifetime mut self,
182 message: <T as By<'a, Mut>>::Type,
183 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
184 where
185 'a: 'async_lifetime,
186 {
187 <Self as backend::Transmit<T, Ref>>::send(self, &*message)
188 }
189}
190
191impl backend::Receiver for Receiver {
192 type Error = RecvError;
193
194 fn recv_choice<'async_lifetime, const LENGTH: usize>(
195 &'async_lifetime mut self,
196 ) -> Pin<Box<dyn Future<Output = Result<Choice<LENGTH>, Self::Error>> + Send + 'async_lifetime>>
197 {
198 <Self as backend::Receive<Choice<LENGTH>>>::recv(self)
199 }
200}
201
202impl<T: Send + Any> backend::Receive<T> for Receiver {
203 fn recv<'async_lifetime>(
204 &'async_lifetime mut self,
205 ) -> Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send + 'async_lifetime>> {
206 Box::pin(async move {
207 match mpsc::Receiver::recv(&mut self.0).await {
208 None => Err(RecvError::Closed),
209 Some(b) => match b.downcast() {
210 Err(b) => Err(RecvError::DowncastFailed(b)),
211 Ok(t) => Ok(*t),
212 },
213 }
214 })
215 }
216}
217
218impl backend::Transmitter for UnboundedSender {
219 type Error = SendError<Box<dyn Any + Send>>;
220
221 fn send_choice<'async_lifetime, const LENGTH: usize>(
222 &'async_lifetime mut self,
223 choice: Choice<LENGTH>,
224 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>> {
225 <Self as backend::Transmit<Choice<LENGTH>>>::send(self, choice)
226 }
227}
228
229impl<T: Send + Any> backend::Transmit<T> for UnboundedSender {
230 fn send<'a, 'async_lifetime>(
231 &'async_lifetime mut self,
232 message: <T as By<'a, Val>>::Type,
233 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
234 where
235 'a: 'async_lifetime,
236 {
237 Box::pin(async move { mpsc::UnboundedSender::send(&self.0, Box::new(message)) })
238 }
239}
240
241impl backend::Receiver for UnboundedReceiver {
242 type Error = RecvError;
243
244 fn recv_choice<'async_lifetime, const LENGTH: usize>(
245 &'async_lifetime mut self,
246 ) -> Pin<Box<dyn Future<Output = Result<Choice<LENGTH>, Self::Error>> + Send + 'async_lifetime>>
247 {
248 <Self as backend::Receive<Choice<LENGTH>>>::recv(self)
249 }
250}
251
252impl<T: Send + Any> backend::Receive<T> for UnboundedReceiver {
253 fn recv<'async_lifetime>(
254 &'async_lifetime mut self,
255 ) -> Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send + 'async_lifetime>> {
256 Box::pin(async move {
257 match mpsc::UnboundedReceiver::recv(&mut self.0).await {
258 None => Err(RecvError::Closed),
259 Some(b) => match b.downcast() {
260 Err(b) => Err(RecvError::DowncastFailed(b)),
261 Ok(t) => Ok(*t),
262 },
263 }
264 })
265 }
266}