#![deny(clippy::unwrap_used, clippy::expect_used)]
use core::str::FromStr;
use std::path::PathBuf;
use std::process::exit;
use std::sync::Arc;
use std::{env, thread};
use clap::{Arg, ArgMatches, Command};
use env_logger::Builder;
use flowrlib::discovery::discover_service;
use log::{error, info, trace, LevelFilter};
use simpath::Simpath;
#[cfg(feature = "flowstdlib")]
use url::Url;
use flowcore::errors::{Result, ResultExt};
use flowcore::meta_provider::MetaProvider;
use flowcore::provider::Provider;
use flowrlib::executor::Executor;
use flowrlib::info as flowrlib_info;
use flowrlib::services::{CONTROL_SERVICE_NAME, JOB_SERVICE_NAME, RESULTS_JOB_SERVICE_NAME};
pub mod errors;
fn main() {
match run() {
Err(ref e) => {
error!("{e}");
for e in e.iter().skip(1) {
error!("caused by: {e}");
}
if let Some(backtrace) = e.backtrace() {
error!("backtrace: {backtrace:?}");
}
exit(1);
}
Ok(()) => exit(0),
}
}
fn run() -> Result<()> {
let matches = get_matches();
let default = String::from("error");
let verbosity = matches.get_one::<String>("verbosity").unwrap_or(&default);
let level = LevelFilter::from_str(verbosity).unwrap_or(LevelFilter::Error);
let mut builder = Builder::from_default_env();
builder.filter_level(level).init();
info!(
"'{}' version {}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION")
);
info!("'flowrlib' version {}", flowrlib_info::version());
start_executors(num_threads(&matches))?;
info!("'{}' has exited", env!("CARGO_PKG_NAME"));
Ok(())
}
fn start_executors(num_threads: usize) -> Result<()> {
loop {
#[allow(unused_mut)]
let mut executor = Executor::new();
#[cfg(feature = "flowstdlib")]
executor.add_lib(
flowstdlib::manifest::get()
.chain_err(|| "Could not get 'native' flowstdlib manifest")?,
Url::parse("memory://")?,
)?;
trace!(
"'flowstdlib' loaded into '{}' executors",
env!("CARGO_PKG_NAME")
);
let provider =
Arc::new(MetaProvider::new(Simpath::new(""), PathBuf::from("/"))) as Arc<dyn Provider>;
let job_service = format!("tcp://{}", discover_service(JOB_SERVICE_NAME)?);
let results_service = format!("tcp://{}", discover_service(RESULTS_JOB_SERVICE_NAME)?);
let control_service = format!("tcp://{}", discover_service(CONTROL_SERVICE_NAME)?);
trace!("Starting '{}' executors", env!("CARGO_PKG_NAME"));
executor.start(
&provider,
num_threads,
&job_service,
&results_service,
&control_service,
);
trace!("Waiting for all executors to complete");
executor.wait();
trace!("All executors completed, exiting");
}
}
#[allow(clippy::redundant_closure_for_method_calls)]
fn num_threads(matches: &ArgMatches) -> usize {
match matches.get_one::<usize>("threads") {
Some(num_threads) => *num_threads,
None => thread::available_parallelism().map_or(1, |n| n.get()),
}
}
fn get_matches() -> ArgMatches {
let app = Command::new(env!("CARGO_PKG_NAME")).version(env!("CARGO_PKG_VERSION"));
let app = app
.arg(Arg::new("threads")
.short('t')
.long("threads")
.number_of_values(1)
.value_parser(clap::value_parser!(usize))
.value_name("THREADS")
.help("Set number of threads to use to execute jobs (default: cores available)"))
.arg(Arg::new("verbosity")
.short('v')
.long("verbosity")
.number_of_values(1)
.value_name("VERBOSITY_LEVEL")
.help("Set verbosity level for output (trace, debug, info, warn, error (default), off)"));
app.get_matches()
}