flarrow_runtime/
runtime.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
/// A loaded dataflow: a clock plus the set of nodes to execute.
///
/// Built by [`DataflowRuntime::new`] and consumed by [`DataflowRuntime::run`].
pub struct DataflowRuntime {
    /// Hybrid logical clock; a clone of this `Arc` is also handed to the
    /// `Loader` when the runtime is constructed.
    pub clock: Arc<uhlc::HLC>,

    /// Nodes produced by the loader, keyed by their identifier. Each one is
    /// spawned as its own task when the runtime is run.
    pub nodes: HashMap<NodeID, RuntimeNode>,
}
10
11impl DataflowRuntime {
12    pub async fn new(
13        flows: Flows,
14        url_plugin: Option<RuntimeUrlPlugin>,
15        load: impl AsyncFn(&mut Loader) -> Result<()>,
16    ) -> eyre::Result<Self> {
17        let clock = Arc::new(uhlc::HLC::default());
18        let mut loader = Loader::new(
19            flows,
20            url_plugin.unwrap_or(
21                RuntimeUrlPlugin::new_statically_linked::<UrlDefaultPlugin>()
22                    .await
23                    .wrap_err("Failed to load URL plugin")?,
24            ),
25            clock.clone(),
26        );
27
28        load(&mut loader).await.wrap_err("Failed to load flows")?;
29
30        Ok(Self {
31            clock,
32            nodes: loader.nodes,
33        })
34    }
35
36    pub async fn run(self) -> Result<()> {
37        let mut ids = Vec::new();
38        let mut futures = Vec::new();
39
40        for (id, node) in self.nodes {
41            ids.push(id);
42            futures.push(tokio::spawn(async move { node.run().await }));
43        }
44
45        let join_all = tokio::spawn(async move {
46            let mut results = Vec::new();
47
48            for future in futures {
49                results.push(future.await?);
50            }
51
52            Ok::<Vec<_>, eyre::Report>(results)
53        });
54
55        tokio::select! {
56            _ = tokio::signal::ctrl_c() => {}
57            results = join_all => {
58                let results = results??;
59                for result in results {
60                    if let Err(error) = result {
61                        eprintln!("{:?}", error);
62                    }
63                }
64            }
65        }
66
67        Ok(())
68    }
69}