flarrow_runtime/
runtime.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5pub struct DataflowRuntime {
6 pub clock: Arc<uhlc::HLC>,
7
8 pub nodes: HashMap<NodeID, RuntimeNode>,
9}
10
11impl DataflowRuntime {
12 pub async fn new(
13 flows: Flows,
14 url_plugin: Option<RuntimeUrlPlugin>,
15 load: impl AsyncFn(&mut Loader) -> Result<()>,
16 ) -> eyre::Result<Self> {
17 let clock = Arc::new(uhlc::HLC::default());
18 let mut loader = Loader::new(
19 flows,
20 url_plugin.unwrap_or(
21 RuntimeUrlPlugin::new_statically_linked::<UrlDefaultPlugin>()
22 .await
23 .wrap_err("Failed to load URL plugin")?,
24 ),
25 clock.clone(),
26 );
27
28 load(&mut loader).await.wrap_err("Failed to load flows")?;
29
30 Ok(Self {
31 clock,
32 nodes: loader.nodes,
33 })
34 }
35
36 pub async fn run(self) -> Result<()> {
37 let mut ids = Vec::new();
38 let mut futures = Vec::new();
39
40 for (id, node) in self.nodes {
41 ids.push(id);
42 futures.push(tokio::spawn(async move { node.run().await }));
43 }
44
45 let join_all = tokio::spawn(async move {
46 let mut results = Vec::new();
47
48 for future in futures {
49 results.push(future.await?);
50 }
51
52 Ok::<Vec<_>, eyre::Report>(results)
53 });
54
55 tokio::select! {
56 _ = tokio::signal::ctrl_c() => {}
57 results = join_all => {
58 let results = results??;
59 for result in results {
60 if let Err(error) = result {
61 eprintln!("{:?}", error);
62 }
63 }
64 }
65 }
66
67 Ok(())
68 }
69}