use clap::Args;
use clap_complete::ArgValueCandidates;
use console::style;
use indicatif::HumanCount;
use log::{debug, info, trace, warn};
use signal_hook::consts::{SIGINT, SIGTERM};
use signal_hook::flag;
use std::collections::HashSet;
use std::error::Error;
use std::fmt::Write as _;
use std::io::prelude::*;
use std::io::{self, IsTerminal};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Instant;
use wildmatch::WildMatch;
use crate::MultiProgressContainer;
use crate::cli::{GlobalOptions, autocomplete};
use crate::format::HumanDuration;
use crate::project::Project;
use crate::workflow::{Action, ResourceCost};
// Command-line arguments for the `submit` command. The `///` comments on the fields are
// rendered by clap as the help text of each option.
#[derive(Args, Debug)]
pub struct Arguments {
    /// Select the actions to submit with a wildcard pattern.
    #[arg(short, long, value_name = "pattern", default_value_t=String::from("*"), display_order=0,
        add=ArgValueCandidates::new(autocomplete::get_action_candidates))]
    action: String,

    /// Select the directories to submit (defaults to all directories in the project).
    #[arg(add=ArgValueCandidates::new(autocomplete::get_directory_candidates))]
    directories: Vec<PathBuf>,

    /// Skip the confirmation prompt.
    #[arg(long, display_order = 0, env = "ROW_YES", hide_env = true)]
    yes: bool,

    /// Print the job scripts instead of submitting them.
    #[arg(long, display_order = 0)]
    dry_run: bool,

    /// Submit at most N jobs.
    #[arg(short, display_order = 0)]
    n: Option<usize>,
}
/// Submit the selected workflow actions to the scheduler.
///
/// Actions are selected by matching `args.action` (a wildcard pattern) against each action's
/// name. For each selected action, the directories in `args.directories` (or every directory in
/// the project state when none are given) are filtered down to the eligible ones and split into
/// groups; each group becomes one job. `args.n`, when set, caps the total number of jobs.
///
/// With `args.dry_run`, the job scripts are written to `output` and nothing is submitted.
/// Otherwise a summary of the job count and total cost is written to `output`, the user is asked
/// to confirm when stdout is a terminal and `args.yes` is not set, and the jobs are submitted one
/// at a time. SIGINT/SIGTERM set a flag that is handed to the scheduler; a repeated signal shuts
/// the process down with exit code 10 (via `flag::register_conditional_shutdown`).
///
/// Every successful return path closes the project.
///
/// # Errors
///
/// Returns an error if the project cannot be opened or closed, if an action that must be
/// submitted in whole groups would only be partially submitted, if the same directory would be
/// submitted more than once under the same action name, if writing to `output` or reading the
/// confirmation fails, if the signal handlers cannot be registered, or if the scheduler fails to
/// build a script or submit a job.
pub fn submit<W: Write>(
    options: &GlobalOptions,
    args: Arguments,
    multi_progress: &mut MultiProgressContainer,
    output: &mut W,
) -> Result<(), Box<dyn Error>> {
    debug!("Submitting workflow actions to the scheduler.");
    let action_matcher = WildMatch::new(&args.action);
    let mut project = Project::open(options.io_threads, &options.cluster, multi_progress)?;

    let query_directories = if args.directories.is_empty() {
        project.state().list_directories()
    } else {
        args.directories
    };

    // Select the eligible groups of every matching action, validating them along the way.
    let mut matching_action_count = 0;
    let mut action_directory_set = HashSet::new();
    let mut action_groups: Vec<(&Action, Vec<Vec<PathBuf>>)> =
        Vec::with_capacity(project.workflow().action.len());
    for action in &project.workflow().action {
        if !action_matcher.matches(action.name()) {
            trace!(
                "Skipping action '{}'. It does not match the pattern '{}'.",
                action.name(),
                args.action
            );
            continue;
        }
        matching_action_count += 1;

        let matching_directories =
            project.find_matching_directories(action, query_directories.clone())?;
        let status = project.separate_by_status(action, matching_directories)?;
        let groups = project.separate_into_groups(action, status.eligible)?;

        if action.group.submit_whole() {
            // Each selected group must equal a group formed from *all* of the project's
            // directories; otherwise only part of a group would be submitted. A set keeps the
            // membership test linear in the number of selected groups.
            let whole_groups: HashSet<Vec<PathBuf>> = project
                .separate_into_groups(
                    action,
                    project.find_matching_directories(action, project.state().list_directories())?,
                )?
                .into_iter()
                .collect();
            if groups.iter().any(|group| !whole_groups.contains(group)) {
                return Err(Box::new(crate::Error::PartialGroupSubmission(
                    action.name().into(),
                )));
            }
        }

        // The same action name may appear more than once in the workflow; never submit a
        // directory twice under one name.
        for group in &groups {
            for directory in group {
                if !action_directory_set.insert((action.name.clone(), directory.clone())) {
                    return Err(Box::new(crate::Error::WouldSubmitMultipleTimes(
                        directory.clone(),
                        action.name().into(),
                    )));
                }
            }
        }
        action_groups.push((action, groups));
    }

    if matching_action_count == 0 {
        warn!("No actions match '{}'.", args.action);
        project.close(multi_progress)?;
        return Ok(());
    }

    // Turn groups into jobs, stopping once `args.n` jobs have been collected.
    info!("Preparing jobs that execute the following actions:");
    let mut total_cost = ResourceCost::new();
    let mut action_directories: Vec<(Action, Vec<PathBuf>)> = Vec::new();
    for (action, groups) in action_groups {
        let mut cost = ResourceCost::new();
        let mut job_count = 0;
        for group in groups {
            if let Some(n) = args.n
                && action_directories.len() >= n
            {
                break;
            }
            cost = cost + action.resources.cost(group.len());
            // `group` is owned here, so it can be moved in without a copy.
            action_directories.push((action.clone(), group));
            job_count += 1;
        }

        if job_count > 0 {
            info!(
                "  - {}: {} {} that may cost up to {}.",
                action.name(),
                job_count,
                if job_count == 1 { "job" } else { "jobs" },
                cost,
            );
        }
        total_cost = total_cost + cost;

        if let Some(n) = args.n
            && action_directories.len() >= n
        {
            break;
        }
    }

    if action_directories.is_empty() {
        warn!("There are no eligible jobs to submit.");
        project.close(multi_progress)?;
        return Ok(());
    }

    if args.dry_run {
        let scheduler = project.scheduler();
        info!("Execute without --dry-run to submit the following scripts...");
        for (index, (action, directories)) in action_directories.iter().enumerate() {
            info!("Script {}/{}:", index + 1, action_directories.len());
            let script = scheduler.make_script(
                action,
                directories,
                &project.workflow().workspace.path,
                project.state().values(),
            )?;
            write!(output, "{script}")?;
            output.flush()?;
        }
        project.close(multi_progress)?;
        return Ok(());
    }

    write!(output, "Submitting ")?;
    let jobs = if action_directories.len() == 1 {
        "job"
    } else {
        "jobs"
    };
    write!(
        output,
        "{} ",
        style(format!(
            "{} {}",
            HumanCount(action_directories.len() as u64),
            jobs
        ))
        .yellow()
        .bold()
    )?;
    writeln!(
        output,
        "that may cost up to {}.",
        style(total_cost).cyan().bold()
    )?;
    output.flush()?;

    if std::io::stdout().is_terminal() && !args.yes {
        let mut input = String::new();
        // Capture the I/O result from inside the suspended closure so a failure is reported
        // as an error instead of a panic.
        let mut read_result: io::Result<usize> = Ok(0);
        multi_progress.suspend(|| {
            print!("Proceed? [Y/n]: ");
            read_result = io::stdout()
                .flush()
                .and_then(|()| io::stdin().read_line(&mut input));
        });
        if let Err(error) = read_result {
            project.close(multi_progress)?;
            return Err(error.into());
        }

        // An empty answer accepts the default ("Y").
        let selection = input.trim().to_lowercase();
        if selection != "y" && !selection.is_empty() {
            warn!("Cancelling submission.");
            // Close the project here too, matching every other early return.
            project.close(multi_progress)?;
            return Ok(());
        }
    }

    project.close(multi_progress)?;
    multi_progress.clear()?;

    // The conditional shutdown is registered before the flag setter so that the first signal
    // only sets the flag and a repeated signal terminates the process.
    let should_terminate = Arc::new(AtomicBool::new(false));
    flag::register_conditional_shutdown(SIGINT, 10, Arc::clone(&should_terminate))?;
    flag::register(SIGINT, Arc::clone(&should_terminate))?;
    flag::register_conditional_shutdown(SIGTERM, 10, Arc::clone(&should_terminate))?;
    flag::register(SIGTERM, Arc::clone(&should_terminate))?;

    let instant = Instant::now();
    for (index, (action, directories)) in action_directories.iter().enumerate() {
        // Borrowed per iteration: `add_submitted` below needs `project` mutably.
        let scheduler = project.scheduler();

        let mut message = format!(
            "[{}/{}] Submitting action '{}' on directory {}",
            HumanCount((index + 1) as u64),
            HumanCount(action_directories.len() as u64),
            style(action.name().to_string()).blue(),
            style(directories[0].display().to_string()).bold()
        );
        if directories.len() > 1 {
            message += &style(format!(" and {} more", directories.len() - 1))
                .italic()
                .to_string();
        }
        // Writing into a `String` cannot fail.
        let _ = write!(
            message,
            " ({:#}).",
            style(HumanDuration(instant.elapsed())).dim()
        );
        println!("{message}");

        let result = scheduler.submit(
            &project.workflow().root,
            action,
            directories,
            &project.workflow().workspace.path,
            project.state().values(),
            Arc::clone(&should_terminate),
        );

        match result {
            Err(error) => {
                project.close(multi_progress)?;
                return Err(error.into());
            }
            Ok(Some(job_id)) => {
                println!("Row submitted job {job_id}.");
                project.add_submitted(action.name(), directories, job_id);
            }
            Ok(None) => (),
        }
    }

    project.close(multi_progress)?;
    Ok(())
}