use ::std::sync::atomic::AtomicUsize;
use ::std::sync::atomic::Ordering;
use ::std::sync::Arc;
use ::clap::StructOpt;
use ::dashmap::DashMap;
use ::log::debug;
use ::log::info;
use ::rand::Rng;
use ::rayon::iter::{IntoParallelIterator, ParallelIterator};
use ::rayon::ThreadPoolBuilder;
use crate::cmd::cmd_io::current_time_s;
use crate::cmd::cmd_io::read;
use crate::cmd::cmd_io::write;
use crate::cmd::cmd_type::RunId;
use crate::cmd::cmd_type::RunningTask;
use crate::cmd::cmd_type::TaskStack;
use crate::cmd::cmd_type::TaskType;
use crate::ExitStatus;
// Command-line arguments of the `cmdo` command (see `do_cmd`).
// NOTE(review): plain `//` comments are used here on purpose; with the clap derive,
// `///` doc comments on the struct or its fields are typically turned into help text,
// which would change the program's output - confirm before converting them.
#[derive(StructOpt, Debug)]
#[structopt(
name = "cmdo",
about = "Execute a command and remove it from the stack if successful. See also cmadd, cmlist, cmdrop"
)]
pub struct DoArgs {
// Namespace handed to `read`/`write` to select the command stack; defaults to the empty string.
#[structopt(short = 'n', long, default_value = "")]
pub namespace: String,
// Maximum number of commands to select for this run (default 1); ignored when `all` is set.
#[structopt(short = 'c', long, default_value = "1")]
pub count: u32,
// Select every command on the stack; cannot be combined with `--count`.
#[structopt(short = 'a', long, conflicts_with = "count")]
pub all: bool,
// Number of threads for the thread pool used by `run_tasks` (default 1).
// `verify_args` turns on `continue_on_error` when this is greater than 1.
#[structopt(short = 'p', long = "parallel", default_value = "1")]
pub parallel: u32,
// Also select commands that are already marked as running, instead of skipping them.
#[structopt(short = 'g', long = "restart-running")]
pub restart_running: bool,
// Keep executing the remaining commands after one of them fails.
#[structopt(short = 'f', long = "continue-on-error")]
pub continue_on_error: bool,
// Remove failed commands from the stack instead of keeping them for a retry.
#[structopt(short = 'r', long = "drop-failed")]
pub drop_failed: bool,
// Keep successful commands on the stack instead of removing them.
#[structopt(short = 'k', long = "keep")]
pub keep_successful: bool,
// Suppress the "run ..." and "... left" progress lines; its negation is also passed to `execute_sync`.
#[structopt(short = 'q', long)]
pub quiet: bool,
// Make `do_cmd` report success instead of an error when the stack is empty.
#[structopt(short = '0', long = "allow-empty")]
pub allow_empty: bool,
}
/// Lets clap validate the argument definition derived for `DoArgs`.
#[test]
fn test_cli_args() {
    let app = <DoArgs as ::clap::IntoApp>::into_app();
    app.debug_assert();
}
/// Entry point of `cmdo`: selects commands from the stack, runs them and stores the
/// stack that remains afterwards.
///
/// Steps, in order:
/// 1. normalize the arguments with `verify_args`;
/// 2. read the stack for `args.namespace`; an empty stack ends the call immediately
///    (`true` with `--allow-empty`, otherwise an error message and `false`);
/// 3. mark the selected commands as running and write the stack back;
/// 4. run the selected commands;
/// 5. read the stack again, drop the entries that should not be kept, and write the result;
/// 6. unless `--quiet`, print how many commands are left.
///
/// Returns `true` only if every selected command ended with `Status::Success`
/// (which is trivially the case when nothing was selected).
pub fn do_cmd(args: DoArgs) -> bool {
    let args = verify_args(args);
    let ts_s = current_time_s();

    let mut stack = read(args.namespace.clone());
    if stack.is_empty() {
        if !args.allow_empty {
            eprintln!("there are no commands to run, use cmadd to add them");
        }
        return args.allow_empty;
    }

    let selected = mark_tasks_to_run(
        args.restart_running,
        args.all,
        args.count,
        &mut stack,
        ts_s,
    );
    write(args.namespace.clone(), &stack);

    let statuses = run_tasks(selected, args.continue_on_error, args.parallel, args.quiet);

    // The stack is read again rather than reusing `stack`.
    // NOTE(review): presumably so that changes made while the commands ran are not lost - confirm.
    let current = read(args.namespace.clone());
    let remaining = remove_completed_tasks(&args, current, &statuses);
    let quiet = args.quiet;
    write(args.namespace, &remaining);

    if !quiet {
        if !remaining.is_empty() {
            println!("{} command(s) left", remaining.len());
        } else {
            println!("no commands left");
        }
    }

    // Bound to a local so the map iterator is dropped before `statuses` goes out of scope.
    let every_task_succeeded = statuses
        .iter()
        .all(|entry| *entry.value() == Status::Success);
    every_task_succeeded
}
/// Executes the given tasks and returns the outcome of each one, keyed by run id.
///
/// Every task is first recorded as `Status::Skipped`; the entry is overwritten with the
/// real outcome once the task has been executed, so tasks that never ran stay `Skipped`.
///
/// * `to_run` - tasks to execute.
/// * `continue_on_error` - when `true`, all tasks are executed on a thread pool with
///   `parallel` threads, regardless of failures. When `false`, tasks are executed one
///   by one in order, stopping after the first task that does not succeed.
/// * `parallel` - thread count for the pool; must be at most 1 when `continue_on_error`
///   is `false`.
/// * `quiet` - forwarded to `exec`.
///
/// # Panics
///
/// Panics if `parallel > 1` while `continue_on_error` is `false`, or if the thread pool
/// cannot be created.
pub fn run_tasks(
    to_run: Vec<RunningTask>,
    continue_on_error: bool,
    parallel: u32,
    quiet: bool,
) -> Arc<DashMap<RunId, Status>> {
    let statuses = Arc::new(DashMap::new());
    for task in &to_run {
        statuses.insert(task.run_id, Status::Skipped);
    }

    let total_count = to_run.len();
    // 1-based sequence number used in the progress output; atomic because tasks may
    // be started from several threads.
    let current_nr = AtomicUsize::new(1);

    // Runs one task, records its outcome and hands the outcome back to the caller.
    let run_one = |task: RunningTask| -> Status {
        let nr = current_nr.fetch_add(1, Ordering::AcqRel);
        let (id, status) = exec(task, nr, total_count, quiet);
        statuses.insert(id, status);
        status
    };

    if continue_on_error {
        ThreadPoolBuilder::new()
            .num_threads(parallel as usize)
            .build()
            .expect("failed to create thread pool")
            .install(|| {
                to_run.into_par_iter().for_each(|task| {
                    run_one(task);
                });
            });
    } else {
        assert!(
            parallel <= 1,
            "cannot use parallel mode when continue-on-error is false"
        );
        for task in to_run {
            // Stop at the first task that did not succeed; later tasks stay `Skipped`.
            if run_one(task) != Status::Success {
                break;
            }
        }
    }
    statuses
}
/// Normalizes combinations of command-line arguments before they are used.
///
/// * `--parallel` above 1 implies `--continue-on-error`, because the sequential
///   stop-at-first-failure mode of `run_tasks` does not support multiple threads.
/// * With `--all`, `count` is reset to 0; it is not used as a limit in that case.
/// * Without `--all`, a note is logged when `--parallel` exceeds `--count`, since
///   no more than `count` commands are selected.
///
/// Returns the adjusted arguments.
fn verify_args(mut args: DoArgs) -> DoArgs {
    if args.parallel > 1 && !args.continue_on_error {
        info!("enabling --continue-on-error because of --parallel");
        args.continue_on_error = true;
    }
    if args.all {
        args.count = 0;
    } else if args.count < args.parallel {
        // Only meaningful when `count` is an actual limit; with --all it is a placeholder.
        info!(
            "--parallel ({}) is higher than --count ({})",
            args.parallel, args.count
        );
    }
    args
}
/// Runs a single task synchronously and converts its exit status into a `Status`.
///
/// Unless `quiet` is set, a progress line is printed first; it contains the
/// `current_nr`/`total_count` position only when more than one task is being run.
/// `!quiet` is also handed to `execute_sync`.
/// NOTE(review): that flag presumably controls whether the command's own output is shown - confirm.
///
/// Returns the task's run id together with its outcome.
fn exec(task: RunningTask, current_nr: usize, total_count: usize, quiet: bool) -> (RunId, Status) {
    let verbose = !quiet;
    if verbose {
        match total_count {
            0 | 1 => println!("run: {}", task.as_str()),
            _ => println!("run {}/{}: {}", current_nr, total_count, task.as_str()),
        }
    }
    // Read the id before the inner task is handed off for execution.
    let run_id = task.run_id;
    let exit = task.task.execute_sync(verbose);
    (run_id, Status::from(exit))
}
/// Outcome of a single task within one `run_tasks` invocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
/// The task was executed and its exit status was ok.
Success,
/// The task was executed and ended with the contained non-ok exit status.
Failed(ExitStatus),
/// The task was not executed; this is the status `run_tasks` records for every
/// task before any of them is run.
Skipped,
}
impl Status {
    /// Maps this outcome to an exit status.
    ///
    /// Failed tasks yield an exit status built from their own code; both
    /// successful and skipped tasks yield the ok status.
    pub fn exit_status(&self) -> ExitStatus {
        match self {
            Status::Failed(failure) => ExitStatus::of(failure.code),
            Status::Success | Status::Skipped => ExitStatus::ok(),
        }
    }
}
impl From<ExitStatus> for Status {
    /// Turns an exit status into `Success` when it is ok, and into `Failed`
    /// (carrying the exit status) otherwise. Never produces `Skipped`.
    fn from(exit: ExitStatus) -> Self {
        if !exit.is_ok() {
            return Status::Failed(exit);
        }
        Status::Success
    }
}
/// Selects tasks from the stack, marks them as running in place, and returns copies
/// of the selected tasks.
///
/// Each selected entry in `tasks` is replaced by a `TaskType::Running` entry with a
/// fresh run id. All run ids of one call share `ts_s` and one random number and are
/// told apart by a counter that starts at 0.
///
/// * `restart_running` - when `true`, entries that are already marked as running are
///   selected again (with a new run id); otherwise they are reported on stderr and skipped.
/// * `all` - select every eligible entry, ignoring `count`.
/// * `count` - stop once this many entries were selected. The limit is checked after
///   an entry has been selected, so a `count` of 0 still selects one entry.
/// * `tasks` - the stack to select from; modified in place.
/// * `ts_s` - timestamp stored in the run ids.
pub fn mark_tasks_to_run(
    restart_running: bool,
    all: bool,
    count: u32,
    tasks: &mut TaskStack,
    ts_s: u32,
) -> Vec<RunningTask> {
    let run_rand_id = rand::thread_rng().gen::<u32>();
    let mut selected = Vec::new();
    let mut next_cmd_id = 0;

    for slot in tasks.iter_mut() {
        let pending = match slot {
            TaskType::Pending(waiting) => waiting.clone(),
            TaskType::Running(running) => {
                debug!(
                    "still running with run-id {}, command {}",
                    running.run_id,
                    running.as_str()
                );
                if !restart_running {
                    eprintln!("skipping command because it is already running or has failed without contact: {}",
                        running.as_str());
                    continue;
                }
                running.task.clone()
            }
        };

        let run_id = RunId {
            run_ts_s: ts_s,
            run_rand_id,
            cmd_id: next_cmd_id,
        };
        debug!(
            "assigning run-id {} to command {}",
            run_id,
            pending.as_str()
        );

        let marked = RunningTask::new(pending, run_id);
        *slot = TaskType::Running(marked.clone());
        selected.push(marked);

        next_cmd_id += 1;
        let limit_reached = next_cmd_id >= count;
        if limit_reached && !all {
            break;
        }
    }
    selected
}
/// Builds the stack that remains after a run.
///
/// Walks `tasks` with `iter_old2new` and keeps exactly those entries for which
/// `should_keep_completed_task` returns `Some`, in the same order.
fn remove_completed_tasks(
    args: &DoArgs,
    tasks: TaskStack,
    statuses: &DashMap<RunId, Status>,
) -> TaskStack {
    let kept = tasks
        .iter_old2new()
        .filter_map(|entry| should_keep_completed_task(entry, args, statuses))
        .collect();
    TaskStack::from(kept)
}
/// Decides whether one stack entry stays on the stack after a run.
///
/// Returns `Some` with a copy of the entry when it is kept and `None` when it is removed:
///
/// * pending entries are always kept;
/// * running entries without a status in `statuses` were not started by this run;
///   they are kept and reported on stderr;
/// * successful entries are removed unless `args.keep_successful` is set;
/// * failed entries are kept unless `args.drop_failed` is set;
/// * skipped entries are always kept.
fn should_keep_completed_task(
    task: &TaskType,
    args: &DoArgs,
    statuses: &DashMap<RunId, Status>,
) -> Option<TaskType> {
    let cmd = task.as_cmd_str();

    let running = match task {
        TaskType::Pending(pending) => {
            debug!("keep command because it is not running: {}", &cmd);
            return Some(TaskType::Pending(pending.clone()));
        }
        TaskType::Running(running) => running,
    };

    // Copy the status out so the map entry is not held while deciding.
    let outcome = statuses.get(&running.run_id).map(|entry| *entry.value());

    let keep = match outcome {
        None => {
            eprintln!(
                "command is running but not started by current run: {}",
                &cmd
            );
            true
        }
        Some(Status::Skipped) => {
            debug!("keep skipped command to be retried: {}", &cmd);
            true
        }
        Some(Status::Success) => {
            if args.keep_successful {
                debug!("keep successful command because all tasks kept: {}", &cmd);
            } else {
                debug!("removing successful command: {}", &cmd);
            }
            args.keep_successful
        }
        Some(Status::Failed(code)) => {
            if args.drop_failed {
                debug!(
                    "removing failed command (as requested with --drop-failed): {} (code {})",
                    &cmd, code
                );
            } else {
                debug!(
                    "keep failed command to be retried: {} (code {})",
                    &cmd, code
                );
            }
            !args.drop_failed
        }
    };

    if keep {
        Some(TaskType::Running(running.clone()))
    } else {
        None
    }
}