async_flow/tokio/
system.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{
4    io::Result,
5    tokio::{Inputs, Outputs},
6};
7use tokio::task::{AbortHandle, JoinSet};
8
9pub type Subsystem = System;
10
11pub struct System {
12    pub(crate) blocks: JoinSet<Result>,
13}
14
15impl System {
16    pub fn bounded<T>(buffer: usize) -> (Outputs<T>, Inputs<T>) {
17        super::bounded(buffer)
18    }
19
20    /// Builds and executes a system, blocking until completion.
21    pub async fn run<F: FnOnce(&mut Self)>(f: F) -> Result {
22        Self::build(f).execute().await
23    }
24
25    /// Builds a new system.
26    pub fn build<F: FnOnce(&mut Self)>(f: F) -> Self {
27        let mut system = Self::new();
28        f(&mut system);
29        system
30    }
31
32    /// Instantiates a new system.
33    pub fn new() -> Self {
34        Self {
35            blocks: JoinSet::new(),
36        }
37    }
38
39    pub fn connect<T>(&mut self, inputs: Inputs<T>, outputs: Outputs<T>)
40    where
41        T: Send + 'static,
42    {
43        self.blocks.spawn(async move {
44            let mut inputs = inputs;
45            let outputs = outputs;
46            while let Some(input) = inputs.recv().await? {
47                outputs.send(input).await?;
48            }
49            Ok(())
50        });
51    }
52
53    pub fn spawn<F>(&mut self, task: F) -> AbortHandle
54    where
55        F: Future<Output = Result>,
56        F: Send + 'static,
57    {
58        self.blocks.spawn(task)
59    }
60
61    pub async fn execute(self) -> Result {
62        self.blocks.join_all().await;
63        Ok(())
64    }
65
66    #[cfg(feature = "std")]
67    pub fn read_stdin<T: core::str::FromStr>(&mut self) -> Inputs<T>
68    where
69        T: Send + 'static,
70        <T as core::str::FromStr>::Err: Send,
71    {
72        let (output, input) = super::bounded(1);
73        let block = super::stdin(output);
74        self.blocks.spawn(block);
75        input
76    }
77
78    #[cfg(feature = "std")]
79    pub fn write_stdout<T: alloc::string::ToString>(&mut self) -> Outputs<T>
80    where
81        T: Send + 'static,
82    {
83        let (output, input) = super::bounded(1);
84        let block = super::stdout(input);
85        self.blocks.spawn(block);
86        output
87    }
88}