generic_session_types/chan/
mpsc.rs1use std::future::Future;
2use tokio::sync::mpsc;
3
4use crate::{Chan, Error, HasDual, RawChan};
5
6pub struct Mpsc<T> {
7 sx: mpsc::Sender<T>,
8 rx: mpsc::Receiver<T>,
9}
10
11impl<R: Sync + Send + 'static> RawChan for Mpsc<R> {
12 type R = R;
13 type SendFuture<'a> = impl Future<Output = Result<(), Error>> + 'a
14 where
15 Self: 'a;
16 fn send(&mut self, r: R) -> Self::SendFuture<'_> {
17 async move {
18 self.sx.send(r).await.map_err(|_| Error::SendErr)?;
19 Ok(())
20 }
21 }
22
23 type RecvFuture<'a> = impl Future<Output = Result<R, Error>> + 'a where Self: 'a;
24 fn recv(&mut self) -> Self::RecvFuture<'_> {
25 async {
26 let data = self.rx.recv().await.ok_or(Error::RecvErr)?;
27 Ok(data)
28 }
29 }
30
31 type CloseFuture = impl Future<Output = Result<(), Error>> + 'static;
32 fn close(self) -> Self::CloseFuture {
33 drop(self);
34 async { Ok(()) }
35 }
36}
37
38pub fn channel<P: HasDual, R: Sync + Send + 'static>(
39 buffer: usize,
40) -> (Chan<P, (), Mpsc<R>>, Chan<P::Dual, (), Mpsc<R>>) {
41 let (sx0, rx0) = mpsc::channel(buffer);
42 let (sx1, rx1) = mpsc::channel(buffer);
43 let c0 = Mpsc::<R> { sx: sx0, rx: rx1 };
44 let c1 = Mpsc::<R> { sx: sx1, rx: rx0 };
45 (Chan::from_raw(c0), Chan::from_raw(c1))
46}