async_flow/tokio/
system.rs

1// This is free and unencumbered software released into the public domain.
2
3use super::{Channel, Inputs, ONESHOT, Outputs, UNLIMITED};
4use crate::io::Result;
5use tokio::task::{AbortHandle, JoinSet};
6
7pub type Subsystem = System;
8
9pub struct System {
10    pub(crate) blocks: JoinSet<Result>,
11}
12
13impl System {
14    pub fn oneshot<T>() -> Channel<T, ONESHOT> {
15        Channel::oneshot()
16    }
17
18    pub fn bounded<T>(buffer: usize) -> Channel<T, UNLIMITED> {
19        Channel::bounded(buffer)
20    }
21
22    /// Builds and executes a system, blocking until completion.
23    pub async fn run<F: FnOnce(&mut Self)>(f: F) -> Result {
24        Self::build(f).execute().await
25    }
26
27    /// Builds a new system.
28    pub fn build<F: FnOnce(&mut Self)>(f: F) -> Self {
29        let mut system = Self::new();
30        f(&mut system);
31        system
32    }
33
34    /// Instantiates a new system.
35    pub fn new() -> Self {
36        Self {
37            blocks: JoinSet::new(),
38        }
39    }
40
41    pub fn connect<T>(&mut self, inputs: Inputs<T>, outputs: Outputs<T>)
42    where
43        T: Send + 'static,
44    {
45        self.blocks.spawn(async move {
46            let mut inputs = inputs;
47            let outputs = outputs;
48            while let Some(input) = inputs.recv().await? {
49                outputs.send(input).await?;
50            }
51            Ok(())
52        });
53    }
54
55    pub fn spawn<F>(&mut self, task: F) -> AbortHandle
56    where
57        F: Future<Output = Result>,
58        F: Send + 'static,
59    {
60        self.blocks.spawn(task)
61    }
62
63    pub async fn execute(self) -> Result {
64        self.blocks.join_all().await;
65        Ok(())
66    }
67
68    #[cfg(feature = "std")]
69    pub fn read_stdin<T: core::str::FromStr>(&mut self) -> Inputs<T>
70    where
71        T: Send + 'static,
72        <T as core::str::FromStr>::Err: Send,
73    {
74        let (output, input) = super::Channel::bounded(1).into_inner(); // TODO
75        let block = super::stdin(output);
76        self.blocks.spawn(block);
77        input
78    }
79
80    #[cfg(feature = "std")]
81    pub fn write_stdout<T: alloc::string::ToString>(&mut self) -> Outputs<T>
82    where
83        T: Send + 'static,
84    {
85        let (output, input) = super::Channel::bounded(1).into_inner(); // TODO
86        let block = super::stdout(input);
87        self.blocks.spawn(block);
88        output
89    }
90}