use std::collections::HashMap;
use std::panic;
use std::sync::{Arc, mpsc, Mutex, RwLock};
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use std::time::Duration;
use log::{debug, error, info, trace};
use url::Url;
use flowcore::errors::*;
use flowcore::Implementation;
use flowcore::meta_provider::{MetaProvider, Provider};
use flowcore::model::lib_manifest::{
ImplementationLocator::Native, ImplementationLocator::Wasm, LibraryManifest,
};
use crate::job::Job;
use crate::wasm;
/// Hands `Job`s to a pool of local executor threads over a channel and receives the
/// executed jobs back over a second channel.
pub struct Executor {
/// Sending end of the channel used to pass jobs to the executor threads
job_sender: Sender<Job>,
/// Receiving end of the channel on which executor threads send back executed jobs
results_receiver: Receiver<Job>,
/// Maximum time `get_next_result` waits for a result; `None` waits without a timeout
job_timeout: Option<Duration>,
/// Library manifests known to the executor, keyed by library url; each entry holds the
/// manifest and the url it was resolved to. Shared with the executor threads.
loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
}
impl Executor {
pub fn new(metaprovider: MetaProvider, number_of_executors: usize, job_timeout: Option<Duration>) -> Self {
let (job_sender, job_receiver) = mpsc::channel();
let (results_sender, results_receiver) = mpsc::channel();
let provider = Arc::new(metaprovider);
info!("Starting {} local executor threads", number_of_executors);
let shared_job_receiver = Arc::new(Mutex::new(job_receiver));
let loaded_implementations = Arc::new(RwLock::new(HashMap::<Url, Arc<dyn Implementation>>::new()));
let loaded_lib_manifests = Arc::new(RwLock::new(HashMap::<Url, (LibraryManifest, Url)>::new()));
start_local_executors(provider, number_of_executors, shared_job_receiver, results_sender,
loaded_implementations, loaded_lib_manifests.clone());
Executor {
job_sender,
results_receiver,
job_timeout,
loaded_lib_manifests,
}
}
pub fn set_timeout(&mut self, timeout: Option<Duration>) {
self.job_timeout = timeout;
}
pub fn get_next_result(&mut self) -> Result<Job> {
match self.job_timeout {
Some(t) => self.results_receiver.recv_timeout(t)
.chain_err(|| "Timeout while waiting for Job result"),
None => self.results_receiver.recv()
.chain_err(|| "Error while trying to receive Job results")
}
}
pub(crate) fn send_job_for_execution(&mut self, job: &Job) -> Result<()> {
self.job_sender
.send(job.clone())
.chain_err(|| "Sending of job for execution failed")?;
trace!(
"Job #{}: Sent for execution of Function #{}",
job.job_id,
job.function_id
);
Ok(())
}
pub fn add_lib(
&mut self,
lib_manifest: LibraryManifest,
resolved_url: Url
) -> Result<()> {
let mut lib_manifests = self.loaded_lib_manifests.try_write()
.map_err(|_| "Could not gain write access to loaded library manifests map")?;
debug!("Manifest of library {} loaded from {} and added to Executor",
lib_manifest.lib_url, resolved_url);
lib_manifests.insert(lib_manifest.lib_url.clone(), (lib_manifest, resolved_url));
Ok(())
}
}
/// Start `number_of_executors` executor threads, named "Executor #0", "Executor #1", ...
///
/// Every thread gets its own handle to the shared state: the provider, the job receiver,
/// a sender for executed jobs and the implementation and library manifest maps.
fn start_local_executors(
    provider: Arc<dyn Provider>,
    number_of_executors: usize,
    shared_job_receiver: Arc<Mutex<Receiver<Job>>>,
    job_sender: Sender<Job>,
    loaded_implementations: Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>>,
    loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
) {
    (0..number_of_executors)
        .map(|executor_number| format!("Executor #{}", executor_number))
        .for_each(|thread_name| {
            create_executor_thread(
                Arc::clone(&provider),
                thread_name,
                Arc::clone(&shared_job_receiver),
                job_sender.clone(),
                Arc::clone(&loaded_implementations),
                Arc::clone(&loaded_lib_manifests),
            )
        });
}
fn create_executor_thread(
provider: Arc<dyn Provider>,
name: String,
job_receiver: Arc<Mutex<Receiver<Job>>>,
job_sender: Sender<Job>,
loaded_implementations: Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>>,
loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
) {
let builder = thread::Builder::new();
let _ = builder.spawn(move || {
set_panic_hook();
loop {
let _ = get_and_execute_job(provider.clone(), &job_receiver, &job_sender,
&name,
loaded_implementations.clone(),
loaded_lib_manifests.clone()
);
}
});
}
fn set_panic_hook() {
panic::set_hook(Box::new(|panic_info| {
if let Some(location) = panic_info.location() {
error!(
"Panic in file '{}' at line {}",
location.file(),
location.line()
);
}
}));
}
/// Wait for the next job on the shared job channel, make sure its implementation is
/// loaded, execute it and send the executed job back on `job_sender`.
///
/// Locks are held only as long as needed:
/// - the receiver mutex is released as soon as a job has been received, so other
///   executor threads can pick up jobs while this one is executing;
/// - the implementations map is read-locked for the cache lookup and write-locked only
///   while a missing implementation is being loaded and inserted;
/// - the job itself runs with no lock held, so a panicking implementation cannot poison
///   the shared state used by the other executors.
///
/// # Errors
/// Returns an error if the receiver cannot be locked, the job channel is closed, a lock
/// on the implementations map is poisoned, the implementation cannot be loaded (including
/// an unsupported url scheme), or the executed job cannot be sent back.
/// When loading fails the received job is dropped and no result is sent for it.
fn get_and_execute_job(
    provider: Arc<dyn Provider>,
    job_receiver: &Arc<Mutex<Receiver<Job>>>,
    job_sender: &Sender<Job>,
    name: &str,
    loaded_implementations: Arc<RwLock<HashMap<Url, Arc<dyn Implementation>>>>,
    loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
) -> Result<()> {
    let guard = job_receiver
        .lock()
        .map_err(|e| format!("Error locking receiver to get job: '{}'", e))?;
    let received = guard.recv();
    // Release the receiver before doing any work so other executors are not blocked
    drop(guard);
    let job = received
        .map_err(|e| format!("Error receiving job for execution: '{}'", e))?;

    trace!("Job received for execution: {}", job);

    // Common case: the implementation is already loaded, a shared read lock is enough
    let cached = {
        let implementations = loaded_implementations
            .read()
            .map_err(|_| "Could not gain read access to loaded implementations map")?;
        implementations.get(&job.implementation_url).cloned()
    };

    let implementation = match cached {
        Some(implementation) => implementation,
        None => {
            // Wait for exclusive access rather than failing: the job has already been
            // taken off the channel, so giving up here would lose it.
            let mut implementations = loaded_implementations
                .write()
                .map_err(|_| "Could not gain write access to loaded implementations map")?;

            // Another executor may have loaded it between the read and write locks
            let already_loaded = implementations.get(&job.implementation_url).cloned();
            match already_loaded {
                Some(implementation) => implementation,
                None => {
                    let implementation = load_implementation_by_scheme(
                        provider,
                        loaded_lib_manifests,
                        &job.implementation_url,
                    )?;
                    implementations
                        .insert(job.implementation_url.clone(), implementation.clone());
                    implementation
                }
            }
        }
    };

    execute_job(job, job_sender, name, &implementation)
}

