generic_session_types/chan/
mpsc.rs

1use std::future::Future;
2use tokio::sync::mpsc;
3
4use crate::{Chan, Error, HasDual, RawChan};
5
6pub struct Mpsc<T> {
7    sx: mpsc::Sender<T>,
8    rx: mpsc::Receiver<T>,
9}
10
11impl<R: Sync + Send + 'static> RawChan for Mpsc<R> {
12    type R = R;
13    type SendFuture<'a> = impl Future<Output = Result<(), Error>> + 'a
14    where
15        Self: 'a;
16    fn send(&mut self, r: R) -> Self::SendFuture<'_> {
17        async move {
18            self.sx.send(r).await.map_err(|_| Error::SendErr)?;
19            Ok(())
20        }
21    }
22
23    type RecvFuture<'a> = impl Future<Output = Result<R, Error>>  + 'a where Self: 'a;
24    fn recv(&mut self) -> Self::RecvFuture<'_> {
25        async {
26            let data = self.rx.recv().await.ok_or(Error::RecvErr)?;
27            Ok(data)
28        }
29    }
30
31    type CloseFuture = impl Future<Output = Result<(), Error>> + 'static;
32    fn close(self) -> Self::CloseFuture {
33        drop(self);
34        async { Ok(()) }
35    }
36}
37
38pub fn channel<P: HasDual, R: Sync + Send + 'static>(
39    buffer: usize,
40) -> (Chan<P, (), Mpsc<R>>, Chan<P::Dual, (), Mpsc<R>>) {
41    let (sx0, rx0) = mpsc::channel(buffer);
42    let (sx1, rx1) = mpsc::channel(buffer);
43    let c0 = Mpsc::<R> { sx: sx0, rx: rx1 };
44    let c1 = Mpsc::<R> { sx: sx1, rx: rx0 };
45    (Chan::from_raw(c0), Chan::from_raw(c1))
46}