flarrow_runtime/
loader.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5pub struct NodeLoader {
7 pub file_ext: Arc<FileExtManager>,
8 pub url_scheme: Arc<UrlSchemeManager>,
9
10 pub clock: Arc<HLC>,
11
12 pub flows: Flows,
13 pub nodes: HashMap<NodeLayout, RuntimeNode>,
14}
15
16impl NodeLoader {
17 pub fn new(
18 file_ext: Arc<FileExtManager>,
19 url_scheme: Arc<UrlSchemeManager>,
20 clock: Arc<HLC>,
21 flows: Flows,
22 ) -> Self {
23 Self {
24 file_ext,
25 url_scheme,
26 clock,
27 flows,
28 nodes: HashMap::new(),
29 }
30 }
31
32 pub async fn load<T: Node + 'static>(
34 &mut self,
35 source: NodeLayout,
36 configuration: serde_yml::Value,
37 ) -> Result<()> {
38 let (inputs, outputs, queries, queryables) =
39 self.flows.node_io(self.clock.clone(), source.clone());
40
41 let node = RuntimeNode::StaticallyLinked(
42 T::new(inputs, outputs, queries, queryables, configuration)
43 .await?
44 .wrap_err(format!(
45 "Node '{}' (uuid: {}) failed to initialize",
46 source.label, source.uuid,
47 ))?,
48 );
49
50 tracing::debug!(
51 "Node '{}' (uuid: {}) loaded from Rust struct {}",
52 source.label,
53 source.uuid,
54 std::any::type_name::<T>()
55 );
56
57 self.nodes.insert(source, node);
58 Ok(())
59 }
60
61 pub async fn load_url(
64 &mut self,
65 url: Url,
66 source: NodeLayout,
67 configuration: serde_yml::Value,
68 ) -> Result<()> {
69 let (inputs, outputs, queries, queryables) =
70 self.flows.node_io(self.clock.clone(), source.clone());
71
72 let node = self
73 .url_scheme
74 .load(
75 url.clone(),
76 inputs,
77 outputs,
78 queries,
79 queryables,
80 configuration,
81 self.file_ext.clone(),
82 )
83 .await?;
84
85 tracing::debug!(
86 "Node '{}' (uuid: {}) loaded from URL {:?}",
87 source.label,
88 source.uuid,
89 url
90 );
91
92 self.nodes.insert(source, node);
93
94 Ok(())
95 }
96}