async_flow/tokio/
outputs.rs1use crate::{PortDirection, PortState, io::SendError};
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Sender;
7
8#[derive(Clone, Default)]
9pub struct Outputs<T, const N: usize = 0> {
10 pub(crate) tx: Option<Sender<T>>,
11}
12
13impl<T, const N: usize> core::fmt::Debug for Outputs<T, N> {
14 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15 f.debug_struct("Outputs").field("tx", &self.tx).finish()
16 }
17}
18
19impl<T, const N: usize> Outputs<T, N> {
20 pub fn close(&mut self) {
21 let _ = self.tx.take();
22 }
23
24 pub fn direction(&self) -> PortDirection {
25 PortDirection::Output
26 }
27
28 pub fn state(&self) -> PortState {
29 if self.tx.as_ref().map(|tx| tx.is_closed()).unwrap_or(true) {
30 PortState::Closed
31 } else {
32 PortState::Open
33 }
34 }
35
36 pub fn is_closed(&self) -> bool {
38 self.state().is_closed()
39 }
40
41 pub fn is_open(&self) -> bool {
43 self.state().is_open()
44 }
45
46 pub fn is_connected(&self) -> bool {
48 self.state().is_connected()
49 }
50
51 pub fn capacity(&self) -> Option<usize> {
52 self.tx.as_ref().map(|tx| tx.capacity())
53 }
54
55 pub fn max_capacity(&self) -> Option<usize> {
56 self.tx.as_ref().map(|tx| tx.max_capacity())
57 }
58
59 pub async fn send(&self, value: T) -> Result<(), SendError> {
60 if let Some(tx) = self.tx.as_ref() {
61 Ok(tx.send(value).await?)
62 } else {
63 Err(SendError) }
65 }
66
67 pub fn send_blocking(&self, value: T) -> Result<(), SendError> {
68 if let Some(tx) = self.tx.as_ref() {
69 Ok(tx.blocking_send(value)?)
70 } else {
71 Err(SendError) }
73 }
74}
75
76impl<T, const N: usize> AsRef<Sender<T>> for Outputs<T, N> {
77 fn as_ref(&self) -> &Sender<T> {
78 self.tx.as_ref().unwrap()
79 }
80}
81
82impl<T, const N: usize> AsMut<Sender<T>> for Outputs<T, N> {
83 fn as_mut(&mut self) -> &mut Sender<T> {
84 self.tx.as_mut().unwrap()
85 }
86}
87
88impl<T, const N: usize> From<Sender<T>> for Outputs<T, N> {
89 fn from(input: Sender<T>) -> Self {
90 Self { tx: Some(input) }
91 }
92}
93
94impl<T, const N: usize> From<&Sender<T>> for Outputs<T, N> {
95 fn from(input: &Sender<T>) -> Self {
96 Self {
97 tx: Some(input.clone()),
98 }
99 }
100}
101
102#[async_trait::async_trait]
103impl<T: Send + 'static, const N: usize> crate::io::OutputPort<T> for Outputs<T, N> {
104 async fn send(&self, value: T) -> Result<(), SendError> {
105 self.send(value).await
106 }
107}
108
109impl<T, const N: usize> crate::io::Port<T> for Outputs<T, N> {
110 fn close(&mut self) {
111 self.close()
112 }
113
114 fn direction(&self) -> PortDirection {
115 self.direction()
116 }
117
118 fn state(&self) -> PortState {
119 self.state()
120 }
121}
122
123impl<T, const N: usize> MaybeLabeled for Outputs<T, N> {
124 fn label(&self) -> Option<Cow<'_, str>> {
125 None
126 }
127}
128
129impl<T, const N: usize> MaybeNamed for Outputs<T, N> {
130 fn name(&self) -> Option<Cow<'_, str>> {
131 None
132 }
133}