flarrow_runtime/
loader.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4
5/// Loader struct passed to the user closure to load nodes
6pub struct NodeLoader {
7    pub file_ext: Arc<FileExtManager>,
8    pub url_scheme: Arc<UrlSchemeManager>,
9
10    pub clock: Arc<HLC>,
11
12    pub flows: Flows,
13    pub nodes: HashMap<NodeLayout, RuntimeNode>,
14}
15
16impl NodeLoader {
17    pub fn new(
18        file_ext: Arc<FileExtManager>,
19        url_scheme: Arc<UrlSchemeManager>,
20        clock: Arc<HLC>,
21        flows: Flows,
22    ) -> Self {
23        Self {
24            file_ext,
25            url_scheme,
26            clock,
27            flows,
28            nodes: HashMap::new(),
29        }
30    }
31
32    /// Load a node from a Rust struct directly (statically linked)
33    pub async fn load<T: Node + 'static>(
34        &mut self,
35        source: NodeLayout,
36        configuration: serde_yml::Value,
37    ) -> Result<()> {
38        let (inputs, outputs, queries, queryables) =
39            self.flows.node_io(self.clock.clone(), source.clone());
40
41        let node = RuntimeNode::StaticallyLinked(
42            T::new(inputs, outputs, queries, queryables, configuration)
43                .await?
44                .wrap_err(format!(
45                    "Node '{}' (uuid: {}) failed to initialize",
46                    source.label, source.uuid,
47                ))?,
48        );
49
50        tracing::debug!(
51            "Node '{}' (uuid: {}) loaded from Rust struct {}",
52            source.label,
53            source.uuid,
54            std::any::type_name::<T>()
55        );
56
57        self.nodes.insert(source, node);
58        Ok(())
59    }
60
61    /// Load a node from an URL. Be careful, you must ensure that the runtime has the necessary plugins to process this URL.
62    /// By default you can pass all URL for the builtins nodes (builtin://) and all URL for dynamic libraries on the computer (file:///path/to/library.so)
63    pub async fn load_url(
64        &mut self,
65        url: Url,
66        source: NodeLayout,
67        configuration: serde_yml::Value,
68    ) -> Result<()> {
69        let (inputs, outputs, queries, queryables) =
70            self.flows.node_io(self.clock.clone(), source.clone());
71
72        let node = self
73            .url_scheme
74            .load(
75                url.clone(),
76                inputs,
77                outputs,
78                queries,
79                queryables,
80                configuration,
81                self.file_ext.clone(),
82            )
83            .await?;
84
85        tracing::debug!(
86            "Node '{}' (uuid: {}) loaded from URL {:?}",
87            source.label,
88            source.uuid,
89            url
90        );
91
92        self.nodes.insert(source, node);
93
94        Ok(())
95    }
96}