async_flow/tokio/
inputs.rs1use crate::io::RecvError;
4use alloc::{borrow::Cow, boxed::Box, vec::Vec};
5use dogma::{MaybeLabeled, MaybeNamed};
6use tokio::sync::mpsc::Receiver;
7
8pub struct Inputs<T> {
9 pub(crate) rx: Receiver<T>,
10}
11
12impl<T> core::fmt::Debug for Inputs<T> {
13 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
14 f.debug_struct("Inputs").field("rx", &self.rx).finish()
15 }
16}
17
18impl<T> Inputs<T> {
19 pub fn capacity(&self) -> Option<usize> {
20 Some(self.rx.capacity())
21 }
22
23 pub fn max_capacity(&self) -> Option<usize> {
24 Some(self.rx.max_capacity())
25 }
26
27 pub async fn recv_all(&mut self) -> Result<Vec<T>, RecvError> {
28 let mut inputs = Vec::new();
29 while let Some(input) = self.recv().await? {
30 inputs.push(input);
31 }
32 Ok(inputs)
33 }
34
35 pub async fn recv(&mut self) -> Result<Option<T>, RecvError> {
36 Ok(self.rx.recv().await)
37 }
38
39 pub fn recv_blocking(&mut self) -> Result<Option<T>, RecvError> {
40 Ok(self.rx.blocking_recv())
41 }
42}
43
44impl<T> AsRef<Receiver<T>> for Inputs<T> {
45 fn as_ref(&self) -> &Receiver<T> {
46 &self.rx
47 }
48}
49
50impl<T> AsMut<Receiver<T>> for Inputs<T> {
51 fn as_mut(&mut self) -> &mut Receiver<T> {
52 &mut self.rx
53 }
54}
55
56impl<T> From<Receiver<T>> for Inputs<T> {
57 fn from(input: Receiver<T>) -> Self {
58 Self { rx: input }
59 }
60}
61
62#[async_trait::async_trait]
63impl<T: Send> crate::io::InputPort<T> for Inputs<T> {
64 fn is_empty(&self) -> bool {
65 self.rx.is_empty()
66 }
67
68 async fn recv(&mut self) -> Result<Option<T>, RecvError> {
69 self.recv().await
70 }
71}
72
73impl<T> crate::io::Port<T> for Inputs<T> {
74 fn is_closed(&self) -> bool {
75 self.rx.is_closed()
76 }
77
78 fn close(&mut self) {
79 self.rx.close()
80 }
81}
82
83impl<T> MaybeLabeled for Inputs<T> {
84 fn label(&self) -> Option<Cow<'_, str>> {
85 None
86 }
87}
88
89impl<T> MaybeNamed for Inputs<T> {
90 fn name(&self) -> Option<Cow<'_, str>> {
91 None
92 }
93}