async_flow/tokio/
inputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::io::RecvError;
4use alloc::{borrow::Cow, boxed::Box, vec::Vec};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8#[derive(Default)]
9pub struct Inputs<T> {
10    pub(crate) rx: Option<Receiver<T>>,
11}
12
13impl<T> core::fmt::Debug for Inputs<T> {
14    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
15        f.debug_struct("Inputs").field("rx", &self.rx).finish()
16    }
17}
18
19impl<T> Inputs<T> {
20    pub fn is_open(&self) -> bool {
21        !self.is_closed()
22    }
23
24    pub fn is_closed(&self) -> bool {
25        self.rx.as_ref().map(|rx| rx.is_closed()).unwrap_or(true)
26    }
27
28    pub fn is_empty(&self) -> bool {
29        self.rx.as_ref().map(|rx| rx.is_empty()).unwrap_or(true)
30    }
31
32    pub fn capacity(&self) -> Option<usize> {
33        self.rx.as_ref().map(|rx| rx.capacity())
34    }
35
36    pub fn max_capacity(&self) -> Option<usize> {
37        self.rx.as_ref().map(|rx| rx.max_capacity())
38    }
39
40    pub fn close(&mut self) {
41        if let Some(rx) = self.rx.as_mut() {
42            if !rx.is_closed() {
43                rx.close() // idempotent
44            }
45        }
46    }
47
48    pub async fn recv_all(&mut self) -> Result<Vec<T>, RecvError> {
49        let mut inputs = Vec::new();
50        while let Some(input) = self.recv().await? {
51            inputs.push(input);
52        }
53        Ok(inputs)
54    }
55
56    pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
57        if let Some(rx) = self.rx.as_mut() {
58            Ok(rx.recv().await)
59        } else {
60            Ok(None)
61        }
62    }
63
64    pub fn recv_blocking(&mut self) -> Result<Option<T>, RecvError> {
65        if let Some(rx) = self.rx.as_mut() {
66            Ok(rx.blocking_recv())
67        } else {
68            Ok(None)
69        }
70    }
71}
72
73impl<T> AsRef<Receiver<T>> for Inputs<T> {
74    fn as_ref(&self) -> &Receiver<T> {
75        self.rx.as_ref().unwrap()
76    }
77}
78
79impl<T> AsMut<Receiver<T>> for Inputs<T> {
80    fn as_mut(&mut self) -> &mut Receiver<T> {
81        self.rx.as_mut().unwrap()
82    }
83}
84
85impl<T> From<Receiver<T>> for Inputs<T> {
86    fn from(input: Receiver<T>) -> Self {
87        Self { rx: Some(input) }
88    }
89}
90
91#[async_trait::async_trait]
92impl<T: Send> crate::io::InputPort<T> for Inputs<T> {
93    fn is_empty(&self) -> bool {
94        self.is_empty()
95    }
96
97    async fn recv(&mut self) -> Result<Option<T>, RecvError> {
98        self.recv().await
99    }
100}
101
102impl<T> crate::io::Port<T> for Inputs<T> {
103    fn is_closed(&self) -> bool {
104        self.is_closed()
105    }
106
107    fn close(&mut self) {
108        self.close()
109    }
110}
111
112impl<T> MaybeLabeled for Inputs<T> {
113    fn label(&self) -> Option<Cow<'_, str>> {
114        None
115    }
116}
117
118impl<T> MaybeNamed for Inputs<T> {
119    fn name(&self) -> Option<Cow<'_, str>> {
120        None
121    }
122}