async_flow/tokio/
inputs.rs1use crate::{PortDirection, PortState, io::RecvError};
4use alloc::{borrow::Cow, boxed::Box, vec::Vec};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8#[derive(Default)]
9pub struct Inputs<T, const N: usize = 0> {
10 pub(crate) rx: Option<Receiver<T>>,
11}
12
13impl<T, const N: usize> core::fmt::Debug for Inputs<T, N> {
14 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15 f.debug_struct("Inputs").field("rx", &self.rx).finish()
16 }
17}
18
19impl<T, const N: usize> Inputs<T, N> {
20 pub fn close(&mut self) {
21 if let Some(rx) = self.rx.as_mut() {
22 if !rx.is_closed() {
23 rx.close() }
25 }
26 }
27
28 pub fn direction(&self) -> PortDirection {
29 PortDirection::Input
30 }
31
32 pub fn state(&self) -> PortState {
33 if self.rx.as_ref().map(|rx| rx.is_closed()).unwrap_or(true) {
34 PortState::Closed
35 } else {
36 PortState::Open
37 }
38 }
39
40 pub fn is_closed(&self) -> bool {
42 self.state().is_closed()
43 }
44
45 pub fn is_open(&self) -> bool {
47 self.state().is_open()
48 }
49
50 pub fn is_connected(&self) -> bool {
52 self.state().is_connected()
53 }
54
55 pub fn is_empty(&self) -> bool {
56 self.rx.as_ref().map(|rx| rx.is_empty()).unwrap_or(true)
57 }
58
59 pub fn capacity(&self) -> Option<usize> {
60 self.rx.as_ref().map(|rx| rx.capacity())
61 }
62
63 pub fn max_capacity(&self) -> Option<usize> {
64 self.rx.as_ref().map(|rx| rx.max_capacity())
65 }
66
67 pub async fn recv_all(&mut self) -> Result<Vec<T>, RecvError> {
68 let mut inputs = Vec::new();
69 while let Some(input) = self.recv().await? {
70 inputs.push(input);
71 }
72 Ok(inputs)
73 }
74
75 pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
76 if let Some(rx) = self.rx.as_mut() {
77 Ok(rx.recv().await)
78 } else {
79 Ok(None)
80 }
81 }
82
83 pub fn recv_blocking(&mut self) -> Result<Option<T>, RecvError> {
84 if let Some(rx) = self.rx.as_mut() {
85 Ok(rx.blocking_recv())
86 } else {
87 Ok(None)
88 }
89 }
90}
91
92impl<T, const N: usize> AsRef<Receiver<T>> for Inputs<T, N> {
93 fn as_ref(&self) -> &Receiver<T> {
94 self.rx.as_ref().unwrap()
95 }
96}
97
98impl<T, const N: usize> AsMut<Receiver<T>> for Inputs<T, N> {
99 fn as_mut(&mut self) -> &mut Receiver<T> {
100 self.rx.as_mut().unwrap()
101 }
102}
103
104impl<T, const N: usize> From<Receiver<T>> for Inputs<T, N> {
105 fn from(input: Receiver<T>) -> Self {
106 Self { rx: Some(input) }
107 }
108}
109
110#[async_trait::async_trait]
111impl<T: Send + 'static, const N: usize> crate::io::InputPort<T> for Inputs<T, N> {
112 fn is_empty(&self) -> bool {
113 self.is_empty()
114 }
115
116 async fn recv(&mut self) -> Result<Option<T>, RecvError> {
117 self.recv().await
118 }
119}
120
121impl<T, const N: usize> crate::io::Port<T> for Inputs<T, N> {
122 fn close(&mut self) {
123 self.close()
124 }
125
126 fn direction(&self) -> PortDirection {
127 self.direction()
128 }
129
130 fn state(&self) -> PortState {
131 self.state()
132 }
133}
134
135impl<T, const N: usize> MaybeLabeled for Inputs<T, N> {
136 fn label(&self) -> Option<Cow<'_, str>> {
137 None
138 }
139}
140
141impl<T, const N: usize> MaybeNamed for Inputs<T, N> {
142 fn name(&self) -> Option<Cow<'_, str>> {
143 None
144 }
145}