/// Load the implementation at `implementation_url`, choosing how based on the url scheme:
/// "lib" and "context" urls are looked up in a library manifest, "file" urls are loaded
/// directly. Any other scheme is an error.
fn load_implementation_by_scheme(
    provider: Arc<dyn Provider>,
    loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
    implementation_url: &Url,
) -> Result<Arc<dyn Implementation>> {
    match implementation_url.scheme() {
        "lib" => {
            // The library root is the implementation url with its path removed
            let mut lib_root_url = implementation_url.clone();
            lib_root_url.set_path("");
            load_referenced_implementation(
                provider,
                lib_root_url,
                loaded_lib_manifests,
                implementation_url,
            )
        }
        "context" => {
            // For context functions both the host and the path are cleared
            let mut lib_root_url = implementation_url.clone();
            let _ = lib_root_url.set_host(Some(""));
            lib_root_url.set_path("");
            load_referenced_implementation(
                provider,
                lib_root_url,
                loaded_lib_manifests,
                implementation_url,
            )
        }
        "file" => resolve_implementation(provider, implementation_url),
        _ => bail!("Unsupported scheme on implementation_url"),
    }
}
/// Run `implementation` on the job's input set, store the outcome in `job.result` and
/// send the job back on `job_tx`. `name` identifies the executor in trace messages.
///
/// # Errors
/// Returns an error if the executed job cannot be sent on `job_tx`.
fn execute_job(
    mut job: Job,
    job_tx: &Sender<Job>,
    name: &str,
    implementation: &Arc<dyn Implementation>,
) -> Result<()> {
    trace!("Job #{}: Started executing on '{name}'", job.job_id);
    let outcome = implementation.run(&job.input_set);
    job.result = outcome;
    trace!("Job #{}: Finished executing on '{name}'", job.job_id);

    let sent = job_tx.send(job);
    sent.chain_err(|| "Error sending job result back after execution")
}
fn resolve_implementation(provider: Arc<dyn Provider>,
implementation_url: &Url,
) -> Result<Arc<dyn Implementation>> {
format!("Implementation at '{}' is not loaded", implementation_url);
let wasm_executor = wasm::load(&* provider, implementation_url)?;
Ok(Arc::new(wasm_executor) as Arc<dyn Implementation>)
}
/// Load the implementation at `implementation_url` from the library whose root is
/// `lib_root_url`.
///
/// The library's manifest is obtained (and loaded if necessary) via
/// `get_lib_manifest_tuple`, then the locator for `implementation_url` is looked up in it:
/// - a `Wasm` locator is joined onto the resolved library url and the wasm is loaded
///   from there using `provider`;
/// - a `Native` locator already holds the implementation, which is cloned.
///
/// # Errors
/// Returns an error if the manifest cannot be obtained, the manifest has no locator for
/// `implementation_url`, the wasm url cannot be formed, or the wasm fails to load.
fn load_referenced_implementation(
    provider: Arc<dyn Provider>,
    lib_root_url: Url,
    loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
    implementation_url: &Url
) -> Result<Arc<dyn Implementation>> {
    let (lib_manifest, resolved_lib_url) =
        get_lib_manifest_tuple(provider.clone(), loaded_lib_manifests, &lib_root_url)?;

    // Build the error message lazily so nothing is allocated when the locator is found
    let locator = lib_manifest
        .locators
        .get(implementation_url)
        .ok_or_else(|| format!(
            "Could not find ImplementationLocator for '{}' in library",
            implementation_url
        ))?;

    let implementation = match locator {
        Wasm(wasm_source_relative) => {
            // The wasm location is relative to where the library was resolved
            let wasm_url = resolved_lib_url
                .join(wasm_source_relative)
                .map_err(|e| e.to_string())?;
            debug!("Attempting to load wasm from source file: '{}'", wasm_url);
            let wasm_executor = wasm::load(&*provider as &dyn Provider, &wasm_url)?;
            Arc::new(wasm_executor) as Arc<dyn Implementation>
        }
        Native(native_impl) => native_impl.clone(),
    };

    Ok(implementation)
}
/// Return a copy of the `(LibraryManifest, Url)` entry for the library rooted at
/// `lib_root_url`, loading the manifest with `provider` and caching it in
/// `loaded_lib_manifests` if it is not there yet.
///
/// An already cached manifest is served under a shared read lock. Only when it is missing
/// is the exclusive write lock taken (waiting for it, rather than failing if it is busy),
/// and the map is checked again before loading.
///
/// # Errors
/// Returns an error if a lock on the manifests map is poisoned or if the manifest cannot
/// be loaded.
fn get_lib_manifest_tuple(
    provider: Arc<dyn Provider>,
    loaded_lib_manifests: Arc<RwLock<HashMap<Url, (LibraryManifest, Url)>>>,
    lib_root_url: &Url,
) -> Result<(LibraryManifest, Url)> {
    // Fast path: manifest already loaded, no exclusive access required
    {
        let lib_manifests = loaded_lib_manifests.read()
            .map_err(|_| "Could not get read access to the loaded lib manifests")?;
        if let Some(tuple) = lib_manifests.get(lib_root_url) {
            return Ok(tuple.clone());
        }
    }

    let mut lib_manifests = loaded_lib_manifests.write()
        .map_err(|_| "Could not get write access to the loaded lib manifests")?;

    // The manifest may have been added while no lock was held
    if let Some(tuple) = lib_manifests.get(lib_root_url) {
        return Ok(tuple.clone());
    }

    info!("Attempting to load library manifest'{}'", lib_root_url);
    let manifest_tuple =
        LibraryManifest::load(&*provider as &dyn Provider, lib_root_url).chain_err(|| {
            format!("Could not load library with root url: '{}'", lib_root_url)
        })?;
    lib_manifests.insert(lib_root_url.clone(), manifest_tuple.clone());

    Ok(manifest_tuple)
}