async_flow/tokio/
outputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{PortDirection, PortState, io::SendError};
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Sender;
7
8#[derive(Clone, Default)]
9pub struct Outputs<T, const N: usize = 0> {
10    pub(crate) tx: Option<Sender<T>>,
11}
12
13impl<T, const N: usize> core::fmt::Debug for Outputs<T, N> {
14    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15        f.debug_struct("Outputs").field("tx", &self.tx).finish()
16    }
17}
18
19impl<T, const N: usize> Outputs<T, N> {
20    pub fn close(&mut self) {
21        let _ = self.tx.take();
22    }
23
24    pub fn direction(&self) -> PortDirection {
25        PortDirection::Output
26    }
27
28    pub fn state(&self) -> PortState {
29        if self.tx.as_ref().map(|tx| tx.is_closed()).unwrap_or(true) {
30            PortState::Closed
31        } else {
32            PortState::Open
33        }
34    }
35
36    /// Checks whether this port is currently closed.
37    pub fn is_closed(&self) -> bool {
38        self.state().is_closed()
39    }
40
41    /// Checks whether this port is currently open.
42    pub fn is_open(&self) -> bool {
43        self.state().is_open()
44    }
45
46    /// Checks whether this port is currently connected.
47    pub fn is_connected(&self) -> bool {
48        self.state().is_connected()
49    }
50
51    pub fn capacity(&self) -> Option<usize> {
52        self.tx.as_ref().map(|tx| tx.capacity())
53    }
54
55    pub fn max_capacity(&self) -> Option<usize> {
56        self.tx.as_ref().map(|tx| tx.max_capacity())
57    }
58
59    pub async fn send(&self, value: T) -> Result<(), SendError> {
60        if let Some(tx) = self.tx.as_ref() {
61            Ok(tx.send(value).await?)
62        } else {
63            Err(SendError) // TODO: SendError::Closed
64        }
65    }
66
67    pub fn send_blocking(&self, value: T) -> Result<(), SendError> {
68        if let Some(tx) = self.tx.as_ref() {
69            Ok(tx.blocking_send(value)?)
70        } else {
71            Err(SendError) // TODO: SendError::Closed
72        }
73    }
74}
75
76impl<T, const N: usize> AsRef<Sender<T>> for Outputs<T, N> {
77    fn as_ref(&self) -> &Sender<T> {
78        self.tx.as_ref().unwrap()
79    }
80}
81
82impl<T, const N: usize> AsMut<Sender<T>> for Outputs<T, N> {
83    fn as_mut(&mut self) -> &mut Sender<T> {
84        self.tx.as_mut().unwrap()
85    }
86}
87
88impl<T, const N: usize> From<Sender<T>> for Outputs<T, N> {
89    fn from(input: Sender<T>) -> Self {
90        Self { tx: Some(input) }
91    }
92}
93
94impl<T, const N: usize> From<&Sender<T>> for Outputs<T, N> {
95    fn from(input: &Sender<T>) -> Self {
96        Self {
97            tx: Some(input.clone()),
98        }
99    }
100}
101
102#[async_trait::async_trait]
103impl<T: Send + 'static, const N: usize> crate::io::OutputPort<T> for Outputs<T, N> {
104    async fn send(&self, value: T) -> Result<(), SendError> {
105        self.send(value).await
106    }
107}
108
109impl<T, const N: usize> crate::io::Port<T> for Outputs<T, N> {
110    fn close(&mut self) {
111        self.close()
112    }
113
114    fn direction(&self) -> PortDirection {
115        self.direction()
116    }
117
118    fn state(&self) -> PortState {
119        self.state()
120    }
121}
122
123impl<T, const N: usize> MaybeLabeled for Outputs<T, N> {
124    fn label(&self) -> Option<Cow<'_, str>> {
125        None
126    }
127}
128
129impl<T, const N: usize> MaybeNamed for Outputs<T, N> {
130    fn name(&self) -> Option<Cow<'_, str>> {
131        None
132    }
133}