async_flow/tokio/
inputs.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::io::RecvError;
4use alloc::{borrow::Cow, boxed::Box, vec::Vec};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8pub struct Inputs<T> {
9    pub(crate) rx: Receiver<T>,
10}
11
12impl<T> core::fmt::Debug for Inputs<T> {
13    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
14        f.debug_struct("Inputs").field("rx", &self.rx).finish()
15    }
16}
17
18impl<T> Inputs<T> {
19    pub fn capacity(&self) -> Option<usize> {
20        Some(self.rx.capacity())
21    }
22
23    pub fn max_capacity(&self) -> Option<usize> {
24        Some(self.rx.max_capacity())
25    }
26
27    pub async fn recv_all(&mut self) -> Result<Vec<T>, RecvError> {
28        let mut inputs = Vec::new();
29        while let Some(input) = self.recv().await? {
30            inputs.push(input);
31        }
32        Ok(inputs)
33    }
34
35    pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
36        Ok(self.rx.recv().await)
37    }
38
39    pub fn recv_blocking(&mut self) -> Result<Option<T>, RecvError> {
40        Ok(self.rx.blocking_recv())
41    }
42}
43
44impl<T> AsRef<Receiver<T>> for Inputs<T> {
45    fn as_ref(&self) -> &Receiver<T> {
46        &self.rx
47    }
48}
49
50impl<T> AsMut<Receiver<T>> for Inputs<T> {
51    fn as_mut(&mut self) -> &mut Receiver<T> {
52        &mut self.rx
53    }
54}
55
56impl<T> From<Receiver<T>> for Inputs<T> {
57    fn from(input: Receiver<T>) -> Self {
58        Self { rx: input }
59    }
60}
61
62#[async_trait::async_trait]
63impl<T: Send> crate::io::InputPort<T> for Inputs<T> {
64    fn is_empty(&self) -> bool {
65        self.rx.is_empty()
66    }
67
68    async fn recv(&mut self) -> Result<Option<T>, RecvError> {
69        self.recv().await
70    }
71}
72
73impl<T> crate::io::Port<T> for Inputs<T> {
74    fn is_closed(&self) -> bool {
75        self.rx.is_closed()
76    }
77
78    fn close(&mut self) {
79        self.rx.close()
80    }
81}
82
83impl<T> MaybeLabeled for Inputs<T> {
84    fn label(&self) -> Option<Cow<'_, str>> {
85        None
86    }
87}
88
89impl<T> MaybeNamed for Inputs<T> {
90    fn name(&self) -> Option<Cow<'_, str>> {
91        None
92    }
93}