async_flow/tokio/
channel.rs1use super::{Inputs, Outputs};
4use crate::{Connection, PortEvent};
5use alloc::boxed::Box;
6use tokio::sync::mpsc;
7
/// Value of [`Channel`]'s const parameter `N` that is used as its default.
///
/// Both the plain `Channel<T>` type and the channel returned by
/// `Channel::bounded` carry this value.
// NOTE(review): `bounded` hands back `N = UNLIMITED` even though its queue has
// a finite capacity, so `N` presumably does not describe buffer size — confirm
// against `Inputs`/`Outputs`.
pub const UNLIMITED: usize = 0;

/// Value of [`Channel`]'s const parameter `N` carried by the channel that
/// `Channel::oneshot` returns.
pub const ONESHOT: usize = 1;
10
11#[derive(Debug)]
12pub struct Channel<T, const N: usize = UNLIMITED> {
13 pub tx: Outputs<T, N>,
14 pub rx: Inputs<T, N>,
15}
16
17impl<T> Channel<T> {
18 pub fn oneshot() -> Channel<T, ONESHOT> {
20 Channel::from(mpsc::channel(1))
21 }
22
23 pub fn bounded(buffer: usize) -> Channel<T, UNLIMITED> {
25 Channel::from(mpsc::channel(buffer))
26 }
27
28 #[allow(unused)]
30 pub(crate) fn bounded_boxed(
31 buffer: usize,
32 ) -> (
33 Box<dyn crate::io::OutputPort<T> + Send>,
34 Box<dyn crate::io::InputPort<T> + Send>,
35 )
36 where
37 T: Send + Sync + 'static,
38 {
39 let (outputs, inputs) = Self::bounded(buffer).into_inner();
40 (Box::new(outputs), Box::new(inputs))
41 }
42}
43
44impl<T, const N: usize> Channel<T, N> {
45 pub fn into_inner(self) -> (Outputs<T, N>, Inputs<T, N>) {
46 (self.tx, self.rx)
47 }
48}
49
50impl<T, const N: usize> From<(Outputs<T, N>, Inputs<T, N>)> for Channel<T, N> {
51 fn from((tx, rx): (Outputs<T, N>, Inputs<T, N>)) -> Self {
52 Self { tx, rx }
53 }
54}
55
56impl<T, const N: usize> From<(mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)>
57 for Channel<T, N>
58{
59 fn from((tx, rx): (mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)) -> Self {
60 Self {
61 tx: Outputs::<T, N>::from(tx),
62 rx: Inputs::<T, N>::from(rx),
63 }
64 }
65}
66
67impl<T> Connection<T> for Channel<T> {}