flarrow_runtime/
runtime.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5pub struct Runtime {
7 pub clock: Arc<HLC>,
8
9 pub file_ext: Arc<FileExtManager>,
10 pub url_scheme: Arc<UrlSchemeManager>,
11
12 pub nodes: HashMap<NodeLayout, RuntimeNode>,
13}
14
15impl Runtime {
16 pub async fn new(
18 plugins: impl AsyncFnOnce(
19 &mut FileExtManagerBuilder,
20 &mut UrlSchemeManagerBuilder,
21 ) -> Result<()>,
22 ) -> Result<Self> {
23 let mut file_ext = FileExtManagerBuilder::new().await?;
24 let mut url_scheme = UrlSchemeManagerBuilder::new().await?;
25
26 file_ext
27 .load_statically_linked_plugin::<DefaultFileExtPlugin>()
28 .await?;
29
30 url_scheme
31 .load_statically_linked_plugin::<DefaultUrlSchemePlugin>()
32 .await?;
33
34 plugins(&mut file_ext, &mut url_scheme).await?;
35
36 Ok(Self {
37 clock: Arc::new(HLC::default()),
38 file_ext: Arc::new(FileExtManager::new(file_ext.plugins)),
39 url_scheme: Arc::new(UrlSchemeManager::new(url_scheme.plugins)),
40 nodes: HashMap::new(),
41 })
42 }
43
44 pub async fn run(
46 mut self,
47 flows: Flows,
48 nodes: impl AsyncFnOnce(&mut NodeLoader) -> Result<()>,
49 ) -> Result<()> {
50 let mut node_loader =
51 NodeLoader::new(self.file_ext, self.url_scheme, self.clock.clone(), flows);
52
53 nodes(&mut node_loader).await?;
54
55 self.nodes.extend(node_loader.nodes);
56
57 println!("Starting runtime... (press Ctrl+C to stop)");
58
59 let mut tasks = Vec::new();
60 for (layout, node) in self.nodes {
61 tasks.push(tokio::spawn(async move {
62 node.run().await.wrap_err(format!(
63 "Node '{}' (uuid: {}) failed",
64 layout.label, layout.uuid,
65 ))
66 }));
67 }
68
69 let join_all = tokio::spawn(async move {
70 let mut reports: Vec<eyre::Report> = Vec::new();
71
72 let mut is_ok = true;
73
74 for task in tasks {
75 let result = task.await?;
76
77 if let Err(report) = result {
78 is_ok = false;
79 reports.push(report);
80 }
81 }
82
83 let reports: eyre::Report = {
84 let report_str: String = reports.iter().fold(
85 "The runtime encountered multiple errors:".to_string(),
86 |acc, report| format!("{}\n\n{:?}", acc, report),
87 );
88
89 eyre::Report::msg(report_str)
90 };
91
92 match is_ok {
93 true => Ok(()),
94 false => Err(reports),
95 }
96 });
97
98 tokio::select! {
99 _ = tokio::signal::ctrl_c() => { Ok(()) },
100 results = join_all => { results? }
101 }
102 }
103}