flarrow_runtime/
runtime.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5pub struct DataflowRuntime {
6    pub clock: Arc<uhlc::HLC>,
7
8    pub nodes: HashMap<NodeID, RuntimeNode>,
9}
10
11impl DataflowRuntime {
12    pub async fn new(
13        flows: Flows,
14        url_plugin: Option<RuntimeUrlPlugin>,
15        load: impl AsyncFn(&mut Loader) -> Result<()>,
16    ) -> eyre::Result<Self> {
17        let clock = Arc::new(uhlc::HLC::default());
18        let mut loader = Loader::new(flows, url_plugin, clock.clone());
19
20        load(&mut loader).await.wrap_err("Failed to load flows")?;
21
22        Ok(Self {
23            clock,
24            nodes: loader.nodes,
25        })
26    }
27
28    pub async fn run(self) -> Result<()> {
29        let mut ids = Vec::new();
30        let mut futures = Vec::new();
31
32        for (id, node) in self.nodes {
33            ids.push(id);
34            futures.push(tokio::spawn(async move { node.run().await }));
35        }
36
37        let join_all = tokio::spawn(async move {
38            let mut results = Vec::new();
39
40            for future in futures {
41                results.push(future.await?);
42            }
43
44            Ok::<Vec<_>, eyre::Report>(results)
45        });
46
47        tokio::select! {
48            _ = tokio::signal::ctrl_c() => {}
49            results = join_all => {
50                let results = results??;
51                for result in results {
52                    if let Err(error) = result {
53                        eprintln!("{:?}", error);
54                    }
55                }
56            }
57        }
58
59        Ok(())
60    }
61}