async_flow/tokio/
outputs.rs1use crate::io::SendError;
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Sender;
7
8#[derive(Clone, Default)]
9pub struct Outputs<T, const N: usize = 0> {
10 pub(crate) tx: Option<Sender<T>>,
11}
12
13impl<T> core::fmt::Debug for Outputs<T> {
14 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15 f.debug_struct("Outputs").field("tx", &self.tx).finish()
16 }
17}
18
19impl<T> Outputs<T> {
20 pub fn is_open(&self) -> bool {
21 !self.is_closed()
22 }
23
24 pub fn is_closed(&self) -> bool {
25 self.tx.as_ref().map(|tx| tx.is_closed()).unwrap_or(true)
26 }
27
28 pub fn capacity(&self) -> Option<usize> {
29 self.tx.as_ref().map(|tx| tx.capacity())
30 }
31
32 pub fn max_capacity(&self) -> Option<usize> {
33 self.tx.as_ref().map(|tx| tx.max_capacity())
34 }
35
36 pub fn close(&mut self) {
37 let _ = self.tx.take();
38 }
39
40 pub async fn send(&self, value: T) -> Result<(), SendError> {
41 if let Some(tx) = self.tx.as_ref() {
42 Ok(tx.send(value).await?)
43 } else {
44 Err(SendError) }
46 }
47
48 pub fn send_blocking(&self, value: T) -> Result<(), SendError> {
49 if let Some(tx) = self.tx.as_ref() {
50 Ok(tx.blocking_send(value)?)
51 } else {
52 Err(SendError) }
54 }
55}
56
57impl<T> AsRef<Sender<T>> for Outputs<T> {
58 fn as_ref(&self) -> &Sender<T> {
59 self.tx.as_ref().unwrap()
60 }
61}
62
63impl<T> AsMut<Sender<T>> for Outputs<T> {
64 fn as_mut(&mut self) -> &mut Sender<T> {
65 self.tx.as_mut().unwrap()
66 }
67}
68
69impl<T> From<Sender<T>> for Outputs<T> {
70 fn from(input: Sender<T>) -> Self {
71 Self { tx: Some(input) }
72 }
73}
74
75impl<T> From<&Sender<T>> for Outputs<T> {
76 fn from(input: &Sender<T>) -> Self {
77 Self {
78 tx: Some(input.clone()),
79 }
80 }
81}
82
83#[async_trait::async_trait]
84impl<T: Send + 'static> crate::io::OutputPort<T> for Outputs<T> {
85 async fn send(&self, value: T) -> Result<(), SendError> {
86 self.send(value).await
87 }
88}
89
90impl<T> crate::io::Port<T> for Outputs<T> {
91 fn is_closed(&self) -> bool {
92 self.is_closed()
93 }
94
95 fn close(&mut self) {
96 self.close()
97 }
98}
99
100impl<T> MaybeLabeled for Outputs<T> {
101 fn label(&self) -> Option<Cow<'_, str>> {
102 None
103 }
104}
105
106impl<T> MaybeNamed for Outputs<T> {
107 fn name(&self) -> Option<Cow<'_, str>> {
108 None
109 }
110}