flarrow_runtime/
node.rs

1use std::{collections::HashMap, sync::Arc};
2
3use url::Url;
4
5use crate::prelude::*;
6
7pub struct Loader {
8    pub flows: Flows,
9    pub url_plugin: RuntimeUrlPlugin,
10
11    pub clock: Arc<uhlc::HLC>,
12    pub nodes: HashMap<NodeID, RuntimeNode>,
13}
14
15impl Loader {
16    pub fn new(flows: Flows, url_plugin: RuntimeUrlPlugin, clock: Arc<uhlc::HLC>) -> Self {
17        Loader {
18            flows,
19            url_plugin,
20            clock,
21            nodes: HashMap::new(),
22        }
23    }
24
25    pub async fn load_statically_linked<T: Node + 'static>(
26        &mut self,
27        node: NodeID,
28        configuration: serde_yml::Value,
29    ) -> eyre::Result<()> {
30        let inputs = Inputs::new(node, self.flows.receivers.clone());
31        let outputs = Outputs::new(node, self.clock.clone(), self.flows.senders.clone());
32
33        self.nodes.insert(
34            node,
35            RuntimeNode::StaticallyLinked(
36                T::new(inputs, outputs, configuration)
37                    .await
38                    .wrap_err("Failed to await statically linked node")?
39                    .wrap_err("Failed to create statically linked node")?,
40            ),
41        );
42
43        Ok(())
44    }
45
46    pub async fn load_from_url(
47        &mut self,
48        node: NodeID,
49        url: Url,
50        configuration: serde_yml::Value,
51    ) -> eyre::Result<()> {
52        let inputs = Inputs::new(node, self.flows.receivers.clone());
53        let outputs = Outputs::new(node, self.clock.clone(), self.flows.senders.clone());
54
55        let handle = self
56            .url_plugin
57            .load(url.clone(), inputs, outputs, configuration)
58            .await
59            .wrap_err(format!("Failed to await node from URL: {}", url))?
60            .wrap_err(format!("Failed to create node from URL: {}", url))?;
61
62        self.nodes.insert(node, handle);
63
64        Ok(())
65    }
66}