async_flow/tokio/
inputs.rs1use crate::{PortDirection, PortEvent, PortState, error::RecvError};
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8#[derive(Debug, Default)]
9pub enum InputPortState<T> {
10 #[default]
11 Unconnected,
12 Connected(Receiver<PortEvent<T>>),
13 Disconnected(Receiver<PortEvent<T>>),
14 Closed,
15}
16
17impl<T> Into<RecvError> for &InputPortState<T> {
18 fn into(self) -> RecvError {
19 Into::<PortState>::into(self).into()
20 }
21}
22
23impl<T> Into<PortState> for &InputPortState<T> {
24 fn into(self) -> PortState {
25 use InputPortState::*;
26 match self {
27 Unconnected => PortState::Unconnected,
28 Connected(rx) => {
29 if rx.is_closed() {
30 PortState::Disconnected
31 } else {
32 PortState::Connected
33 }
34 }
35 Disconnected(_) => PortState::Disconnected,
36 Closed => PortState::Closed,
37 }
38 }
39}
40
41#[derive(Default)]
42pub struct Inputs<T, const N: usize = 0> {
43 pub(crate) state: InputPortState<T>,
44}
45
46impl<T, const N: usize> core::fmt::Debug for Inputs<T, N> {
47 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
48 f.debug_struct("Inputs")
49 .finish()
51 }
52}
53
54impl<T, const N: usize> Inputs<T, N> {
55 pub fn close(&mut self) {
56 use InputPortState::*;
57 match self.state {
58 Unconnected => self.state = Closed,
59 Connected(ref mut rx) => {
60 if !rx.is_closed() {
61 rx.close()
62 }
63 self.state = Closed;
64 }
65 Disconnected(_) => self.state = Closed,
66 Closed => (), }
68 }
69
70 pub fn disconnect(&mut self) {
71 use InputPortState::*;
72 replace_with::replace_with_or_abort(&mut self.state, |self_| match self_ {
73 Unconnected => Unconnected,
74 Connected(mut rx) => {
75 if !rx.is_closed() {
76 rx.close()
77 }
78 Disconnected(rx)
79 }
80 Disconnected(rx) => Disconnected(rx),
81 Closed => Closed,
82 })
83 }
84
85 pub fn direction(&self) -> PortDirection {
86 PortDirection::Input
87 }
88
89 pub fn state(&self) -> PortState {
90 (&self.state).into()
91 }
92
93 pub fn is_empty(&self) -> bool {
94 use InputPortState::*;
95 match self.state {
96 Connected(ref rx) | Disconnected(ref rx) => rx.is_empty(),
97 _ => true,
98 }
99 }
100
101 pub fn capacity(&self) -> Option<usize> {
102 use InputPortState::*;
103 match self.state {
104 Connected(ref rx) | Disconnected(ref rx) => Some(rx.capacity()),
105 _ => None,
106 }
107 }
108
109 pub fn max_capacity(&self) -> Option<usize> {
110 use InputPortState::*;
111 match self.state {
112 Connected(ref rx) | Disconnected(ref rx) => Some(rx.max_capacity()),
113 _ => None,
114 }
115 }
116
117 pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
118 loop {
119 return match self.recv_event().await? {
120 Some(PortEvent::Message(m)) => Ok(Some(m)),
121 Some(PortEvent::Connect) => continue, Some(PortEvent::Disconnect) => Ok(None),
123 None => Ok(None),
124 };
125 }
126 }
127
128 pub async fn recv_event(&mut self) -> Result<Option<PortEvent<T>>, RecvError> {
129 use InputPortState::*;
130 match self.state {
131 Connected(ref mut rx) | Disconnected(ref mut rx) => Ok(rx.recv().await),
132 _ => Ok(None),
133 }
134 }
135
136 pub fn blocking_recv(&mut self) -> Result<Option<T>, RecvError> {
137 todo!() }
139}
140
141impl<T, const N: usize> AsRef<Receiver<PortEvent<T>>> for Inputs<T, N> {
142 fn as_ref(&self) -> &Receiver<PortEvent<T>> {
143 use InputPortState::*;
144 match self.state {
145 Connected(ref rx) | Disconnected(ref rx) => rx,
146 _ => unreachable!(),
147 }
148 }
149}
150
151impl<T, const N: usize> AsMut<Receiver<PortEvent<T>>> for Inputs<T, N> {
152 fn as_mut(&mut self) -> &mut Receiver<PortEvent<T>> {
153 use InputPortState::*;
154 match self.state {
155 Connected(ref mut rx) | Disconnected(ref mut rx) => rx,
156 _ => unreachable!(),
157 }
158 }
159}
160
161impl<T, const N: usize> From<Receiver<PortEvent<T>>> for Inputs<T, N> {
162 fn from(input: Receiver<PortEvent<T>>) -> Self {
163 use InputPortState::*;
164 Self {
165 state: if input.is_closed() {
166 Disconnected(input)
167 } else {
168 Connected(input)
169 },
170 }
171 }
172}
173
174#[async_trait::async_trait]
175impl<T: Send + 'static, const N: usize> crate::io::InputPort<T> for Inputs<T, N> {
176 fn is_empty(&self) -> bool {
177 self.is_empty()
178 }
179
180 async fn recv(&mut self) -> Result<Option<T>, RecvError> {
181 self.recv().await
182 }
183}
184
185impl<T: Send, const N: usize> crate::io::Port<T> for Inputs<T, N> {
186 fn close(&mut self) {
187 self.close()
188 }
189
190 fn direction(&self) -> PortDirection {
191 self.direction()
192 }
193
194 fn state(&self) -> PortState {
195 self.state()
196 }
197}
198
199impl<T, const N: usize> MaybeNamed for Inputs<T, N> {
200 fn name(&self) -> Option<Cow<'_, str>> {
201 None
202 }
203}
204
205impl<T, const N: usize> MaybeLabeled for Inputs<T, N> {
206 fn label(&self) -> Option<Cow<'_, str>> {
207 None
208 }
209}