1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
use std::collections::HashMap;
use std::sync::Arc;
use log::{debug, info, trace};
use url::Url;
use flowcore::errors::*;
use flowcore::Implementation;
use flowcore::meta_provider::Provider;
use flowcore::model::flow_manifest::FlowManifest;
use flowcore::model::lib_manifest::{
ImplementationLocator::Native, ImplementationLocator::Wasm, LibraryManifest,
};
use crate::wasm;
/// A `Loader` is responsible for loading a compiled `Flow` from it's `Manifest`, loading the required
/// libraries needed by the flow and keeping track of the `Function` `Implementations` that
/// will be used to execute it.
#[derive(Default)]
pub struct Loader {
/// HashMap of libraries that have already had their manifests read. The key is the library
/// reference Url (e.g. lib:://context) and the entry is a tuple of the LibraryManifest
/// and the resolved Url of where the manifest was read from
loaded_libraries_manifests: HashMap<Url, (LibraryManifest, Url)>,
loaded_lib_implementations: HashMap<Url, Arc<dyn Implementation>>,
}
impl Loader {
/// Create a new `Loader` that will be used to parse the manifest file create structs in
/// memory ready for execution
pub fn new() -> Self {
Loader {
loaded_libraries_manifests: HashMap::<Url, (LibraryManifest, Url)>::new(),
loaded_lib_implementations: HashMap::<Url, Arc<dyn Implementation>>::new(),
}
}
/// Return a HashMap of the Implementations loaded, with the Url for the function
/// as the key
pub fn get_lib_implementations(&self) -> &HashMap<Url, Arc<dyn Implementation>> {
&self.loaded_lib_implementations
}
/// Load all the functions defined in a manifest, and then find all the
/// implementations required for function execution later.
///
/// A flow is dynamically loaded, so none of the implementations it brings can be static,
/// they must all be dynamically loaded from WASM. Each one will be wrapped in a
/// Native "WasmExecutor" implementation to make it present the same interface as a native
/// implementation.
///
/// The run-time that (statically) links this library may provide some native implementations
/// already, before this is called.
///
/// It may have processes that use Implementations in libraries. Those must have been
/// loaded previously. They maybe Native or Wasm implementations, but the Wasm ones will
/// have been wrapped in a Native "WasmExecutor" implementation to make it appear native.
/// Thus, all library implementations found will be Native.
pub fn load_flow(
&mut self,
provider: &dyn Provider,
flow_manifest_url: &Url,
) -> Result<FlowManifest> {
debug!("Loading flow manifest from '{}'", flow_manifest_url);
let (mut flow_manifest, resolved_url) =
FlowManifest::load(provider, flow_manifest_url)
.chain_err(|| format!("Could not load manifest from: '{}'", flow_manifest_url))?;
self.load_library_implementations(provider, &flow_manifest)
.chain_err(|| format!("Could not load libraries referenced by manifest at: {}",
resolved_url))?;
// Find the implementations for all functions used in this flow
self.resolve_implementations(provider, &mut flow_manifest, &resolved_url)
.chain_err(|| "Could not resolve implementations required for flow execution")?;
Ok(flow_manifest)
}
/// Load the library manifest if not already loaded
fn load_manifest_if_needed(
&mut self,
provider: &dyn Provider,
lib_root_url: &Url,
) -> Result<()> {
if self.loaded_libraries_manifests.get(lib_root_url).is_none() {
info!("Attempting to load library '{}'", lib_root_url);
let new_manifest_tuple =
LibraryManifest::load(provider, lib_root_url).chain_err(|| {
format!("Could not load library with root url: '{}'", lib_root_url)
})?;
self.loaded_libraries_manifests
.insert(lib_root_url.clone(), new_manifest_tuple);
}
Ok(())
}
fn get_manifest_tuple(
&mut self,
provider: &dyn Provider,
lib_root_url: &Url,
) -> Result<(LibraryManifest, Url)> {
self.load_manifest_if_needed(provider, lib_root_url)?;
let tuple = self
.loaded_libraries_manifests
.get(lib_root_url)
.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find (supposedly already loaded) library manifest",
)
})?;
Ok(tuple.clone()) // TODO avoid the clone and return a reference
}
/// Load libraries implementations referenced in the flow manifest
fn load_library_implementations(
&mut self,
provider: &dyn Provider,
manifest: &FlowManifest,
) -> Result<()> {
for library_implementation_reference in manifest.get_lib_references() {
// zero out the path to the implementation to get the root lib url
let mut lib_root_url = library_implementation_reference.clone();
lib_root_url.set_path("");
let manifest_tuple = self.get_manifest_tuple(provider, &lib_root_url)?;
self.add_lib_implementation(
provider,
library_implementation_reference,
&manifest_tuple,
)?;
}
Ok(())
}
/// Add a library and all the implementations it contains to the run-time by adding
/// all its ImplementationLocators from the manifest to the
/// table for this run-time, so that then when we try to load a flow that references functions
/// in the library, they can be found.
pub fn add_lib(
&mut self,
provider: &dyn Provider,
lib_manifest: LibraryManifest,
lib_manifest_url: &Url,
) -> Result<()> {
if self
.loaded_libraries_manifests
.get(lib_manifest_url)
.is_none()
{
let lib_manifest_tuple = (lib_manifest.clone(), lib_manifest_url.clone());
// Load all the implementations in the library
for (implementation_reference, locator) in lib_manifest.locators {
// if we don't already have an implementation loaded for that reference
if self
.loaded_lib_implementations
.get(&implementation_reference)
.is_none()
{
// create or find the implementation we need
let implementation = match locator {
Wasm(wasm_source_relative) => {
// Path to the wasm source could be relative to the URL where we loaded the manifest from
let wasm_url = lib_manifest_url
.join(&wasm_source_relative)
.map_err(|e| e.to_string())?;
debug!("Looking for wasm source: '{}'", wasm_url);
// Wasm implementation being added. Wrap it with the Wasm Native Implementation
let wasm_executor = wasm::load(provider, &wasm_url)?;
Arc::new(wasm_executor) as Arc<dyn Implementation>
}
// Native implementation from Lib
Native(implementation) => implementation,
};
self.loaded_lib_implementations
.insert(implementation_reference, implementation);
}
}
// track the fact we have already loaded this library
self.loaded_libraries_manifests
.insert(lib_manifest_url.clone(), lib_manifest_tuple);
info!("Loaded library: '{}'", lib_manifest_url);
}
Ok(())
}
/// Add a library implementation to the table for this run-time, for use in execution
pub fn add_lib_implementation(
&mut self,
provider: &dyn Provider,
implementation_reference: &Url,
lib_manifest_tuple: &(LibraryManifest, Url),
) -> Result<()> {
// if we don't already have an implementation loaded for that reference
if self
.loaded_lib_implementations
.get(implementation_reference)
.is_none()
{
let locator = lib_manifest_tuple
.0
.locators
.get(implementation_reference)
.ok_or(format!(
"Could not find ImplementationLocator for '{}' in library",
implementation_reference
))?;
// find the implementation we need from the locator
let implementation = match locator {
Wasm(wasm_source_relative) => {
// Path to the wasm source could be relative to the URL where we loaded the manifest from
let wasm_url = lib_manifest_tuple
.1
.join(wasm_source_relative)
.map_err(|e| e.to_string())?;
debug!("Looking for wasm source: '{}'", wasm_url);
// Wasm implementation being added. Wrap it with the Wasm Native Implementation
let wasm_executor = wasm::load(provider, &wasm_url)?;
Arc::new(wasm_executor) as Arc<dyn Implementation>
}
Native(native_impl) => native_impl.clone(),
};
self.loaded_lib_implementations
.insert(implementation_reference.clone(), implementation);
}
Ok(())
}
/// Resolve or "find" all the implementations of functions for a flow
/// The `root_url` is the url of the manifest or the directory where the manifest is located
/// and is used in resolving relative references to other files.
pub fn resolve_implementations(
&mut self,
provider: &dyn Provider,
flow_manifest: &mut FlowManifest,
manifest_url: &Url,
) -> Result<()> {
debug!("Resolving implementations");
// find in a library, or load the supplied implementation - as specified by the source
for function in flow_manifest.get_functions() {
match function.implementation_location().split_once(':') {
Some(("lib", _)) | Some(("context", _)) => {
let implementation_url = Url::parse(function.implementation_location())
.chain_err(|| {
"Could not create a Url from a lib: implementation location"
})?;
let implementation = self
.loaded_lib_implementations
.get(&implementation_url)
.ok_or_else(|| format!(
"Implementation at '{}' is not in loaded libraries",
function.implementation_location()
))?;
trace!(
"\tFunction #{}({}) implementation loaded from '{}'",
function.id(), function.get_flow_id(), function.implementation_location()
);
// Set the location of the implementation of this function
function.set_implementation(implementation.clone()); // Only clone of an Arc, not the object
}
/*** These below are not 'lib:' references - hence are supplied implementations ***/
_ => {
let implementation_url = manifest_url
.join(function.implementation_location())
.map_err(|_| {
format!(
"Could not create supplied implementation url joining '{}' to manifest Url: {}",
function.implementation_location(), manifest_url
)
})?;
// load the actual implementation of the function
let wasm_executor = wasm::load(provider, &implementation_url)?;
function.set_implementation(Arc::new(wasm_executor) as Arc<dyn Implementation>);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod test {
use super::Loader;
#[test]
fn test_create() {
let _ = Loader::new();
}
}