flarrow_runtime/
runtime.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5/// Create a new runtime instance.
6pub struct Runtime {
7    pub clock: Arc<HLC>,
8
9    pub file_ext: Arc<FileExtManager>,
10    pub url_scheme: Arc<UrlSchemeManager>,
11
12    pub nodes: HashMap<NodeLayout, RuntimeNode>,
13}
14
15impl Runtime {
16    /// Create a new runtime instance with plugins.
17    pub async fn new(
18        plugins: impl AsyncFnOnce(
19            &mut FileExtManagerBuilder,
20            &mut UrlSchemeManagerBuilder,
21        ) -> Result<()>,
22    ) -> Result<Self> {
23        let mut file_ext = FileExtManagerBuilder::new().await?;
24        let mut url_scheme = UrlSchemeManagerBuilder::new().await?;
25
26        file_ext
27            .load_statically_linked_plugin::<DefaultFileExtPlugin>()
28            .await?;
29
30        url_scheme
31            .load_statically_linked_plugin::<DefaultUrlSchemePlugin>()
32            .await?;
33
34        plugins(&mut file_ext, &mut url_scheme).await?;
35
36        Ok(Self {
37            clock: Arc::new(HLC::default()),
38            file_ext: Arc::new(FileExtManager::new(file_ext.plugins)),
39            url_scheme: Arc::new(UrlSchemeManager::new(url_scheme.plugins)),
40            nodes: HashMap::new(),
41        })
42    }
43
44    /// Load all nodes with the flows provided and run them all.
45    pub async fn run(
46        mut self,
47        flows: Flows,
48        nodes: impl AsyncFnOnce(&mut NodeLoader) -> Result<()>,
49    ) -> Result<()> {
50        let mut node_loader =
51            NodeLoader::new(self.file_ext, self.url_scheme, self.clock.clone(), flows);
52
53        nodes(&mut node_loader).await?;
54
55        self.nodes.extend(node_loader.nodes);
56
57        println!("Starting runtime... (press Ctrl+C to stop)");
58
59        let mut tasks = Vec::new();
60        for (layout, node) in self.nodes {
61            tasks.push(tokio::spawn(async move {
62                node.run().await.wrap_err(format!(
63                    "Node '{}' (uuid: {}) failed",
64                    layout.label, layout.uuid,
65                ))
66            }));
67        }
68
69        let join_all = tokio::spawn(async move {
70            let mut reports: Vec<eyre::Report> = Vec::new();
71
72            let mut is_ok = true;
73
74            for task in tasks {
75                let result = task.await?;
76
77                if let Err(report) = result {
78                    is_ok = false;
79                    reports.push(report);
80                }
81            }
82
83            let reports: eyre::Report = {
84                let report_str: String = reports.iter().fold(
85                    "The runtime encountered multiple errors:".to_string(),
86                    |acc, report| format!("{}\n\n{:?}", acc, report),
87                );
88
89                eyre::Report::msg(report_str)
90            };
91
92            match is_ok {
93                true => Ok(()),
94                false => Err(reports),
95            }
96        });
97
98        tokio::select! {
99            _ = tokio::signal::ctrl_c() => { Ok(()) },
100            results = join_all => { results? }
101        }
102    }
103}