async_flow/tokio/
channel.rs

1// This is free and unencumbered software released into the public domain.
2
3use super::{Inputs, Outputs};
4use crate::{Connection, PortEvent};
5use alloc::boxed::Box;
6use tokio::sync::mpsc;
7
8pub const UNLIMITED: usize = 0;
9pub const ONESHOT: usize = 1;
10
11#[derive(Debug)]
12pub struct Channel<T, const N: usize = UNLIMITED> {
13    pub tx: Outputs<T, N>,
14    pub rx: Inputs<T, N>,
15}
16
17impl<T> Channel<T> {
18    /// Creates a one-shot connection.
19    pub fn oneshot() -> Channel<T, ONESHOT> {
20        Channel::from(mpsc::channel(1))
21    }
22
23    /// Creates a bounded connection.
24    pub fn bounded(buffer: usize) -> Channel<T, UNLIMITED> {
25        Channel::from(mpsc::channel(buffer))
26    }
27
28    /// Creates a bounded, type-erased connection.
29    #[allow(unused)]
30    pub(crate) fn bounded_boxed(
31        buffer: usize,
32    ) -> (
33        Box<dyn crate::io::OutputPort<T> + Send>,
34        Box<dyn crate::io::InputPort<T> + Send>,
35    )
36    where
37        T: Send + Sync + 'static,
38    {
39        let (outputs, inputs) = Self::bounded(buffer).into_inner();
40        (Box::new(outputs), Box::new(inputs))
41    }
42}
43
44impl<T, const N: usize> Channel<T, N> {
45    pub fn into_inner(self) -> (Outputs<T, N>, Inputs<T, N>) {
46        (self.tx, self.rx)
47    }
48}
49
50impl<T, const N: usize> From<(Outputs<T, N>, Inputs<T, N>)> for Channel<T, N> {
51    fn from((tx, rx): (Outputs<T, N>, Inputs<T, N>)) -> Self {
52        Self { tx, rx }
53    }
54}
55
56impl<T, const N: usize> From<(mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)>
57    for Channel<T, N>
58{
59    fn from((tx, rx): (mpsc::Sender<PortEvent<T>>, mpsc::Receiver<PortEvent<T>>)) -> Self {
60        Self {
61            tx: Outputs::<T, N>::from(tx),
62            rx: Inputs::<T, N>::from(rx),
63        }
64    }
65}
66
67impl<T> Connection<T> for Channel<T> {}