async_flow/tokio/
inputs.rs1use crate::io::RecvError;
4use alloc::{borrow::Cow, boxed::Box, vec::Vec};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8#[derive(Default)]
9pub struct Inputs<T, const N: usize = 0> {
10 pub(crate) rx: Option<Receiver<T>>,
11}
12
13impl<T> core::fmt::Debug for Inputs<T> {
14 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15 f.debug_struct("Inputs").field("rx", &self.rx).finish()
16 }
17}
18
19impl<T> Inputs<T> {
20 pub fn is_open(&self) -> bool {
21 !self.is_closed()
22 }
23
24 pub fn is_closed(&self) -> bool {
25 self.rx.as_ref().map(|rx| rx.is_closed()).unwrap_or(true)
26 }
27
28 pub fn is_empty(&self) -> bool {
29 self.rx.as_ref().map(|rx| rx.is_empty()).unwrap_or(true)
30 }
31
32 pub fn capacity(&self) -> Option<usize> {
33 self.rx.as_ref().map(|rx| rx.capacity())
34 }
35
36 pub fn max_capacity(&self) -> Option<usize> {
37 self.rx.as_ref().map(|rx| rx.max_capacity())
38 }
39
40 pub fn close(&mut self) {
41 if let Some(rx) = self.rx.as_mut() {
42 if !rx.is_closed() {
43 rx.close() }
45 }
46 }
47
48 pub async fn recv_all(&mut self) -> Result<Vec<T>, RecvError> {
49 let mut inputs = Vec::new();
50 while let Some(input) = self.recv().await? {
51 inputs.push(input);
52 }
53 Ok(inputs)
54 }
55
56 pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
57 if let Some(rx) = self.rx.as_mut() {
58 Ok(rx.recv().await)
59 } else {
60 Ok(None)
61 }
62 }
63
64 pub fn recv_blocking(&mut self) -> Result<Option<T>, RecvError> {
65 if let Some(rx) = self.rx.as_mut() {
66 Ok(rx.blocking_recv())
67 } else {
68 Ok(None)
69 }
70 }
71}
72
73impl<T> AsRef<Receiver<T>> for Inputs<T> {
74 fn as_ref(&self) -> &Receiver<T> {
75 self.rx.as_ref().unwrap()
76 }
77}
78
79impl<T> AsMut<Receiver<T>> for Inputs<T> {
80 fn as_mut(&mut self) -> &mut Receiver<T> {
81 self.rx.as_mut().unwrap()
82 }
83}
84
85impl<T> From<Receiver<T>> for Inputs<T> {
86 fn from(input: Receiver<T>) -> Self {
87 Self { rx: Some(input) }
88 }
89}
90
91#[async_trait::async_trait]
92impl<T: Send> crate::io::InputPort<T> for Inputs<T> {
93 fn is_empty(&self) -> bool {
94 self.is_empty()
95 }
96
97 async fn recv(&mut self) -> Result<Option<T>, RecvError> {
98 self.recv().await
99 }
100}
101
102impl<T> crate::io::Port<T> for Inputs<T> {
103 fn is_closed(&self) -> bool {
104 self.is_closed()
105 }
106
107 fn close(&mut self) {
108 self.close()
109 }
110}
111
112impl<T> MaybeLabeled for Inputs<T> {
113 fn label(&self) -> Option<Cow<'_, str>> {
114 None
115 }
116}
117
118impl<T> MaybeNamed for Inputs<T> {
119 fn name(&self) -> Option<Cow<'_, str>> {
120 None
121 }
122}