async_flow/tokio/
inputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{PortDirection, PortEvent, PortState, error::RecvError};
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8#[derive(Debug, Default)]
9pub enum InputPortState<T> {
10    #[default]
11    Unconnected,
12    Connected(Receiver<PortEvent<T>>),
13    Disconnected(Receiver<PortEvent<T>>),
14    Closed,
15}
16
17impl<T> Into<RecvError> for &InputPortState<T> {
18    fn into(self) -> RecvError {
19        Into::<PortState>::into(self).into()
20    }
21}
22
23impl<T> Into<PortState> for &InputPortState<T> {
24    fn into(self) -> PortState {
25        use InputPortState::*;
26        match self {
27            Unconnected => PortState::Unconnected,
28            Connected(rx) => {
29                if rx.is_closed() {
30                    PortState::Disconnected
31                } else {
32                    PortState::Connected
33                }
34            }
35            Disconnected(_) => PortState::Disconnected,
36            Closed => PortState::Closed,
37        }
38    }
39}
40
41#[derive(Default)]
42pub struct Inputs<T, const N: usize = 0> {
43    pub(crate) state: InputPortState<T>,
44}
45
46impl<T, const N: usize> core::fmt::Debug for Inputs<T, N> {
47    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
48        f.debug_struct("Inputs")
49            //.field("state", &self.state) // TODO
50            .finish()
51    }
52}
53
54impl<T, const N: usize> Inputs<T, N> {
55    pub fn close(&mut self) {
56        use InputPortState::*;
57        match self.state {
58            Unconnected => self.state = Closed,
59            Connected(ref mut rx) => {
60                if !rx.is_closed() {
61                    rx.close()
62                }
63                self.state = Closed;
64            }
65            Disconnected(_) => self.state = Closed,
66            Closed => (), // idempotent
67        }
68    }
69
70    pub fn disconnect(&mut self) {
71        use InputPortState::*;
72        replace_with::replace_with_or_abort(&mut self.state, |self_| match self_ {
73            Unconnected => Unconnected,
74            Connected(mut rx) => {
75                if !rx.is_closed() {
76                    rx.close()
77                }
78                Disconnected(rx)
79            }
80            Disconnected(rx) => Disconnected(rx),
81            Closed => Closed,
82        })
83    }
84
85    pub fn direction(&self) -> PortDirection {
86        PortDirection::Input
87    }
88
89    pub fn state(&self) -> PortState {
90        (&self.state).into()
91    }
92
93    pub fn is_empty(&self) -> bool {
94        use InputPortState::*;
95        match self.state {
96            Connected(ref rx) | Disconnected(ref rx) => rx.is_empty(),
97            _ => true,
98        }
99    }
100
101    pub fn capacity(&self) -> Option<usize> {
102        use InputPortState::*;
103        match self.state {
104            Connected(ref rx) | Disconnected(ref rx) => Some(rx.capacity()),
105            _ => None,
106        }
107    }
108
109    pub fn max_capacity(&self) -> Option<usize> {
110        use InputPortState::*;
111        match self.state {
112            Connected(ref rx) | Disconnected(ref rx) => Some(rx.max_capacity()),
113            _ => None,
114        }
115    }
116
117    pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
118        loop {
119            return match self.recv_event().await? {
120                Some(PortEvent::Message(m)) => Ok(Some(m)),
121                Some(PortEvent::Connect) => continue, // TODO
122                Some(PortEvent::Disconnect) => Ok(None),
123                None => Ok(None),
124            };
125        }
126    }
127
128    pub async fn recv_event(&mut self) -> Result<Option<PortEvent<T>>, RecvError> {
129        use InputPortState::*;
130        match self.state {
131            Connected(ref mut rx) | Disconnected(ref mut rx) => Ok(rx.recv().await),
132            _ => Ok(None),
133        }
134    }
135
136    pub fn blocking_recv(&mut self) -> Result<Option<T>, RecvError> {
137        todo!() // TODO
138    }
139}
140
141impl<T, const N: usize> AsRef<Receiver<PortEvent<T>>> for Inputs<T, N> {
142    fn as_ref(&self) -> &Receiver<PortEvent<T>> {
143        use InputPortState::*;
144        match self.state {
145            Connected(ref rx) | Disconnected(ref rx) => rx,
146            _ => unreachable!(),
147        }
148    }
149}
150
151impl<T, const N: usize> AsMut<Receiver<PortEvent<T>>> for Inputs<T, N> {
152    fn as_mut(&mut self) -> &mut Receiver<PortEvent<T>> {
153        use InputPortState::*;
154        match self.state {
155            Connected(ref mut rx) | Disconnected(ref mut rx) => rx,
156            _ => unreachable!(),
157        }
158    }
159}
160
161impl<T, const N: usize> From<Receiver<PortEvent<T>>> for Inputs<T, N> {
162    fn from(input: Receiver<PortEvent<T>>) -> Self {
163        use InputPortState::*;
164        Self {
165            state: if input.is_closed() {
166                Disconnected(input)
167            } else {
168                Connected(input)
169            },
170        }
171    }
172}
173
174#[async_trait::async_trait]
175impl<T: Send + 'static, const N: usize> crate::io::InputPort<T> for Inputs<T, N> {
176    fn is_empty(&self) -> bool {
177        self.is_empty()
178    }
179
180    async fn recv(&mut self) -> Result<Option<T>, RecvError> {
181        self.recv().await
182    }
183}
184
185impl<T: Send, const N: usize> crate::io::Port<T> for Inputs<T, N> {
186    fn close(&mut self) {
187        self.close()
188    }
189
190    fn direction(&self) -> PortDirection {
191        self.direction()
192    }
193
194    fn state(&self) -> PortState {
195        self.state()
196    }
197}
198
199impl<T, const N: usize> MaybeNamed for Inputs<T, N> {
200    fn name(&self) -> Option<Cow<'_, str>> {
201        None
202    }
203}
204
205impl<T, const N: usize> MaybeLabeled for Inputs<T, N> {
206    fn label(&self) -> Option<Cow<'_, str>> {
207        None
208    }
209}