async_flow/tokio/
inputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{PortDirection, PortState, io::RecvError};
4use alloc::{borrow::Cow, boxed::Box, vec::Vec};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8#[derive(Default)]
9pub struct Inputs<T, const N: usize = 0> {
10    pub(crate) rx: Option<Receiver<T>>,
11}
12
13impl<T, const N: usize> core::fmt::Debug for Inputs<T, N> {
14    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15        f.debug_struct("Inputs").field("rx", &self.rx).finish()
16    }
17}
18
19impl<T, const N: usize> Inputs<T, N> {
20    pub fn close(&mut self) {
21        if let Some(rx) = self.rx.as_mut() {
22            if !rx.is_closed() {
23                rx.close() // idempotent
24            }
25        }
26    }
27
28    pub fn direction(&self) -> PortDirection {
29        PortDirection::Input
30    }
31
32    pub fn state(&self) -> PortState {
33        if self.rx.as_ref().map(|rx| rx.is_closed()).unwrap_or(true) {
34            PortState::Closed
35        } else {
36            PortState::Open
37        }
38    }
39
40    /// Checks whether this port is currently closed.
41    pub fn is_closed(&self) -> bool {
42        self.state().is_closed()
43    }
44
45    /// Checks whether this port is currently open.
46    pub fn is_open(&self) -> bool {
47        self.state().is_open()
48    }
49
50    /// Checks whether this port is currently connected.
51    pub fn is_connected(&self) -> bool {
52        self.state().is_connected()
53    }
54
55    pub fn is_empty(&self) -> bool {
56        self.rx.as_ref().map(|rx| rx.is_empty()).unwrap_or(true)
57    }
58
59    pub fn capacity(&self) -> Option<usize> {
60        self.rx.as_ref().map(|rx| rx.capacity())
61    }
62
63    pub fn max_capacity(&self) -> Option<usize> {
64        self.rx.as_ref().map(|rx| rx.max_capacity())
65    }
66
67    pub async fn recv_all(&mut self) -> Result<Vec<T>, RecvError> {
68        let mut inputs = Vec::new();
69        while let Some(input) = self.recv().await? {
70            inputs.push(input);
71        }
72        Ok(inputs)
73    }
74
75    pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
76        if let Some(rx) = self.rx.as_mut() {
77            Ok(rx.recv().await)
78        } else {
79            Ok(None)
80        }
81    }
82
83    pub fn recv_blocking(&mut self) -> Result<Option<T>, RecvError> {
84        if let Some(rx) = self.rx.as_mut() {
85            Ok(rx.blocking_recv())
86        } else {
87            Ok(None)
88        }
89    }
90}
91
92impl<T, const N: usize> AsRef<Receiver<T>> for Inputs<T, N> {
93    fn as_ref(&self) -> &Receiver<T> {
94        self.rx.as_ref().unwrap()
95    }
96}
97
98impl<T, const N: usize> AsMut<Receiver<T>> for Inputs<T, N> {
99    fn as_mut(&mut self) -> &mut Receiver<T> {
100        self.rx.as_mut().unwrap()
101    }
102}
103
104impl<T, const N: usize> From<Receiver<T>> for Inputs<T, N> {
105    fn from(input: Receiver<T>) -> Self {
106        Self { rx: Some(input) }
107    }
108}
109
110#[async_trait::async_trait]
111impl<T: Send + 'static, const N: usize> crate::io::InputPort<T> for Inputs<T, N> {
112    fn is_empty(&self) -> bool {
113        self.is_empty()
114    }
115
116    async fn recv(&mut self) -> Result<Option<T>, RecvError> {
117        self.recv().await
118    }
119}
120
121impl<T, const N: usize> crate::io::Port<T> for Inputs<T, N> {
122    fn close(&mut self) {
123        self.close()
124    }
125
126    fn direction(&self) -> PortDirection {
127        self.direction()
128    }
129
130    fn state(&self) -> PortState {
131        self.state()
132    }
133}
134
135impl<T, const N: usize> MaybeLabeled for Inputs<T, N> {
136    fn label(&self) -> Option<Cow<'_, str>> {
137        None
138    }
139}
140
141impl<T, const N: usize> MaybeNamed for Inputs<T, N> {
142    fn name(&self) -> Option<Cow<'_, str>> {
143        None
144    }
145}