async_flow/tokio/
outputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{PortDirection, PortEvent, PortState, error::SendError};
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Sender;
7
8#[derive(Clone, Debug, Default)]
9pub enum OutputPortState<T> {
10    #[default]
11    Unconnected,
12    Connected(Sender<PortEvent<T>>),
13    Disconnected,
14    Closed,
15}
16
17impl<T> Into<SendError> for &OutputPortState<T> {
18    fn into(self) -> SendError {
19        Into::<PortState>::into(self).into()
20    }
21}
22
23impl<T> Into<PortState> for &OutputPortState<T> {
24    fn into(self) -> PortState {
25        use OutputPortState::*;
26        match self {
27            Unconnected => PortState::Unconnected,
28            Connected(tx) => {
29                if tx.is_closed() {
30                    PortState::Disconnected
31                } else {
32                    PortState::Connected
33                }
34            }
35            Disconnected => PortState::Disconnected,
36            Closed => PortState::Closed,
37        }
38    }
39}
40
41#[derive(Clone, Default)]
42pub struct Outputs<T, const N: usize = 0> {
43    pub(crate) state: OutputPortState<T>,
44}
45
46impl<T, const N: usize> core::fmt::Debug for Outputs<T, N> {
47    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
48        f.debug_struct("Outputs")
49            //.field("state", &self.state) // TODO
50            .finish()
51    }
52}
53
54impl<T, const N: usize> Outputs<T, N> {
55    pub fn close(&mut self) {
56        use OutputPortState::*;
57        match &self.state {
58            Closed => (), // idempotent
59            Unconnected | Connected(_) | Disconnected => {
60                self.state = Closed;
61            }
62        }
63    }
64
65    pub fn direction(&self) -> PortDirection {
66        PortDirection::Output
67    }
68
69    pub fn state(&self) -> PortState {
70        (&self.state).into()
71    }
72
73    pub fn capacity(&self) -> Option<usize> {
74        use OutputPortState::*;
75        match self.state {
76            Connected(ref tx) => Some(tx.capacity()),
77            _ => None,
78        }
79    }
80
81    pub fn max_capacity(&self) -> Option<usize> {
82        use OutputPortState::*;
83        match self.state {
84            Connected(ref tx) => Some(tx.max_capacity()),
85            _ => None,
86        }
87    }
88
89    pub async fn send(&self, message: T) -> Result<(), SendError> {
90        self.send_event(PortEvent::Message(message)).await
91    }
92
93    pub async fn send_event(&self, event: PortEvent<T>) -> Result<(), SendError> {
94        use OutputPortState::*;
95        match self.state {
96            Connected(ref tx) => Ok(tx.send(event).await?),
97            _ => Err((&self.state).into()),
98        }
99    }
100
101    pub fn blocking_send(&self, _message: T) -> Result<(), SendError> {
102        todo!() // TODO
103    }
104}
105
106impl<T, const N: usize> AsRef<Sender<PortEvent<T>>> for Outputs<T, N> {
107    fn as_ref(&self) -> &Sender<PortEvent<T>> {
108        use OutputPortState::*;
109        match self.state {
110            Connected(ref tx) => tx,
111            _ => unreachable!(),
112        }
113    }
114}
115
116impl<T, const N: usize> AsMut<Sender<PortEvent<T>>> for Outputs<T, N> {
117    fn as_mut(&mut self) -> &mut Sender<PortEvent<T>> {
118        use OutputPortState::*;
119        match self.state {
120            Connected(ref mut tx) => tx,
121            _ => unreachable!(),
122        }
123    }
124}
125
126impl<T, const N: usize> From<Sender<PortEvent<T>>> for Outputs<T, N> {
127    fn from(input: Sender<PortEvent<T>>) -> Self {
128        use OutputPortState::*;
129        Self {
130            state: if input.is_closed() {
131                Disconnected
132            } else {
133                Connected(input)
134            },
135        }
136    }
137}
138
139impl<T, const N: usize> From<&Sender<PortEvent<T>>> for Outputs<T, N> {
140    fn from(input: &Sender<PortEvent<T>>) -> Self {
141        use OutputPortState::*;
142        Self {
143            state: if input.is_closed() {
144                Disconnected
145            } else {
146                Connected(input.clone())
147            },
148        }
149    }
150}
151
152#[async_trait::async_trait]
153impl<T: Send + 'static, const N: usize> crate::io::OutputPort<T> for Outputs<T, N> {
154    async fn send(&self, message: T) -> Result<(), SendError> {
155        self.send(message).await
156    }
157}
158
159impl<T: Send, const N: usize> crate::io::Port<T> for Outputs<T, N> {
160    fn close(&mut self) {
161        self.close()
162    }
163
164    fn direction(&self) -> PortDirection {
165        self.direction()
166    }
167
168    fn state(&self) -> PortState {
169        self.state()
170    }
171}
172
173impl<T, const N: usize> MaybeNamed for Outputs<T, N> {
174    fn name(&self) -> Option<Cow<'_, str>> {
175        None
176    }
177}
178
179impl<T, const N: usize> MaybeLabeled for Outputs<T, N> {
180    fn label(&self) -> Option<Cow<'_, str>> {
181        None
182    }
183}