Skip to main content

async_flow/tokio/
channel.rs

1// This is free and unencumbered software released into the public domain.
2
3use super::{Inputs, Outputs};
4use crate::{Connection, PortEvent};
5use alloc::boxed::Box;
6use tokio::sync::mpsc;
7
8pub const UNLIMITED: usize = 0;
9pub const ONESHOT: usize = 1;
10
11#[derive(Debug)]
12pub struct Channel<T, const N: usize = UNLIMITED> {
13    pub tx: Outputs<T, N>,
14    pub rx: Inputs<T, N>,
15}
16
17impl<T> Channel<T> {
18    pub fn pair() -> (Channel<T, UNLIMITED>, Channel<T, UNLIMITED>) {
19        (Self::bounded(1), Self::bounded(1))
20    }
21
22    /// Creates a one-shot connection.
23    pub fn oneshot() -> Channel<T, ONESHOT> {
24        Channel::from(mpsc::channel(1))
25    }
26
27    /// Creates a bounded connection.
28    pub fn bounded(buffer: usize) -> Channel<T, UNLIMITED> {
29        Channel::from(mpsc::channel(buffer))
30    }
31
32    /// Creates a bounded, type-erased connection.
33    #[allow(unused)]
34    pub(crate) fn bounded_boxed(
35        buffer: usize,
36    ) -> (
37        Box<dyn crate::io::OutputPort<T> + Send>,
38        Box<dyn crate::io::InputPort<T> + Send>,
39    )
40    where
41        T: Send + Sync + 'static,
42    {
43        let (outputs, inputs) = Self::bounded(buffer).into_inner();
44        (Box::new(outputs), Box::new(inputs))
45    }
46}
47
48impl<T, const N: usize> Channel<T, N> {
49    pub fn into_inner(self) -> (Outputs<T, N>, Inputs<T, N>) {
50        (self.tx, self.rx)
51    }
52}
53
54impl<T, const N: usize> From<(Outputs<T, N>, Inputs<T, N>)> for Channel<T, N> {
55    fn from((tx, rx): (Outputs<T, N>, Inputs<T, N>)) -> Self {
56        Self { tx, rx }
57    }
58}
59
60impl<T, const N: usize> From<(mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)>
61    for Channel<T, N>
62{
63    fn from((tx, rx): (mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)) -> Self {
64        Self {
65            tx: Outputs::<T, N>::from(tx),
66            rx: Inputs::<T, N>::from(rx),
67        }
68    }
69}
70
71impl<T> Connection<T> for Channel<T> {}