async_flow/tokio/
outputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::io::SendError;
4use alloc::{borrow::Cow, boxed::Box};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Sender;
7
8#[derive(Clone)]
9pub struct Outputs<T> {
10    pub(crate) tx: Option<Sender<T>>,
11}
12
13impl<T> core::fmt::Debug for Outputs<T> {
14    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15        f.debug_struct("Outputs").field("tx", &self.tx).finish()
16    }
17}
18
19impl<T> Outputs<T> {
20    pub fn capacity(&self) -> Option<usize> {
21        self.tx.as_ref().map(|tx| tx.capacity())
22    }
23
24    pub fn max_capacity(&self) -> Option<usize> {
25        self.tx.as_ref().map(|tx| tx.max_capacity())
26    }
27
28    pub async fn send(&self, value: T) -> Result<(), SendError> {
29        if let Some(tx) = self.tx.as_ref() {
30            Ok(tx.send(value).await?)
31        } else {
32            Err(SendError) // TODO: SendError::Closed
33        }
34    }
35
36    pub fn send_blocking(&self, value: T) -> Result<(), SendError> {
37        if let Some(tx) = self.tx.as_ref() {
38            Ok(tx.blocking_send(value)?)
39        } else {
40            Err(SendError) // TODO: SendError::Closed
41        }
42    }
43}
44
45impl<T> AsRef<Sender<T>> for Outputs<T> {
46    fn as_ref(&self) -> &Sender<T> {
47        self.tx.as_ref().unwrap()
48    }
49}
50
51impl<T> AsMut<Sender<T>> for Outputs<T> {
52    fn as_mut(&mut self) -> &mut Sender<T> {
53        self.tx.as_mut().unwrap()
54    }
55}
56
57impl<T> From<Sender<T>> for Outputs<T> {
58    fn from(input: Sender<T>) -> Self {
59        Self { tx: Some(input) }
60    }
61}
62
63impl<T> From<&Sender<T>> for Outputs<T> {
64    fn from(input: &Sender<T>) -> Self {
65        Self {
66            tx: Some(input.clone()),
67        }
68    }
69}
70
71#[async_trait::async_trait]
72impl<T: Send + 'static> crate::io::OutputPort<T> for Outputs<T> {
73    async fn send(&self, value: T) -> Result<(), SendError> {
74        self.send(value).await
75    }
76}
77
78impl<T> crate::io::Port<T> for Outputs<T> {
79    fn is_closed(&self) -> bool {
80        self.tx.as_ref().map(|tx| tx.is_closed()).unwrap_or(true)
81    }
82
83    fn close(&mut self) {
84        self.tx.take();
85    }
86}
87
88impl<T> MaybeLabeled for Outputs<T> {
89    fn label(&self) -> Option<Cow<'_, str>> {
90        None
91    }
92}
93
94impl<T> MaybeNamed for Outputs<T> {
95    fn name(&self) -> Option<Cow<'_, str>> {
96        None
97    }
98}