async_flow/tokio/
outputs.rs1use crate::{PortDirection, PortEvent, PortState, error::SendError};
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Sender;
7
8#[derive(Clone, Debug, Default)]
9pub enum OutputPortState<T> {
10 #[default]
11 Unconnected,
12 Connected(Sender<PortEvent<T>>),
13 Disconnected,
14 Closed,
15}
16
17impl<T> Into<SendError> for &OutputPortState<T> {
18 fn into(self) -> SendError {
19 Into::<PortState>::into(self).into()
20 }
21}
22
23impl<T> Into<PortState> for &OutputPortState<T> {
24 fn into(self) -> PortState {
25 use OutputPortState::*;
26 match self {
27 Unconnected => PortState::Unconnected,
28 Connected(tx) => {
29 if tx.is_closed() {
30 PortState::Disconnected
31 } else {
32 PortState::Connected
33 }
34 }
35 Disconnected => PortState::Disconnected,
36 Closed => PortState::Closed,
37 }
38 }
39}
40
41#[derive(Clone, Default)]
42pub struct Outputs<T, const N: usize = 0> {
43 pub(crate) state: OutputPortState<T>,
44}
45
46impl<T, const N: usize> core::fmt::Debug for Outputs<T, N> {
47 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
48 f.debug_struct("Outputs")
49 .finish()
51 }
52}
53
54impl<T, const N: usize> Outputs<T, N> {
55 pub fn close(&mut self) {
56 use OutputPortState::*;
57 match &self.state {
58 Closed => (), Unconnected | Connected(_) | Disconnected => {
60 self.state = Closed;
61 }
62 }
63 }
64
65 pub fn direction(&self) -> PortDirection {
66 PortDirection::Output
67 }
68
69 pub fn state(&self) -> PortState {
70 (&self.state).into()
71 }
72
73 pub fn capacity(&self) -> Option<usize> {
74 use OutputPortState::*;
75 match self.state {
76 Connected(ref tx) => Some(tx.capacity()),
77 _ => None,
78 }
79 }
80
81 pub fn max_capacity(&self) -> Option<usize> {
82 use OutputPortState::*;
83 match self.state {
84 Connected(ref tx) => Some(tx.max_capacity()),
85 _ => None,
86 }
87 }
88
89 pub async fn send(&self, message: T) -> Result<(), SendError> {
90 self.send_event(PortEvent::Message(message)).await
91 }
92
93 pub async fn send_event(&self, event: PortEvent<T>) -> Result<(), SendError> {
94 use OutputPortState::*;
95 match self.state {
96 Connected(ref tx) => Ok(tx.send(event).await?),
97 _ => Err((&self.state).into()),
98 }
99 }
100
101 pub fn blocking_send(&self, _message: T) -> Result<(), SendError> {
102 todo!() }
104}
105
106impl<T, const N: usize> AsRef<Sender<PortEvent<T>>> for Outputs<T, N> {
107 fn as_ref(&self) -> &Sender<PortEvent<T>> {
108 use OutputPortState::*;
109 match self.state {
110 Connected(ref tx) => tx,
111 _ => unreachable!(),
112 }
113 }
114}
115
116impl<T, const N: usize> AsMut<Sender<PortEvent<T>>> for Outputs<T, N> {
117 fn as_mut(&mut self) -> &mut Sender<PortEvent<T>> {
118 use OutputPortState::*;
119 match self.state {
120 Connected(ref mut tx) => tx,
121 _ => unreachable!(),
122 }
123 }
124}
125
126impl<T, const N: usize> From<Sender<PortEvent<T>>> for Outputs<T, N> {
127 fn from(input: Sender<PortEvent<T>>) -> Self {
128 use OutputPortState::*;
129 Self {
130 state: if input.is_closed() {
131 Disconnected
132 } else {
133 Connected(input)
134 },
135 }
136 }
137}
138
139impl<T, const N: usize> From<&Sender<PortEvent<T>>> for Outputs<T, N> {
140 fn from(input: &Sender<PortEvent<T>>) -> Self {
141 use OutputPortState::*;
142 Self {
143 state: if input.is_closed() {
144 Disconnected
145 } else {
146 Connected(input.clone())
147 },
148 }
149 }
150}
151
152#[async_trait::async_trait]
153impl<T: Send + 'static, const N: usize> crate::io::OutputPort<T> for Outputs<T, N> {
154 async fn send(&self, message: T) -> Result<(), SendError> {
155 self.send(message).await
156 }
157}
158
159impl<T: Send, const N: usize> crate::io::Port<T> for Outputs<T, N> {
160 fn close(&mut self) {
161 self.close()
162 }
163
164 fn direction(&self) -> PortDirection {
165 self.direction()
166 }
167
168 fn state(&self) -> PortState {
169 self.state()
170 }
171}
172
173impl<T, const N: usize> MaybeNamed for Outputs<T, N> {
174 fn name(&self) -> Option<Cow<'_, str>> {
175 None
176 }
177}
178
179impl<T, const N: usize> MaybeLabeled for Outputs<T, N> {
180 fn label(&self) -> Option<Cow<'_, str>> {
181 None
182 }
183}