async_flow/tokio/
outputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::io::SendError;
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Sender;
7
8#[derive(Clone, Default)]
9pub struct Outputs<T, const N: usize = 0> {
10    pub(crate) tx: Option<Sender<T>>,
11}
12
13impl<T, const N: usize> core::fmt::Debug for Outputs<T, N> {
14    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15        f.debug_struct("Outputs").field("tx", &self.tx).finish()
16    }
17}
18
19impl<T, const N: usize> Outputs<T, N> {
20    pub fn is_open(&self) -> bool {
21        !self.is_closed()
22    }
23
24    pub fn is_closed(&self) -> bool {
25        self.tx.as_ref().map(|tx| tx.is_closed()).unwrap_or(true)
26    }
27
28    pub fn capacity(&self) -> Option<usize> {
29        self.tx.as_ref().map(|tx| tx.capacity())
30    }
31
32    pub fn max_capacity(&self) -> Option<usize> {
33        self.tx.as_ref().map(|tx| tx.max_capacity())
34    }
35
36    pub fn close(&mut self) {
37        let _ = self.tx.take();
38    }
39
40    pub async fn send(&self, value: T) -> Result<(), SendError> {
41        if let Some(tx) = self.tx.as_ref() {
42            Ok(tx.send(value).await?)
43        } else {
44            Err(SendError) // TODO: SendError::Closed
45        }
46    }
47
48    pub fn send_blocking(&self, value: T) -> Result<(), SendError> {
49        if let Some(tx) = self.tx.as_ref() {
50            Ok(tx.blocking_send(value)?)
51        } else {
52            Err(SendError) // TODO: SendError::Closed
53        }
54    }
55}
56
57impl<T, const N: usize> AsRef<Sender<T>> for Outputs<T, N> {
58    fn as_ref(&self) -> &Sender<T> {
59        self.tx.as_ref().unwrap()
60    }
61}
62
63impl<T, const N: usize> AsMut<Sender<T>> for Outputs<T, N> {
64    fn as_mut(&mut self) -> &mut Sender<T> {
65        self.tx.as_mut().unwrap()
66    }
67}
68
69impl<T, const N: usize> From<Sender<T>> for Outputs<T, N> {
70    fn from(input: Sender<T>) -> Self {
71        Self { tx: Some(input) }
72    }
73}
74
75impl<T, const N: usize> From<&Sender<T>> for Outputs<T, N> {
76    fn from(input: &Sender<T>) -> Self {
77        Self {
78            tx: Some(input.clone()),
79        }
80    }
81}
82
83#[async_trait::async_trait]
84impl<T: Send + 'static, const N: usize> crate::io::OutputPort<T> for Outputs<T, N> {
85    async fn send(&self, value: T) -> Result<(), SendError> {
86        self.send(value).await
87    }
88}
89
90impl<T, const N: usize> crate::io::Port<T> for Outputs<T, N> {
91    fn is_closed(&self) -> bool {
92        self.is_closed()
93    }
94
95    fn close(&mut self) {
96        self.close()
97    }
98}
99
100impl<T, const N: usize> MaybeLabeled for Outputs<T, N> {
101    fn label(&self) -> Option<Cow<'_, str>> {
102        None
103    }
104}
105
106impl<T, const N: usize> MaybeNamed for Outputs<T, N> {
107    fn name(&self) -> Option<Cow<'_, str>> {
108        None
109    }
110}