flarrow_runtime/
runtime.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5pub struct DataflowRuntime {
6 pub clock: Arc<uhlc::HLC>,
7
8 pub nodes: HashMap<NodeID, RuntimeNode>,
9}
10
11impl DataflowRuntime {
12 pub async fn new(
13 flows: Flows,
14 url_plugin: RuntimeUrlPlugin,
15 load: impl AsyncFn(&mut Loader) -> Result<()>,
16 ) -> eyre::Result<Self> {
17 let clock = Arc::new(uhlc::HLC::default());
18 let mut loader = Loader::new(flows, url_plugin, clock.clone());
19
20 load(&mut loader).await.wrap_err("Failed to load flows")?;
21
22 Ok(Self {
23 clock,
24 nodes: loader.nodes,
25 })
26 }
27
28 pub async fn run(self) -> Result<()> {
29 let mut ids = Vec::new();
30 let mut futures = Vec::new();
31
32 for (id, node) in self.nodes {
33 ids.push(id);
34 futures.push(tokio::spawn(async move { node.run().await }));
35 }
36
37 let join_all = tokio::spawn(async move {
38 let mut results = Vec::new();
39
40 for future in futures {
41 results.push(future.await?);
42 }
43
44 Ok::<Vec<_>, eyre::Report>(results)
45 });
46
47 tokio::select! {
48 _ = tokio::signal::ctrl_c() => {}
49 results = join_all => {
50 let results = results??;
51 for result in results {
52 if let Err(error) = result {
53 eprintln!("{:?}", error);
54 }
55 }
56 }
57 }
58
59 Ok(())
60 }
61}