async_flow/tokio/
system.rs1use crate::{
4 io::Result,
5 tokio::{Inputs, Outputs},
6};
7use tokio::task::{AbortHandle, JoinSet};
8
9pub type Subsystem = System;
10
11pub struct System {
12 pub(crate) blocks: JoinSet<Result>,
13}
14
15impl System {
16 pub fn bounded<T>(buffer: usize) -> (Outputs<T>, Inputs<T>) {
17 super::bounded(buffer)
18 }
19
20 pub async fn run<F: FnOnce(&mut Self)>(f: F) -> Result {
22 Self::build(f).execute().await
23 }
24
25 pub fn build<F: FnOnce(&mut Self)>(f: F) -> Self {
27 let mut system = Self::new();
28 f(&mut system);
29 system
30 }
31
32 pub fn new() -> Self {
34 Self {
35 blocks: JoinSet::new(),
36 }
37 }
38
39 pub fn connect<T>(&mut self, inputs: Inputs<T>, outputs: Outputs<T>)
40 where
41 T: Send + 'static,
42 {
43 self.blocks.spawn(async move {
44 let mut inputs = inputs;
45 let outputs = outputs;
46 while let Some(input) = inputs.recv().await? {
47 outputs.send(input).await?;
48 }
49 Ok(())
50 });
51 }
52
53 pub fn spawn<F>(&mut self, task: F) -> AbortHandle
54 where
55 F: Future<Output = Result>,
56 F: Send + 'static,
57 {
58 self.blocks.spawn(task)
59 }
60
61 pub async fn execute(self) -> Result {
62 self.blocks.join_all().await;
63 Ok(())
64 }
65
66 #[cfg(feature = "std")]
67 pub fn read_stdin<T: core::str::FromStr>(&mut self) -> Inputs<T>
68 where
69 T: Send + 'static,
70 <T as core::str::FromStr>::Err: Send,
71 {
72 let (output, input) = super::bounded(1);
73 let block = super::stdin(output);
74 self.blocks.spawn(block);
75 input
76 }
77
78 #[cfg(feature = "std")]
79 pub fn write_stdout<T: alloc::string::ToString>(&mut self) -> Outputs<T>
80 where
81 T: Send + 'static,
82 {
83 let (output, input) = super::bounded(1);
84 let block = super::stdout(input);
85 self.blocks.spawn(block);
86 output
87 }
88}