async_flow/tokio/
system.rs1use super::{Channel, Inputs, ONESHOT, Outputs, UNLIMITED};
4use crate::error::Result;
5use tokio::task::{AbortHandle, JoinSet};
6
7pub type Subsystem = System;
8
9pub struct System {
10 pub(crate) blocks: JoinSet<Result>,
11}
12
13impl System {
14 pub fn oneshot<T>() -> Channel<T, ONESHOT> {
15 Channel::oneshot()
16 }
17
18 pub fn bounded<T>(buffer: usize) -> Channel<T, UNLIMITED> {
19 Channel::bounded(buffer)
20 }
21
22 pub async fn run<F: FnOnce(&mut Self)>(f: F) -> Result {
24 Self::build(f).execute().await
25 }
26
27 pub fn build<F: FnOnce(&mut Self)>(f: F) -> Self {
29 let mut system = Self::new();
30 f(&mut system);
31 system
32 }
33
34 pub fn new() -> Self {
36 Self {
37 blocks: JoinSet::new(),
38 }
39 }
40
41 pub fn connect<T>(&mut self, inputs: Inputs<T>, outputs: Outputs<T>)
42 where
43 T: Send + 'static,
44 {
45 self.blocks.spawn(async move {
46 let mut inputs = inputs;
47 let outputs = outputs;
48 while let Some(input) = inputs.recv().await? {
49 outputs.send(input).await?;
50 }
51 Ok(())
52 });
53 }
54
55 pub fn spawn<F>(&mut self, task: F) -> AbortHandle
56 where
57 F: Future<Output = Result>,
58 F: Send + 'static,
59 {
60 self.blocks.spawn(task)
61 }
62
63 pub async fn execute(self) -> Result {
64 self.blocks.join_all().await;
65 Ok(())
66 }
67
68 #[cfg(feature = "std")]
69 pub fn read_stdin<T: core::str::FromStr>(&mut self) -> Inputs<T>
70 where
71 T: Send + 'static,
72 <T as core::str::FromStr>::Err: Send,
73 {
74 let (output, input) = super::Channel::bounded(1).into_inner(); let block = super::stdin(output);
76 self.blocks.spawn(block);
77 input
78 }
79
80 #[cfg(feature = "std")]
81 pub fn write_stdout<T: alloc::string::ToString>(&mut self) -> Outputs<T>
82 where
83 T: Send + 'static,
84 {
85 let (output, input) = super::Channel::bounded(1).into_inner(); let block = super::stdout(input);
87 self.blocks.spawn(block);
88 output
89 }
90}