Skip to main content

flowrlib/
executor.rs

1use std::collections::HashMap;
2use std::panic;
3use std::sync::{Arc, RwLock};
4use std::thread;
5use std::thread::JoinHandle;
6
7use log::{debug, error, info, trace};
8use url::Url;
9
10use flowcore::errors::{bail, Result, ResultExt};
11use flowcore::model::lib_manifest::{
12    ImplementationLocator::Native, ImplementationLocator::RelativePath, LibraryManifest,
13};
14use flowcore::provider::Provider;
15use flowcore::Implementation;
16
17use crate::job::Payload;
18use crate::wasm;
19
20/// An `Executor` struct is used to receive jobs, execute them, and return results.
21/// It can load libraries and keep track of the `Function` `Implementations` loaded for use
22/// in job execution.
23pub struct Executor {
24    // HashMap of library manifests already loaded. The key is the library reference Url
25    // (e.g. lib:://flowstdlib), and the entry is a tuple of the LibraryManifest
26    // and the resolved Url of where the manifest was read from
27    loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
28    executors: Vec<JoinHandle<()>>,
29}
30
31impl Default for Executor {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl Executor {
38    /// Create a new executor that receives jobs, executes them, and returns results.
39    #[must_use]
40    pub fn new() -> Self {
41        Executor {
42            loaded_lib_manifests: Arc::new(RwLock::new(
43                HashMap::<Url, (LibraryManifest, Url)>::new(),
44            )),
45            executors: vec![],
46        }
47    }
48
49    /// Add a library manifest so that it can be used later on to load implementations that are
50    /// required to execute jobs. Also provide the Url that the library url resolves to, so that
51    /// later it can be used when resolving the locations of implementations in this library.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if this `LibraryManifest` cannot be added to the set of manifests used
56    /// by the runtime to load functions.
57    ///
58    pub fn add_lib(&mut self, lib_manifest: LibraryManifest, resolved_url: Url) -> Result<()> {
59        let mut lib_manifests = self
60            .loaded_lib_manifests
61            .write()
62            .map_err(|_| "Could not gain write access to loaded library manifests map")?;
63
64        debug!(
65            "Manifest of library '{}' loaded from '{}' and added to Executor",
66            lib_manifest.lib_url, resolved_url
67        );
68
69        lib_manifests.insert(lib_manifest.lib_url.clone(), (lib_manifest, resolved_url));
70
71        Ok(())
72    }
73
74    /// Start executing jobs, specifying:
75    /// - the `Provider` to use to fetch implementation content
76    /// - the number of executor threads
77    /// - the address of the job socket to get jobs from
78    /// - the address of the results socket to return results from executed jobs to
79    pub fn start(
80        &mut self,
81        provider: &Arc<dyn Provider>,
82        number_of_executors: usize,
83        job_service: &str,
84        results_service: &str,
85        control_service: &str,
86    ) {
87        let loaded_implementations =
88            Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
89
90        info!("Starting {number_of_executors} executor threads");
91        for executor_number in 0..number_of_executors {
92            let thread_provider = provider.clone();
93            let thread_context = zmq::Context::new();
94            let thread_implementations = loaded_implementations.clone();
95            let thread_loaded_manifests = self.loaded_lib_manifests.clone();
96            let results_sink = results_service.into();
97            let job_source = job_service.into();
98            let control_address = control_service.into();
99            self.executors.push(thread::spawn(move || {
100                trace!("Executor #{executor_number} entering execution loop");
101                if let Err(e) = execution_loop(
102                    &thread_provider,
103                    &format!("Executor #{executor_number}"),
104                    &thread_context,
105                    &thread_implementations,
106                    &thread_loaded_manifests,
107                    job_source,
108                    results_sink,
109                    control_address,
110                ) {
111                    error!("Execution loop error: {e}");
112                }
113            }));
114        }
115    }
116
117    /// Wait until all threads end
118    pub fn wait(self) {
119        for executor in self.executors {
120            let _ = executor.join();
121        }
122    }
123}
124
125#[allow(clippy::too_many_arguments)]
126#[allow(clippy::needless_pass_by_value)]
127fn execution_loop(
128    provider: &Arc<dyn Provider>,
129    name: &str,
130    context: &zmq::Context,
131    loaded_implementations: &Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>>,
132    loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
133    job_service: String,
134    results_service: String,
135    control_address: String,
136) -> Result<()> {
137    let job_source = context
138        .socket(zmq::PULL)
139        .map_err(|e| format!("Could not create PULL end of job socket: {e}"))?;
140    job_source
141        .connect(&job_service)
142        .map_err(|e| format!("Could not connect to PULL end of job socket: '{job_service}' {e}"))?;
143
144    let results_sink = context
145        .socket(zmq::PUSH)
146        .map_err(|e| format!("Could not create PUSH end of results socket: {e}"))?;
147    results_sink
148        .connect(&results_service)
149        .map_err(|e| format!("Could not connect to PUSH end of results socket: {e}"))?;
150
151    let control_socket = context
152        .socket(zmq::SocketType::SUB)
153        .map_err(|e| format!("Could not create SUB end of control socket: {e}"))?;
154    control_socket
155        .connect(&control_address)
156        .map_err(|e| format!("Could not connect to SUB end of control socket: {e}"))?;
157    control_socket
158        .set_subscribe(&[])
159        .map_err(|e| format!("Could not subscribe to SUB end of control socket: {e}"))?;
160
161    let mut process_jobs = true;
162
163    set_panic_hook();
164
165    let mut items: Vec<zmq::PollItem> = vec![
166        job_source.as_poll_item(zmq::POLLIN),
167        control_socket.as_poll_item(zmq::POLLIN),
168    ];
169
170    while process_jobs {
171        trace!("{name} waiting for a job to execute or a DONE signal");
172        match zmq::poll(&mut items, -1).map_err(|_| "Error while polling for Jobs to execute") {
173            Ok(_) => {
174                if items
175                    .first()
176                    .ok_or("Could not get poll item 0")?
177                    .is_readable()
178                {
179                    let msg = job_source
180                        .recv_msg(0)
181                        .map_err(|_| "Error receiving Job for execution")?;
182                    let message_string = msg.as_str().ok_or("Could not get message as str")?;
183                    let payload: Payload = serde_json::from_str(message_string)
184                        .map_err(|_| "Could not deserialize Message to Job")?;
185
186                    trace!("Job #{}: Received by {}", payload.job_id, name);
187                    match execute_job(
188                        provider,
189                        &payload,
190                        &results_sink,
191                        name,
192                        &loaded_implementations.clone(),
193                        &loaded_lib_manifests.clone(),
194                    ) {
195                        Ok(keep_processing) => process_jobs = keep_processing,
196                        Err(e) => error!("{e}"),
197                    }
198                }
199
200                if items
201                    .get(1)
202                    .ok_or("Could not get poll item 1")?
203                    .is_readable()
204                {
205                    let msg = control_socket
206                        .recv_msg(0)
207                        .map_err(|_| "Error receiving Control message")?;
208                    match msg.as_str().ok_or("Could not get message as str") {
209                        Ok("DONE") => {
210                            trace!("'DONE' message received in executor");
211                            return Ok(());
212                        }
213                        Ok(_) => error!("Unexpected Control message"),
214                        _ => error!("Error parsing Control message"),
215                    }
216                }
217            }
218            Err(e) => {
219                error!("Error while polling for Jobs or Control messages: {e}");
220            }
221        }
222    }
223
224    Ok(())
225}
226
227// Replace the standard panic hook with one that just outputs the file and line of any panic.
228fn set_panic_hook() {
229    panic::set_hook(Box::new(|panic_info| {
230        if let Some(location) = panic_info.location() {
231            error!(
232                "Panic in file '{}' at line {}",
233                location.file(),
234                location.line()
235            );
236        }
237    }));
238}
239
240// Return Ok(keep_processing) flag as true or false to keep processing
241fn execute_job(
242    provider: &Arc<dyn Provider>,
243    payload: &Payload,
244    results_sink: &zmq::Socket,
245    name: &str,
246    loaded_implementations: &Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>>,
247    loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
248) -> Result<bool> {
249    // TODO see if we can avoid write access until we know it's needed
250    let mut implementations = loaded_implementations
251        .write()
252        .map_err(|_| "Could not gain read access to loaded implementations map")?;
253    if implementations.get(&payload.implementation_url).is_none() {
254        trace!(
255            "Implementation '{}' is not loaded",
256            payload.implementation_url
257        );
258        let implementation = match payload.implementation_url.scheme() {
259            "lib" => {
260                let mut lib_root_url = payload.implementation_url.clone();
261                lib_root_url.set_path("");
262                load_referenced_implementation(
263                    provider,
264                    &lib_root_url,
265                    loaded_lib_manifests,
266                    &payload.implementation_url,
267                )?
268            }
269            "context" => {
270                let mut lib_root_url = payload.implementation_url.clone();
271                let _ = lib_root_url.set_host(Some(""));
272                lib_root_url.set_path("");
273                load_referenced_implementation(
274                    provider,
275                    &lib_root_url,
276                    loaded_lib_manifests,
277                    &payload.implementation_url,
278                )?
279            }
280            "file" => Arc::new(wasm::load(provider, &payload.implementation_url)?),
281            _ => bail!("Unsupported scheme on implementation_url"),
282        };
283        implementations.insert(payload.implementation_url.clone(), implementation);
284        trace!(
285            "Implementation '{}' added to executor",
286            payload.implementation_url
287        );
288    }
289
290    let implementation = implementations
291        .get(&payload.implementation_url)
292        .ok_or("Could not find implementation")?;
293
294    trace!("Job #{}: Started executing on '{name}'", payload.job_id);
295    let result = implementation.run(&payload.input_set);
296    trace!("Job #{}: Finished executing on '{name}'", payload.job_id);
297
298    results_sink
299        .send(
300            serde_json::to_string(&(payload.job_id, result))?.as_bytes(),
301            0,
302        )
303        .map_err(|_| "Could not send result of Job")?;
304
305    Ok(true)
306}
307
308// Load a context or library implementation
309fn load_referenced_implementation(
310    provider: &Arc<dyn Provider>,
311    lib_root_url: &Url,
312    loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
313    implementation_url: &Url,
314) -> Result<Arc<dyn Implementation>> {
315    let (lib_manifest, resolved_lib_url) =
316        get_lib_manifest_tuple(provider, loaded_lib_manifests, lib_root_url)?;
317
318    let locator = lib_manifest
319        .locators
320        .get(implementation_url)
321        .ok_or(format!(
322            "Could not find ImplementationLocator for '{implementation_url}' in library"
323        ))?;
324
325    // find the implementation we need from the locator
326    let implementation = match locator {
327        RelativePath(wasm_source_relative) => {
328            // Path to the wasm source could be relative to the URL where we loaded the manifest from
329            let wasm_url = resolved_lib_url
330                .join(wasm_source_relative)
331                .map_err(|e| e.to_string())?;
332            debug!("Attempting to load wasm from source file: '{wasm_url}'");
333            // Wasm implementation being added. Wrap it with the Wasm Native Implementation
334            let wasm_executor = wasm::load(provider, &wasm_url)?;
335            Arc::new(wasm_executor) as Arc<dyn Implementation>
336        }
337        Native(native_impl) => native_impl.clone(),
338    };
339
340    Ok(implementation)
341}
342
343// Get the tuple of the lib manifest and the url from where it was loaded from
344fn get_lib_manifest_tuple(
345    provider: &Arc<dyn Provider>,
346    loaded_lib_manifests: &Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
347    lib_root_url: &Url,
348) -> Result<(LibraryManifest, Url)> {
349    let mut lib_manifests = loaded_lib_manifests
350        .write()
351        .map_err(|_| "Could not get write access to the loaded lib manifests")?;
352
353    if lib_manifests.get(lib_root_url).is_none() {
354        info!("Attempting to load library manifest'{lib_root_url}'");
355        let manifest_tuple = LibraryManifest::load(provider, lib_root_url)
356            .chain_err(|| format!("Could not load library with root url: '{lib_root_url}'"))?;
357        lib_manifests.insert(lib_root_url.clone(), manifest_tuple);
358    }
359
360    // TODO avoid this clone and return references
361    lib_manifests
362        .get(lib_root_url)
363        .ok_or_else(|| "Could not find (supposedly already loaded) library manifest".into())
364        .cloned()
365}
366
367#[cfg(test)]
368#[allow(clippy::unwrap_used, clippy::expect_used)]
369mod test {
370    use std::collections::HashMap;
371    use std::sync::{Arc, RwLock};
372
373    use url::Url;
374
375    use flowcore::errors::Result;
376    use flowcore::model::lib_manifest::LibraryManifest;
377    use flowcore::model::metadata::MetaData;
378    use flowcore::provider::Provider;
379    use flowcore::Implementation;
380
381    use crate::job::{Job, Payload};
382
383    use super::Executor;
384
385    fn test_meta_data() -> MetaData {
386        MetaData {
387            name: "test".into(),
388            version: "0.0.0".into(),
389            description: "a test".into(),
390            authors: vec!["me".into()],
391        }
392    }
393
394    #[allow(clippy::module_name_repetitions)]
395    pub struct TestProvider {
396        test_content: &'static str,
397    }
398
399    impl Provider for TestProvider {
400        fn resolve_url(
401            &self,
402            source: &Url,
403            _default_filename: &str,
404            _extensions: &[&str],
405        ) -> Result<(Url, Option<Url>)> {
406            Ok((source.clone(), None))
407        }
408
409        fn get_contents(&self, _url: &Url) -> Result<Vec<u8>> {
410            Ok(self.test_content.as_bytes().to_owned())
411        }
412    }
413
414    /// A test provider that resolves URLs to a `.json` file URL, allowing
415    /// `LibraryManifest::load` to find the correct JSON deserializer.
416    struct ManifestTestProvider {
417        test_content: &'static str,
418    }
419
420    impl Provider for ManifestTestProvider {
421        fn resolve_url(
422            &self,
423            _source: &Url,
424            _default_filename: &str,
425            _extensions: &[&str],
426        ) -> Result<(Url, Option<Url>)> {
427            // Return a file:// URL with .json extension so the deserializer can be found
428            let resolved = Url::parse("file:///tmp/manifest.json").map_err(|e| e.to_string())?;
429            Ok((resolved, None))
430        }
431
432        fn get_contents(&self, _url: &Url) -> Result<Vec<u8>> {
433            Ok(self.test_content.as_bytes().to_owned())
434        }
435    }
436
437    #[test]
438    fn add_a_lib() {
439        let library = LibraryManifest::new(
440            Url::parse("lib://testlib").expect("Could not parse lib url"),
441            test_meta_data(),
442        );
443
444        let mut executor = Executor::new();
445        assert!(executor
446            .add_lib(
447                library,
448                Url::parse("file://fake/lib/location").expect("Could not parse Url")
449            )
450            .is_ok());
451    }
452
453    #[test]
454    fn default_same_as_new() {
455        let default_executor = Executor::default();
456        let new_executor = Executor::new();
457
458        // Both should start with empty lib manifests
459        let default_manifests = default_executor
460            .loaded_lib_manifests
461            .read()
462            .expect("Could not read default executor manifests");
463        let new_manifests = new_executor
464            .loaded_lib_manifests
465            .read()
466            .expect("Could not read new executor manifests");
467
468        assert!(
469            default_manifests.is_empty(),
470            "default() executor should have no loaded manifests"
471        );
472        assert!(
473            new_manifests.is_empty(),
474            "new() executor should have no loaded manifests"
475        );
476
477        // Both should start with no executor threads
478        assert!(
479            default_executor.executors.is_empty(),
480            "default() executor should have no executor threads"
481        );
482        assert!(
483            new_executor.executors.is_empty(),
484            "new() executor should have no executor threads"
485        );
486    }
487
488    #[test]
489    fn add_multiple_libs() {
490        let lib1 = LibraryManifest::new(
491            Url::parse("lib://testlib1").expect("Could not parse lib url"),
492            test_meta_data(),
493        );
494        let lib2 = LibraryManifest::new(
495            Url::parse("lib://testlib2").expect("Could not parse lib url"),
496            MetaData {
497                name: "test2".into(),
498                version: "0.0.1".into(),
499                description: "another test".into(),
500                authors: vec!["someone".into()],
501            },
502        );
503
504        let mut executor = Executor::new();
505        assert!(executor
506            .add_lib(
507                lib1,
508                Url::parse("file://fake/lib1/location").expect("Could not parse Url")
509            )
510            .is_ok());
511        assert!(executor
512            .add_lib(
513                lib2,
514                Url::parse("file://fake/lib2/location").expect("Could not parse Url")
515            )
516            .is_ok());
517
518        let manifests = executor
519            .loaded_lib_manifests
520            .read()
521            .expect("Could not read manifests");
522        assert_eq!(manifests.len(), 2, "Should have two loaded manifests");
523        assert!(
524            manifests.contains_key(&Url::parse("lib://testlib1").expect("Could not parse lib url"))
525        );
526        assert!(
527            manifests.contains_key(&Url::parse("lib://testlib2").expect("Could not parse lib url"))
528        );
529    }
530
531    #[test]
532    fn add_same_lib_twice_overwrites() {
533        let lib_url = Url::parse("lib://testlib").expect("Could not parse lib url");
534
535        let lib1 = LibraryManifest::new(
536            lib_url.clone(),
537            MetaData {
538                name: "original".into(),
539                version: "0.0.0".into(),
540                description: "original lib".into(),
541                authors: vec!["me".into()],
542            },
543        );
544        let lib2 = LibraryManifest::new(
545            lib_url.clone(),
546            MetaData {
547                name: "replacement".into(),
548                version: "1.0.0".into(),
549                description: "replacement lib".into(),
550                authors: vec!["someone_else".into()],
551            },
552        );
553
554        let resolved1 = Url::parse("file://fake/lib/location1").expect("Could not parse Url");
555        let resolved2 = Url::parse("file://fake/lib/location2").expect("Could not parse Url");
556
557        let mut executor = Executor::new();
558        assert!(executor.add_lib(lib1, resolved1).is_ok());
559        assert!(executor.add_lib(lib2, resolved2.clone()).is_ok());
560
561        let manifests = executor
562            .loaded_lib_manifests
563            .read()
564            .expect("Could not read manifests");
565        assert_eq!(
566            manifests.len(),
567            1,
568            "Should have only one manifest after adding the same lib_url twice"
569        );
570
571        let (manifest, resolved_url) = manifests
572            .get(&lib_url)
573            .expect("Could not find manifest for lib url");
574        assert_eq!(
575            manifest.metadata.name, "replacement",
576            "Manifest should be the second (replacement) one"
577        );
578        assert_eq!(
579            manifest.metadata.version, "1.0.0",
580            "Version should be from the replacement manifest"
581        );
582        assert_eq!(
583            *resolved_url, resolved2,
584            "Resolved URL should be from the second add_lib call"
585        );
586    }
587
588    #[test]
589    fn execute_job_unsupported_scheme() {
590        let payload = Payload {
591            job_id: 0,
592            input_set: vec![],
593            implementation_url: Url::parse("http://example.com/some/impl")
594                .expect("Could not parse Url"),
595        };
596
597        let loaded_implementations =
598            Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
599        let loaded_lib_manifests =
600            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
601        let provider = Arc::new(TestProvider { test_content: "" }) as Arc<dyn Provider>;
602        let context = zmq::Context::new();
603        let results_sink = context
604            .socket(zmq::PUSH)
605            .expect("Could not create PUSH end of results-sink socket");
606        results_sink
607            .connect("tcp://127.0.0.1:3459")
608            .expect("Could not connect to PUSH end of results-sink socket");
609
610        let result = super::execute_job(
611            &provider,
612            &payload,
613            &results_sink,
614            "test executor",
615            &loaded_implementations,
616            &loaded_lib_manifests,
617        );
618
619        assert!(result.is_err(), "Unsupported scheme should return an error");
620        let err_msg = result.expect_err("Expected an error").to_string();
621        assert!(
622            err_msg.contains("Unsupported scheme"),
623            "Error should mention unsupported scheme, got: {err_msg}"
624        );
625    }
626
627    #[test]
628    fn execute_job_lib_impl_not_in_manifest() {
629        // Create a valid manifest JSON that the ManifestTestProvider will return
630        let manifest_json = r#"{
631            "lib_url": "lib://flowstdlib",
632            "metadata": {
633                "name": "flowstdlib",
634                "version": "0.0.0",
635                "description": "test",
636                "authors": ["me"]
637            },
638            "locators": {},
639            "source_urls": {}
640        }"#;
641
642        let payload = Payload {
643            job_id: 0,
644            input_set: vec![],
645            implementation_url: Url::parse("lib://flowstdlib/math/add")
646                .expect("Could not parse Url"),
647        };
648
649        let loaded_implementations =
650            Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
651        let loaded_lib_manifests =
652            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
653        let provider = Arc::new(ManifestTestProvider {
654            test_content: manifest_json,
655        }) as Arc<dyn Provider>;
656        let context = zmq::Context::new();
657        let results_sink = context
658            .socket(zmq::PUSH)
659            .expect("Could not create PUSH end of results-sink socket");
660        results_sink
661            .connect("tcp://127.0.0.1:3460")
662            .expect("Could not connect to PUSH end of results-sink socket");
663
664        let result = super::execute_job(
665            &provider,
666            &payload,
667            &results_sink,
668            "test executor",
669            &loaded_implementations,
670            &loaded_lib_manifests,
671        );
672
673        assert!(
674            result.is_err(),
675            "Should error when implementation is not in the manifest"
676        );
677        let err_msg = result.expect_err("Expected an error").to_string();
678        assert!(
679            err_msg.contains("Could not find ImplementationLocator"),
680            "Error should mention missing locator, got: {err_msg}"
681        );
682    }
683
684    #[test]
685    fn execute_job_context_impl_not_in_manifest() {
686        // Create a valid manifest for a context library
687        let manifest_json = r#"{
688            "lib_url": "context://stdio",
689            "metadata": {
690                "name": "stdio",
691                "version": "0.0.0",
692                "description": "test context",
693                "authors": ["me"]
694            },
695            "locators": {},
696            "source_urls": {}
697        }"#;
698
699        let payload = Payload {
700            job_id: 0,
701            input_set: vec![],
702            implementation_url: Url::parse("context://stdio/stdout").expect("Could not parse Url"),
703        };
704
705        let loaded_implementations =
706            Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
707        let loaded_lib_manifests =
708            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
709        let provider = Arc::new(ManifestTestProvider {
710            test_content: manifest_json,
711        }) as Arc<dyn Provider>;
712        let context = zmq::Context::new();
713        let results_sink = context
714            .socket(zmq::PUSH)
715            .expect("Could not create PUSH end of results-sink socket");
716        results_sink
717            .connect("tcp://127.0.0.1:3461")
718            .expect("Could not connect to PUSH end of results-sink socket");
719
720        let result = super::execute_job(
721            &provider,
722            &payload,
723            &results_sink,
724            "test executor",
725            &loaded_implementations,
726            &loaded_lib_manifests,
727        );
728
729        assert!(
730            result.is_err(),
731            "Should error when context implementation is not in the manifest"
732        );
733    }
734
735    #[test]
736    fn execute_job_lib_preloaded_but_impl_missing() {
737        // Pre-load a manifest with no locators, then try to execute a job
738        // referencing an implementation that doesn't exist in it
739        let lib_url = Url::parse("lib://flowstdlib").expect("Could not parse lib url");
740        let manifest = LibraryManifest::new(lib_url.clone(), test_meta_data());
741        let resolved_url =
742            Url::parse("file://fake/flowstdlib/location").expect("Could not parse Url");
743
744        let loaded_implementations =
745            Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
746        let loaded_lib_manifests =
747            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
748
749        // Pre-load the manifest
750        {
751            let mut manifests = loaded_lib_manifests
752                .write()
753                .expect("Could not write to manifests");
754            manifests.insert(lib_url, (manifest, resolved_url));
755        }
756
757        let payload = Payload {
758            job_id: 0,
759            input_set: vec![],
760            implementation_url: Url::parse("lib://flowstdlib/math/add")
761                .expect("Could not parse Url"),
762        };
763
764        let provider = Arc::new(TestProvider { test_content: "" }) as Arc<dyn Provider>;
765        let context = zmq::Context::new();
766        let results_sink = context
767            .socket(zmq::PUSH)
768            .expect("Could not create PUSH end of results-sink socket");
769        results_sink
770            .connect("tcp://127.0.0.1:3462")
771            .expect("Could not connect to PUSH end of results-sink socket");
772
773        let result = super::execute_job(
774            &provider,
775            &payload,
776            &results_sink,
777            "test executor",
778            &loaded_implementations,
779            &loaded_lib_manifests,
780        );
781
782        assert!(
783            result.is_err(),
784            "Should error when implementation is not in the pre-loaded manifest"
785        );
786        let err_msg = result.expect_err("Expected an error").to_string();
787        assert!(
788            err_msg.contains("Could not find ImplementationLocator"),
789            "Error should mention missing locator, got: {err_msg}"
790        );
791    }
792
793    #[test]
794    fn get_lib_manifest_tuple_loads_from_provider() {
795        // Test that get_lib_manifest_tuple can load a manifest from the provider
796        // when it's not already in the loaded manifests map
797        let manifest_json = r#"{
798            "lib_url": "lib://testlib",
799            "metadata": {
800                "name": "testlib",
801                "version": "1.0.0",
802                "description": "a test lib",
803                "authors": ["tester"]
804            },
805            "locators": {},
806            "source_urls": {}
807        }"#;
808
809        let provider = Arc::new(ManifestTestProvider {
810            test_content: manifest_json,
811        }) as Arc<dyn Provider>;
812        let loaded_lib_manifests =
813            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
814        let lib_root_url = Url::parse("lib://testlib").expect("Could not parse lib url");
815
816        let result = super::get_lib_manifest_tuple(&provider, &loaded_lib_manifests, &lib_root_url);
817
818        assert!(
819            result.is_ok(),
820            "Should successfully load manifest from provider"
821        );
822        let (manifest, _resolved_url) = result.expect("Could not get manifest tuple");
823        assert_eq!(manifest.metadata.name, "testlib");
824        assert_eq!(manifest.metadata.version, "1.0.0");
825    }
826
827    #[test]
828    fn get_lib_manifest_tuple_uses_cached() {
829        // Test that get_lib_manifest_tuple returns a cached manifest
830        // rather than loading from provider
831        let lib_url = Url::parse("lib://cachedlib").expect("Could not parse lib url");
832        let cached_manifest = LibraryManifest::new(
833            lib_url.clone(),
834            MetaData {
835                name: "cachedlib".into(),
836                version: "2.0.0".into(),
837                description: "cached".into(),
838                authors: vec!["cacher".into()],
839            },
840        );
841        let cached_resolved = Url::parse("file://cached/location").expect("Could not parse Url");
842
843        let loaded_lib_manifests =
844            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
845        {
846            let mut manifests = loaded_lib_manifests
847                .write()
848                .expect("Could not write to manifests");
849            manifests.insert(lib_url.clone(), (cached_manifest, cached_resolved.clone()));
850        }
851
852        // Provider returns different content - but should not be used since manifest is cached
853        let provider = Arc::new(TestProvider {
854            test_content: "invalid json",
855        }) as Arc<dyn Provider>;
856
857        let result = super::get_lib_manifest_tuple(&provider, &loaded_lib_manifests, &lib_url);
858
859        assert!(
860            result.is_ok(),
861            "Should return cached manifest without calling provider"
862        );
863        let (manifest, resolved_url) = result.expect("Could not get manifest tuple");
864        assert_eq!(
865            manifest.metadata.name, "cachedlib",
866            "Should return the cached manifest"
867        );
868        assert_eq!(
869            manifest.metadata.version, "2.0.0",
870            "Should return the cached manifest version"
871        );
872        assert_eq!(
873            resolved_url, cached_resolved,
874            "Should return the cached resolved URL"
875        );
876    }
877
878    #[test]
879    fn get_lib_manifest_tuple_provider_returns_invalid_json() {
880        // Test that get_lib_manifest_tuple returns an error when the provider
881        // returns invalid manifest content
882        let provider = Arc::new(ManifestTestProvider {
883            test_content: "not valid json at all",
884        }) as Arc<dyn Provider>;
885        let loaded_lib_manifests =
886            Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
887        let lib_root_url = Url::parse("lib://badlib").expect("Could not parse lib url");
888
889        let result = super::get_lib_manifest_tuple(&provider, &loaded_lib_manifests, &lib_root_url);
890
891        assert!(
892            result.is_err(),
893            "Should error when provider returns invalid manifest content"
894        );
895    }
896
897    #[test]
898    fn execute_job() {
899        let job1 = Job {
900            process_id: 1,
901            #[cfg(feature = "debugger")]
902            function_name: String::new(),
903            parent_id: 0,
904            connections: vec![],
905            payload: Payload {
906                job_id: 0,
907                input_set: vec![],
908                implementation_url: Url::parse("lib://flowstdlib/math/add")
909                    .expect("Could not parse Url"),
910            },
911            result: Ok((None, false)),
912        };
913
914        let job2 = Job {
915            process_id: 1,
916            #[cfg(feature = "debugger")]
917            function_name: String::new(),
918            parent_id: 0,
919            connections: vec![],
920            payload: Payload {
921                job_id: 0,
922                input_set: vec![],
923                implementation_url: Url::parse("context://stdio/stdout")
924                    .expect("Could not parse Url"),
925            },
926            result: Ok((None, false)),
927        };
928
929        let job3 = Job {
930            process_id: 1,
931            #[cfg(feature = "debugger")]
932            function_name: String::new(),
933            parent_id: 0,
934            connections: vec![],
935            payload: Payload {
936                job_id: 0,
937                input_set: vec![],
938                implementation_url: Url::parse("file://fake/path").expect("Could not parse Url"),
939            },
940            result: Ok((None, false)),
941        };
942
943        for job in [job1, job2, job3] {
944            let loaded_implementations =
945                Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
946            let loaded_lib_manifests =
947                Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
948            let provider = Arc::new(TestProvider { test_content: "" }) as Arc<dyn Provider>;
949            let context = zmq::Context::new();
950            let results_sink = context
951                .socket(zmq::PUSH)
952                .expect("Could not createPUSH end of results-sink socket");
953            results_sink
954                .connect("tcp://127.0.0.1:3458")
955                .expect("Could not connect to PULL end of results-sink socket");
956
957            assert!(super::execute_job(
958                &provider,
959                &job.payload,
960                &results_sink,
961                "test executor",
962                &loaded_implementations,
963                &loaded_lib_manifests,
964            )
965            .is_err());
966        }
967    }
968}