async_flow/tokio/
channel.rs1use super::{Inputs, Outputs};
4use crate::{Connection, PortEvent};
5use alloc::boxed::Box;
6use tokio::sync::mpsc;
7
8pub const UNLIMITED: usize = 0;
9pub const ONESHOT: usize = 1;
10
11#[derive(Debug)]
12pub struct Channel<T, const N: usize = UNLIMITED> {
13 pub tx: Outputs<T, N>,
14 pub rx: Inputs<T, N>,
15}
16
17impl<T> Channel<T> {
18 pub fn pair() -> (Channel<T, UNLIMITED>, Channel<T, UNLIMITED>) {
19 (Self::bounded(1), Self::bounded(1))
20 }
21
22 pub fn oneshot() -> Channel<T, ONESHOT> {
24 Channel::from(mpsc::channel(1))
25 }
26
27 pub fn bounded(buffer: usize) -> Channel<T, UNLIMITED> {
29 Channel::from(mpsc::channel(buffer))
30 }
31
32 #[allow(unused)]
34 pub(crate) fn bounded_boxed(
35 buffer: usize,
36 ) -> (
37 Box<dyn crate::io::OutputPort<T> + Send>,
38 Box<dyn crate::io::InputPort<T> + Send>,
39 )
40 where
41 T: Send + Sync + 'static,
42 {
43 let (outputs, inputs) = Self::bounded(buffer).into_inner();
44 (Box::new(outputs), Box::new(inputs))
45 }
46}
47
48impl<T, const N: usize> Channel<T, N> {
49 pub fn into_inner(self) -> (Outputs<T, N>, Inputs<T, N>) {
50 (self.tx, self.rx)
51 }
52}
53
54impl<T, const N: usize> From<(Outputs<T, N>, Inputs<T, N>)> for Channel<T, N> {
55 fn from((tx, rx): (Outputs<T, N>, Inputs<T, N>)) -> Self {
56 Self { tx, rx }
57 }
58}
59
60impl<T, const N: usize> From<(mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)>
61 for Channel<T, N>
62{
63 fn from((tx, rx): (mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)) -> Self {
64 Self {
65 tx: Outputs::<T, N>::from(tx),
66 rx: Inputs::<T, N>::from(rx),
67 }
68 }
69}
70
71impl<T> Connection<T> for Channel<T> {}