1use std::collections::HashMap;
2use std::panic;
3use std::sync::{Arc, RwLock};
4use std::thread;
5use std::thread::JoinHandle;
6
7use log::{debug, error, info, trace};
8use url::Url;
9
10use flowcore::errors::{bail, Result, ResultExt};
11use flowcore::model::lib_manifest::{
12 ImplementationLocator::Native, ImplementationLocator::RelativePath, LibraryManifest,
13};
14use flowcore::provider::Provider;
15use flowcore::Implementation;
16
17use crate::job::Payload;
18use crate::wasm;
19
/// Holds the library manifests known so far and the handles of the executor threads that
/// have been started to run jobs.
pub struct Executor {
    /// Library manifests keyed by the library's `lib_url`. Each value pairs the manifest with
    /// the resolved url it was loaded from. The map is shared with every executor thread.
    loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
    /// Join handles of the executor threads spawned by `start`, consumed by `wait`.
    executors: Vec<JoinHandle<()>>,
}
30
31impl Default for Executor {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl Executor {
38 #[must_use]
40 pub fn new() -> Self {
41 Executor {
42 loaded_lib_manifests: Arc::new(RwLock::new(
43 HashMap::<Url, (LibraryManifest, Url)>::new(),
44 )),
45 executors: vec![],
46 }
47 }
48
49 pub fn add_lib(&mut self, lib_manifest: LibraryManifest, resolved_url: Url) -> Result<()> {
59 let mut lib_manifests = self
60 .loaded_lib_manifests
61 .write()
62 .map_err(|_| "Could not gain write access to loaded library manifests map")?;
63
64 debug!(
65 "Manifest of library '{}' loaded from '{}' and added to Executor",
66 lib_manifest.lib_url, resolved_url
67 );
68
69 lib_manifests.insert(lib_manifest.lib_url.clone(), (lib_manifest, resolved_url));
70
71 Ok(())
72 }
73
74 pub fn start(
80 &mut self,
81 provider: &Arc<dyn Provider>,
82 number_of_executors: usize,
83 job_service: &str,
84 results_service: &str,
85 control_service: &str,
86 ) {
87 let loaded_implementations =
88 Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
89
90 info!("Starting {number_of_executors} executor threads");
91 for executor_number in 0..number_of_executors {
92 let thread_provider = provider.clone();
93 let thread_context = zmq::Context::new();
94 let thread_implementations = loaded_implementations.clone();
95 let thread_loaded_manifests = self.loaded_lib_manifests.clone();
96 let results_sink = results_service.into();
97 let job_source = job_service.into();
98 let control_address = control_service.into();
99 self.executors.push(thread::spawn(move || {
100 trace!("Executor #{executor_number} entering execution loop");
101 if let Err(e) = execution_loop(
102 &thread_provider,
103 &format!("Executor #{executor_number}"),
104 &thread_context,
105 &thread_implementations,
106 &thread_loaded_manifests,
107 job_source,
108 results_sink,
109 control_address,
110 ) {
111 error!("Execution loop error: {e}");
112 }
113 }));
114 }
115 }
116
117 pub fn wait(self) {
119 for executor in self.executors {
120 let _ = executor.join();
121 }
122 }
123}
124
125#[allow(clippy::too_many_arguments)]
126#[allow(clippy::needless_pass_by_value)]
127fn execution_loop(
128 provider: &Arc<dyn Provider>,
129 name: &str,
130 context: &zmq::Context,
131 loaded_implementations: &Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>>,
132 loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
133 job_service: String,
134 results_service: String,
135 control_address: String,
136) -> Result<()> {
137 let job_source = context
138 .socket(zmq::PULL)
139 .map_err(|e| format!("Could not create PULL end of job socket: {e}"))?;
140 job_source
141 .connect(&job_service)
142 .map_err(|e| format!("Could not connect to PULL end of job socket: '{job_service}' {e}"))?;
143
144 let results_sink = context
145 .socket(zmq::PUSH)
146 .map_err(|e| format!("Could not create PUSH end of results socket: {e}"))?;
147 results_sink
148 .connect(&results_service)
149 .map_err(|e| format!("Could not connect to PUSH end of results socket: {e}"))?;
150
151 let control_socket = context
152 .socket(zmq::SocketType::SUB)
153 .map_err(|e| format!("Could not create SUB end of control socket: {e}"))?;
154 control_socket
155 .connect(&control_address)
156 .map_err(|e| format!("Could not connect to SUB end of control socket: {e}"))?;
157 control_socket
158 .set_subscribe(&[])
159 .map_err(|e| format!("Could not subscribe to SUB end of control socket: {e}"))?;
160
161 let mut process_jobs = true;
162
163 set_panic_hook();
164
165 let mut items: Vec<zmq::PollItem> = vec![
166 job_source.as_poll_item(zmq::POLLIN),
167 control_socket.as_poll_item(zmq::POLLIN),
168 ];
169
170 while process_jobs {
171 trace!("{name} waiting for a job to execute or a DONE signal");
172 match zmq::poll(&mut items, -1).map_err(|_| "Error while polling for Jobs to execute") {
173 Ok(_) => {
174 if items
175 .first()
176 .ok_or("Could not get poll item 0")?
177 .is_readable()
178 {
179 let msg = job_source
180 .recv_msg(0)
181 .map_err(|_| "Error receiving Job for execution")?;
182 let message_string = msg.as_str().ok_or("Could not get message as str")?;
183 let payload: Payload = serde_json::from_str(message_string)
184 .map_err(|_| "Could not deserialize Message to Job")?;
185
186 trace!("Job #{}: Received by {}", payload.job_id, name);
187 match execute_job(
188 provider,
189 &payload,
190 &results_sink,
191 name,
192 &loaded_implementations.clone(),
193 &loaded_lib_manifests.clone(),
194 ) {
195 Ok(keep_processing) => process_jobs = keep_processing,
196 Err(e) => error!("{e}"),
197 }
198 }
199
200 if items
201 .get(1)
202 .ok_or("Could not get poll item 1")?
203 .is_readable()
204 {
205 let msg = control_socket
206 .recv_msg(0)
207 .map_err(|_| "Error receiving Control message")?;
208 match msg.as_str().ok_or("Could not get message as str") {
209 Ok("DONE") => {
210 trace!("'DONE' message received in executor");
211 return Ok(());
212 }
213 Ok(_) => error!("Unexpected Control message"),
214 _ => error!("Error parsing Control message"),
215 }
216 }
217 }
218 Err(e) => {
219 error!("Error while polling for Jobs or Control messages: {e}");
220 }
221 }
222 }
223
224 Ok(())
225}
226
227fn set_panic_hook() {
229 panic::set_hook(Box::new(|panic_info| {
230 if let Some(location) = panic_info.location() {
231 error!(
232 "Panic in file '{}' at line {}",
233 location.file(),
234 location.line()
235 );
236 }
237 }));
238}
239
240fn execute_job(
242 provider: &Arc<dyn Provider>,
243 payload: &Payload,
244 results_sink: &zmq::Socket,
245 name: &str,
246 loaded_implementations: &Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>>,
247 loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
248) -> Result<bool> {
249 let mut implementations = loaded_implementations
251 .write()
252 .map_err(|_| "Could not gain read access to loaded implementations map")?;
253 if implementations.get(&payload.implementation_url).is_none() {
254 trace!(
255 "Implementation '{}' is not loaded",
256 payload.implementation_url
257 );
258 let implementation = match payload.implementation_url.scheme() {
259 "lib" => {
260 let mut lib_root_url = payload.implementation_url.clone();
261 lib_root_url.set_path("");
262 load_referenced_implementation(
263 provider,
264 &lib_root_url,
265 loaded_lib_manifests,
266 &payload.implementation_url,
267 )?
268 }
269 "context" => {
270 let mut lib_root_url = payload.implementation_url.clone();
271 let _ = lib_root_url.set_host(Some(""));
272 lib_root_url.set_path("");
273 load_referenced_implementation(
274 provider,
275 &lib_root_url,
276 loaded_lib_manifests,
277 &payload.implementation_url,
278 )?
279 }
280 "file" => Arc::new(wasm::load(provider, &payload.implementation_url)?),
281 _ => bail!("Unsupported scheme on implementation_url"),
282 };
283 implementations.insert(payload.implementation_url.clone(), implementation);
284 trace!(
285 "Implementation '{}' added to executor",
286 payload.implementation_url
287 );
288 }
289
290 let implementation = implementations
291 .get(&payload.implementation_url)
292 .ok_or("Could not find implementation")?;
293
294 trace!("Job #{}: Started executing on '{name}'", payload.job_id);
295 let result = implementation.run(&payload.input_set);
296 trace!("Job #{}: Finished executing on '{name}'", payload.job_id);
297
298 results_sink
299 .send(
300 serde_json::to_string(&(payload.job_id, result))?.as_bytes(),
301 0,
302 )
303 .map_err(|_| "Could not send result of Job")?;
304
305 Ok(true)
306}
307
308fn load_referenced_implementation(
310 provider: &Arc<dyn Provider>,
311 lib_root_url: &Url,
312 loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
313 implementation_url: &Url,
314) -> Result<Arc<dyn Implementation>> {
315 let (lib_manifest, resolved_lib_url) =
316 get_lib_manifest_tuple(provider, loaded_lib_manifests, lib_root_url)?;
317
318 let locator = lib_manifest
319 .locators
320 .get(implementation_url)
321 .ok_or(format!(
322 "Could not find ImplementationLocator for '{implementation_url}' in library"
323 ))?;
324
325 let implementation = match locator {
327 RelativePath(wasm_source_relative) => {
328 let wasm_url = resolved_lib_url
330 .join(wasm_source_relative)
331 .map_err(|e| e.to_string())?;
332 debug!("Attempting to load wasm from source file: '{wasm_url}'");
333 let wasm_executor = wasm::load(provider, &wasm_url)?;
335 Arc::new(wasm_executor) as Arc<dyn Implementation>
336 }
337 Native(native_impl) => native_impl.clone(),
338 };
339
340 Ok(implementation)
341}
342
343fn get_lib_manifest_tuple(
345 provider: &Arc<dyn Provider>,
346 loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
347 lib_root_url: &Url,
348) -> Result<(LibraryManifest, Url)> {
349 let mut lib_manifests = loaded_lib_manifests
350 .write()
351 .map_err(|_| "Could not get write access to the loaded lib manifests")?;
352
353 if lib_manifests.get(lib_root_url).is_none() {
354 info!("Attempting to load library manifest'{lib_root_url}'");
355 let manifest_tuple = LibraryManifest::load(provider, lib_root_url)
356 .chain_err(|| format!("Could not load library with root url: '{lib_root_url}'"))?;
357 lib_manifests.insert(lib_root_url.clone(), manifest_tuple);
358 }
359
360 lib_manifests
362 .get(lib_root_url)
363 .ok_or_else(|| "Could not find (supposedly already loaded) library manifest".into())
364 .cloned()
365}
366
367#[cfg(test)]
368#[allow(clippy::unwrap_used, clippy::expect_used)]
369mod test {
370 use std::collections::HashMap;
371 use std::sync::{Arc, RwLock};
372
373 use url::Url;
374
375 use flowcore::errors::Result;
376 use flowcore::model::lib_manifest::LibraryManifest;
377 use flowcore::model::metadata::MetaData;
378 use flowcore::provider::Provider;
379 use flowcore::Implementation;
380
381 use crate::job::{Job, Payload};
382
383 use super::Executor;
384
385 fn test_meta_data() -> MetaData {
386 MetaData {
387 name: "test".into(),
388 version: "0.0.0".into(),
389 description: "a test".into(),
390 authors: vec!["me".into()],
391 }
392 }
393
/// Test `Provider` that resolves every url to itself and serves the same fixed content
/// whatever url is requested.
#[allow(clippy::module_name_repetitions)]
pub struct TestProvider {
    /// The content returned by `get_contents` for any url.
    test_content: &'static str,
}
398
399 impl Provider for TestProvider {
400 fn resolve_url(
401 &self,
402 source: &Url,
403 _default_filename: &str,
404 _extensions: &[&str],
405 ) -> Result<(Url, Option<Url>)> {
406 Ok((source.clone(), None))
407 }
408
409 fn get_contents(&self, _url: &Url) -> Result<Vec<u8>> {
410 Ok(self.test_content.as_bytes().to_owned())
411 }
412 }
413
/// Test `Provider` that resolves every url to `file:///tmp/manifest.json` and serves the same
/// fixed content whatever url is requested.
struct ManifestTestProvider {
    /// The content returned by `get_contents` for any url.
    test_content: &'static str,
}
419
420 impl Provider for ManifestTestProvider {
421 fn resolve_url(
422 &self,
423 _source: &Url,
424 _default_filename: &str,
425 _extensions: &[&str],
426 ) -> Result<(Url, Option<Url>)> {
427 let resolved = Url::parse("file:///tmp/manifest.json").map_err(|e| e.to_string())?;
429 Ok((resolved, None))
430 }
431
432 fn get_contents(&self, _url: &Url) -> Result<Vec<u8>> {
433 Ok(self.test_content.as_bytes().to_owned())
434 }
435 }
436
/// Adding a single library manifest to a fresh executor succeeds.
#[test]
fn add_a_lib() {
    let lib_url = Url::parse("lib://testlib").expect("Could not parse lib url");
    let library = LibraryManifest::new(lib_url, test_meta_data());

    let mut executor = Executor::new();
    let location = Url::parse("file://fake/lib/location").expect("Could not parse Url");
    let added = executor.add_lib(library, location);

    assert!(added.is_ok());
}
452
/// `Executor::default()` and `Executor::new()` both start with no manifests and no threads.
#[test]
fn default_same_as_new() {
    let from_default = Executor::default();
    let from_new = Executor::new();

    // Scope the read guards so they are released once the manifest maps have been checked
    {
        let manifests_of_default = from_default
            .loaded_lib_manifests
            .read()
            .expect("Could not read default executor manifests");
        let manifests_of_new = from_new
            .loaded_lib_manifests
            .read()
            .expect("Could not read new executor manifests");

        assert!(
            manifests_of_default.is_empty(),
            "default() executor should have no loaded manifests"
        );
        assert!(
            manifests_of_new.is_empty(),
            "new() executor should have no loaded manifests"
        );
    }

    assert!(
        from_default.executors.is_empty(),
        "default() executor should have no executor threads"
    );
    assert!(
        from_new.executors.is_empty(),
        "new() executor should have no executor threads"
    );
}
487
/// Two libraries with different urls are both kept, each under its own url.
#[test]
fn add_multiple_libs() {
    let first_url = Url::parse("lib://testlib1").expect("Could not parse lib url");
    let first = LibraryManifest::new(first_url, test_meta_data());

    let second_url = Url::parse("lib://testlib2").expect("Could not parse lib url");
    let second_meta = MetaData {
        name: "test2".into(),
        version: "0.0.1".into(),
        description: "another test".into(),
        authors: vec!["someone".into()],
    };
    let second = LibraryManifest::new(second_url, second_meta);

    let mut executor = Executor::new();

    let first_location = Url::parse("file://fake/lib1/location").expect("Could not parse Url");
    let first_added = executor.add_lib(first, first_location);
    assert!(first_added.is_ok());

    let second_location = Url::parse("file://fake/lib2/location").expect("Could not parse Url");
    let second_added = executor.add_lib(second, second_location);
    assert!(second_added.is_ok());

    let manifests = executor
        .loaded_lib_manifests
        .read()
        .expect("Could not read manifests");
    assert_eq!(manifests.len(), 2, "Should have two loaded manifests");

    let key1 = Url::parse("lib://testlib1").expect("Could not parse lib url");
    assert!(manifests.contains_key(&key1));
    let key2 = Url::parse("lib://testlib2").expect("Could not parse lib url");
    assert!(manifests.contains_key(&key2));
}
530
/// Adding a second manifest under an already registered lib url replaces the first one,
/// including the resolved url stored with it.
#[test]
fn add_same_lib_twice_overwrites() {
    let lib_url = Url::parse("lib://testlib").expect("Could not parse lib url");

    let original_meta = MetaData {
        name: "original".into(),
        version: "0.0.0".into(),
        description: "original lib".into(),
        authors: vec!["me".into()],
    };
    let replacement_meta = MetaData {
        name: "replacement".into(),
        version: "1.0.0".into(),
        description: "replacement lib".into(),
        authors: vec!["someone_else".into()],
    };
    let original = LibraryManifest::new(lib_url.clone(), original_meta);
    let replacement = LibraryManifest::new(lib_url.clone(), replacement_meta);

    let original_location =
        Url::parse("file://fake/lib/location1").expect("Could not parse Url");
    let replacement_location =
        Url::parse("file://fake/lib/location2").expect("Could not parse Url");

    let mut executor = Executor::new();
    let first_added = executor.add_lib(original, original_location);
    assert!(first_added.is_ok());
    let second_added = executor.add_lib(replacement, replacement_location.clone());
    assert!(second_added.is_ok());

    let manifests = executor
        .loaded_lib_manifests
        .read()
        .expect("Could not read manifests");
    assert_eq!(
        manifests.len(),
        1,
        "Should have only one manifest after adding the same lib_url twice"
    );

    let stored = manifests
        .get(&lib_url)
        .expect("Could not find manifest for lib url");
    let (manifest, resolved_url) = stored;
    assert_eq!(
        manifest.metadata.name, "replacement",
        "Manifest should be the second (replacement) one"
    );
    assert_eq!(
        manifest.metadata.version, "1.0.0",
        "Version should be from the replacement manifest"
    );
    assert_eq!(
        *resolved_url, replacement_location,
        "Resolved URL should be from the second add_lib call"
    );
}
587
/// A job whose implementation url uses the `http` scheme is rejected with an
/// "Unsupported scheme" error.
#[test]
fn execute_job_unsupported_scheme() {
    let implementation_url =
        Url::parse("http://example.com/some/impl").expect("Could not parse Url");
    let payload = Payload {
        job_id: 0,
        input_set: vec![],
        implementation_url,
    };

    let loaded_implementations: Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>> =
        Arc::default();
    let loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>> =
        Arc::default();
    let provider: Arc<dyn Provider> = Arc::new(TestProvider { test_content: "" });

    let context = zmq::Context::new();
    let results_sink = context
        .socket(zmq::PUSH)
        .expect("Could not create PUSH end of results-sink socket");
    results_sink
        .connect("tcp://127.0.0.1:3459")
        .expect("Could not connect to PUSH end of results-sink socket");

    let outcome = super::execute_job(
        &provider,
        &payload,
        &results_sink,
        "test executor",
        &loaded_implementations,
        &loaded_lib_manifests,
    );

    assert!(outcome.is_err(), "Unsupported scheme should return an error");
    let err_msg = outcome.expect_err("Expected an error").to_string();
    assert!(
        err_msg.contains("Unsupported scheme"),
        "Error should mention unsupported scheme, got: {err_msg}"
    );
}
626
/// A `lib` implementation url that has no locator in the manifest served by the provider
/// makes the job fail with a missing-locator error.
#[test]
fn execute_job_lib_impl_not_in_manifest() {
    // A manifest for the library with an empty locator table
    let manifest_json = r#"{
        "lib_url": "lib://flowstdlib",
        "metadata": {
            "name": "flowstdlib",
            "version": "0.0.0",
            "description": "test",
            "authors": ["me"]
        },
        "locators": {},
        "source_urls": {}
    }"#;

    let implementation_url =
        Url::parse("lib://flowstdlib/math/add").expect("Could not parse Url");
    let payload = Payload {
        job_id: 0,
        input_set: vec![],
        implementation_url,
    };

    let loaded_implementations: Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>> =
        Arc::default();
    let loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>> =
        Arc::default();
    let provider: Arc<dyn Provider> = Arc::new(ManifestTestProvider {
        test_content: manifest_json,
    });

    let context = zmq::Context::new();
    let results_sink = context
        .socket(zmq::PUSH)
        .expect("Could not create PUSH end of results-sink socket");
    results_sink
        .connect("tcp://127.0.0.1:3460")
        .expect("Could not connect to PUSH end of results-sink socket");

    let outcome = super::execute_job(
        &provider,
        &payload,
        &results_sink,
        "test executor",
        &loaded_implementations,
        &loaded_lib_manifests,
    );

    assert!(
        outcome.is_err(),
        "Should error when implementation is not in the manifest"
    );
    let err_msg = outcome.expect_err("Expected an error").to_string();
    assert!(
        err_msg.contains("Could not find ImplementationLocator"),
        "Error should mention missing locator, got: {err_msg}"
    );
}
683
/// A `context` implementation url makes the job fail when the manifest served by the provider
/// has an empty locator table. Only the failure is asserted, not its message.
#[test]
fn execute_job_context_impl_not_in_manifest() {
    let manifest_json = r#"{
        "lib_url": "context://stdio",
        "metadata": {
            "name": "stdio",
            "version": "0.0.0",
            "description": "test context",
            "authors": ["me"]
        },
        "locators": {},
        "source_urls": {}
    }"#;

    let implementation_url = Url::parse("context://stdio/stdout").expect("Could not parse Url");
    let payload = Payload {
        job_id: 0,
        input_set: vec![],
        implementation_url,
    };

    let loaded_implementations: Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>> =
        Arc::default();
    let loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>> =
        Arc::default();
    let provider: Arc<dyn Provider> = Arc::new(ManifestTestProvider {
        test_content: manifest_json,
    });

    let context = zmq::Context::new();
    let results_sink = context
        .socket(zmq::PUSH)
        .expect("Could not create PUSH end of results-sink socket");
    results_sink
        .connect("tcp://127.0.0.1:3461")
        .expect("Could not connect to PUSH end of results-sink socket");

    let outcome = super::execute_job(
        &provider,
        &payload,
        &results_sink,
        "test executor",
        &loaded_implementations,
        &loaded_lib_manifests,
    );

    assert!(
        outcome.is_err(),
        "Should error when context implementation is not in the manifest"
    );
}
734
/// With the library manifest already in the shared map, a job for an implementation the
/// manifest has no locator for fails with a missing-locator error.
#[test]
fn execute_job_lib_preloaded_but_impl_missing() {
    let lib_url = Url::parse("lib://flowstdlib").expect("Could not parse lib url");
    let manifest = LibraryManifest::new(lib_url.clone(), test_meta_data());
    let resolved_url =
        Url::parse("file://fake/flowstdlib/location").expect("Could not parse Url");

    let loaded_implementations: Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>> =
        Arc::default();
    let loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>> =
        Arc::default();

    // Pre-load the manifest; the write guard is a temporary released at the end of the statement
    loaded_lib_manifests
        .write()
        .expect("Could not write to manifests")
        .insert(lib_url, (manifest, resolved_url));

    let implementation_url =
        Url::parse("lib://flowstdlib/math/add").expect("Could not parse Url");
    let payload = Payload {
        job_id: 0,
        input_set: vec![],
        implementation_url,
    };

    let provider: Arc<dyn Provider> = Arc::new(TestProvider { test_content: "" });
    let context = zmq::Context::new();
    let results_sink = context
        .socket(zmq::PUSH)
        .expect("Could not create PUSH end of results-sink socket");
    results_sink
        .connect("tcp://127.0.0.1:3462")
        .expect("Could not connect to PUSH end of results-sink socket");

    let outcome = super::execute_job(
        &provider,
        &payload,
        &results_sink,
        "test executor",
        &loaded_implementations,
        &loaded_lib_manifests,
    );

    assert!(
        outcome.is_err(),
        "Should error when implementation is not in the pre-loaded manifest"
    );
    let err_msg = outcome.expect_err("Expected an error").to_string();
    assert!(
        err_msg.contains("Could not find ImplementationLocator"),
        "Error should mention missing locator, got: {err_msg}"
    );
}
792
/// With an empty manifest map, the manifest is loaded from the content the provider serves.
#[test]
fn get_lib_manifest_tuple_loads_from_provider() {
    let manifest_json = r#"{
        "lib_url": "lib://testlib",
        "metadata": {
            "name": "testlib",
            "version": "1.0.0",
            "description": "a test lib",
            "authors": ["tester"]
        },
        "locators": {},
        "source_urls": {}
    }"#;

    let provider: Arc<dyn Provider> = Arc::new(ManifestTestProvider {
        test_content: manifest_json,
    });
    let loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>> =
        Arc::default();
    let lib_root_url = Url::parse("lib://testlib").expect("Could not parse lib url");

    let outcome = super::get_lib_manifest_tuple(&provider, &loaded_lib_manifests, &lib_root_url);

    assert!(
        outcome.is_ok(),
        "Should successfully load manifest from provider"
    );
    let (manifest, _resolved_url) = outcome.expect("Could not get manifest tuple");
    assert_eq!(manifest.metadata.name, "testlib");
    assert_eq!(manifest.metadata.version, "1.0.0");
}
826
/// A manifest already in the map is returned as is. The provider here serves content that is
/// not valid JSON, so a successful result shows the cached entry was used.
#[test]
fn get_lib_manifest_tuple_uses_cached() {
    let lib_url = Url::parse("lib://cachedlib").expect("Could not parse lib url");
    let cached_meta = MetaData {
        name: "cachedlib".into(),
        version: "2.0.0".into(),
        description: "cached".into(),
        authors: vec!["cacher".into()],
    };
    let cached_manifest = LibraryManifest::new(lib_url.clone(), cached_meta);
    let cached_resolved = Url::parse("file://cached/location").expect("Could not parse Url");

    let loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>> =
        Arc::default();
    // Seed the cache; the write guard is a temporary released at the end of the statement
    loaded_lib_manifests
        .write()
        .expect("Could not write to manifests")
        .insert(lib_url.clone(), (cached_manifest, cached_resolved.clone()));

    let provider: Arc<dyn Provider> = Arc::new(TestProvider {
        test_content: "invalid json",
    });

    let outcome = super::get_lib_manifest_tuple(&provider, &loaded_lib_manifests, &lib_url);

    assert!(
        outcome.is_ok(),
        "Should return cached manifest without calling provider"
    );
    let (manifest, resolved_url) = outcome.expect("Could not get manifest tuple");
    assert_eq!(
        manifest.metadata.name, "cachedlib",
        "Should return the cached manifest"
    );
    assert_eq!(
        manifest.metadata.version, "2.0.0",
        "Should return the cached manifest version"
    );
    assert_eq!(
        resolved_url, cached_resolved,
        "Should return the cached resolved URL"
    );
}
877
/// With an empty manifest map and a provider serving content that is not a manifest, the
/// lookup fails.
#[test]
fn get_lib_manifest_tuple_provider_returns_invalid_json() {
    let lib_root_url = Url::parse("lib://badlib").expect("Could not parse lib url");
    let loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>> =
        Arc::default();
    let provider: Arc<dyn Provider> = Arc::new(ManifestTestProvider {
        test_content: "not valid json at all",
    });

    let outcome = super::get_lib_manifest_tuple(&provider, &loaded_lib_manifests, &lib_root_url);

    assert!(
        outcome.is_err(),
        "Should error when provider returns invalid manifest content"
    );
}
896
/// Jobs for a `lib`, a `context` and a `file` implementation url must all fail when nothing is
/// pre-loaded and the provider only serves empty content.
#[test]
fn execute_job() {
    // Build a job whose payload points at `implementation_url`; every other field is filler
    let job_for = |implementation_url: &str| Job {
        process_id: 1,
        #[cfg(feature = "debugger")]
        function_name: String::new(),
        parent_id: 0,
        connections: vec![],
        payload: Payload {
            job_id: 0,
            input_set: vec![],
            implementation_url: Url::parse(implementation_url).expect("Could not parse Url"),
        },
        result: Ok((None, false)),
    };

    let jobs = [
        job_for("lib://flowstdlib/math/add"),
        job_for("context://stdio/stdout"),
        job_for("file://fake/path"),
    ];

    for job in jobs {
        // Fresh, empty caches for every job so that no job can benefit from a previous one
        let loaded_implementations =
            Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
        let loaded_lib_manifests =
            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
        let provider = Arc::new(TestProvider { test_content: "" }) as Arc<dyn Provider>;
        let context = zmq::Context::new();
        let results_sink = context
            .socket(zmq::PUSH)
            .expect("Could not create PUSH end of results-sink socket");
        results_sink
            .connect("tcp://127.0.0.1:3458")
            .expect("Could not connect to PUSH end of results-sink socket");

        let outcome = super::execute_job(
            &provider,
            &job.payload,
            &results_sink,
            "test executor",
            &loaded_implementations,
            &loaded_lib_manifests,
        );

        assert!(
            outcome.is_err(),
            "Job for '{}' should fail as its implementation cannot be loaded",
            job.payload.implementation_url
        );
    }
}
968}