use anyhow::Error;
use bollard::Docker;
use croner::Cron;
use regex::Regex;
use tokio::task::JoinSet;
use tracing::{debug, warn};
use std::{collections::HashMap, fmt::Debug, time::Duration};
mod common;
mod exec;
mod run;
mod local;
mod servicerun;
pub use exec::ExecJobInfo;
pub use run::RunJobInfo;
pub use local::LocalJobInfo;
pub use servicerun::ServiceRunJobInfo;
fn schedule_to_cron(sched: &str) -> Result<Cron, Error> {
let mut sched = sched.trim().to_string();
let re = Regex::new("^@every\\s+(?<interval>[0-9]+)(?<unit>s|m|h)$").unwrap();
match re.captures(sched.as_str()) {
Some(c) => {
let interval: i32 = c.name("interval").unwrap().as_str().parse().unwrap();
let unit = c.name("unit").unwrap().as_str();
match unit {
"s" => sched = format!("*/{} * * * * *", interval).to_string(),
"m" => sched = format!("0 */{} * * * *", interval).to_string(),
"h" => sched = format!("0 0 */{} * * *", interval).to_string(),
_ => unreachable!("Encountered an unhandled time unit while parsing a schedule"),
}
},
None => {},
}
Cron::new(&sched).with_seconds_optional().parse().map_err(|e| Error::new(e))
}
async fn cron_sleep(cron: &Cron) -> Result<Option<bool>, Error> {
let current_time = chrono::Local::now();
let next_occurence = cron.find_next_occurrence(¤t_time, false).unwrap();
let sleep = (next_occurence - current_time).num_milliseconds();
assert!(sleep >= 0);
tokio::time::sleep(Duration::from_millis(sleep as u64)).await;
Ok(Some(true))
}
#[derive(Debug)]
pub enum JobInfo {
ExecJob(Box<ExecJobInfo>),
RunJob(Box<RunJobInfo>),
LocalJob(Box<LocalJobInfo>),
ServiceRunJob(Box<ServiceRunJobInfo>),
}
#[macro_export]
macro_rules! match_all_jobs {
($target: expr, $varname: ident, $processing: expr) => {
match $target {
JobInfo::ExecJob($varname) => $processing,
JobInfo::RunJob($varname) => $processing,
JobInfo::LocalJob($varname) => $processing,
JobInfo::ServiceRunJob($varname) => $processing,
}
};
}
pub use match_all_jobs;
impl TryFrom<HashMap<String, Vec<String>>> for JobInfo {
type Error = Error;
fn try_from(mut parameters: HashMap<String, Vec<String>>) -> Result<Self, Self::Error> {
let kind = parameters.remove("kind");
let command = parameters.remove("command");
let schedule = parameters.remove("schedule");
if kind == None {
return Err(Error::msg(format!["The job has no job type"]));
}
if kind.as_ref().unwrap().len() != 1 {
debug!["The job has several kinds set, using the last configured one"];
}
if command == None {
return Err(Error::msg(format!["The job has no command"]));
}
if schedule == None {
return Err(Error::msg(format!["The job has no schedule"]));
}
let schedule = schedule.unwrap().pop().unwrap();
let kind = kind.unwrap().pop().unwrap();
let job_info: JobInfo;
match kind.as_str() {
ExecJobInfo::LABEL => {
let job = ExecJobInfo {
name: parameters.remove("name").map_or_else(|| "".to_string(), |mut n| n.pop().unwrap()),
schedule: schedule_to_cron(&schedule.as_str()).unwrap(),
command: command.unwrap().pop().unwrap(),
container: parameters.remove("container").map(|mut c| c.pop().unwrap()).unwrap(),
user: parameters.remove("user").map(|mut u| u.pop().unwrap()),
tty: parameters.remove("tty").map_or(false, |mut t| t.pop().unwrap().parse().unwrap()),
environment: parameters.remove("environment").unwrap_or(Default::default()),
};
job_info = JobInfo::ExecJob(Box::new(job));
},
RunJobInfo::LABEL => {
let job = RunJobInfo {
name: parameters.remove("name").map_or_else(|| "".to_string(), |mut n| n.pop().unwrap()),
schedule: schedule_to_cron(&schedule.as_str()).unwrap(),
command: command.unwrap().pop().unwrap(),
image: parameters.remove("image").map(|mut c| c.pop().unwrap()),
user: parameters.remove("user").map_or(None, |mut u| Some(u.pop().unwrap())),
network: parameters.remove("network"),
hostname: parameters.remove("hostname").map_or(None, |mut u| Some(u.pop().unwrap())),
delete: parameters.remove("delete").map_or(true, |mut t| t.pop().unwrap().parse().unwrap()),
container: parameters.remove("container").map(|mut c| c.pop().unwrap()),
tty: parameters.remove("tty").map_or(false, |mut t| t.pop().unwrap().parse().unwrap()),
volume: parameters.remove("volume").unwrap_or_else(|| Default::default()),
environment: parameters.remove("environment").unwrap_or(Default::default()),
};
if job.image == None && job.container == None {
return Err(Error::msg(format!["The job {} has neither an image nor a container parameter. At least one of thse must be set.", job.name]));
}
job_info = JobInfo::RunJob(Box::new(job));
},
LocalJobInfo::LABEL => {
let job = LocalJobInfo {
name: parameters.remove("name").map_or_else(|| "".to_string(), |mut n| n.pop().unwrap()),
schedule: schedule_to_cron(&schedule.as_str()).unwrap(),
command: command.unwrap().pop().unwrap(),
dir: parameters.remove("dir").map(|mut d| d.pop().unwrap()),
environment: parameters.remove("environment").unwrap_or(Default::default()),
};
job_info = JobInfo::LocalJob(Box::new(job));
},
ServiceRunJobInfo::LABEL => {
let job = ServiceRunJobInfo {
name: parameters.remove("name").map_or_else(|| "".to_string(), |mut n| n.pop().unwrap()),
schedule: schedule_to_cron(&schedule.as_str()).unwrap(),
command: command.unwrap().pop().unwrap(),
image: parameters.remove("image").map(|mut c| c.pop().unwrap()),
user: parameters.remove("user").map_or(None, |mut u| Some(u.pop().unwrap())),
network: parameters.remove("network"),
delete: parameters.remove("delete").map_or(true, |mut t| t.pop().unwrap().parse().unwrap()),
container: parameters.remove("container").map(|mut c| c.pop().unwrap()),
tty: parameters.remove("tty").map_or(false, |mut t| t.pop().unwrap().parse().unwrap()),
};
job_info = JobInfo::ServiceRunJob(Box::new(job));
}
_ => return Err(Error::msg(format!["Unsupported job type {}", kind])),
}
if !parameters.is_empty() {
let k: Vec<&String> = parameters.keys().collect();
warn!["There are unused keys in the job. Unaffected values: {:?}", k];
}
Ok(job_info)
}
}
impl JobInfo {
pub async fn start(self, _handle: Docker) -> Result<Option<bool>, Error> {
let mut set = JoinSet::new();
let cron;
let may_run_parallel;
match_all_jobs!(&self, e, {cron = e.get_schedule(); may_run_parallel = e.may_run_parallel();});
let initial_cron = cron.clone();
set.spawn(async move {cron_sleep(&initial_cron).await});
while let Some(res) = set.join_next().await {
if let Ok(Ok(Some(_))) = res {
if may_run_parallel || set.is_empty() {
match_all_jobs!(&self, e, {
let exec_job = e.as_ref().clone();
set.spawn(async {exec_job.exec().await});
});
}
let cron = cron.clone();
set.spawn(async move {cron_sleep(&cron).await});
}
}
warn!["A job terminated, this is probably not desired: {:?}", self];
Err(Error::msg("Aborting because a job unexpectedly stopped"))
}
pub fn name(&self) -> &String {
match_all_jobs!(self, e, &e.name)
}
pub fn command(&self) -> &String {
match_all_jobs!(self, e, &e.command)
}
#[deprecated]
pub fn schedule(&self) -> &Cron {
match_all_jobs!(self, e, &e.schedule)
}
pub fn kind(&self) -> &str {
match self {
JobInfo::ExecJob(_) => ExecJobInfo::LABEL,
JobInfo::RunJob(_) => RunJobInfo::LABEL,
JobInfo::LocalJob(_) => LocalJobInfo::LABEL,
JobInfo::ServiceRunJob(_) => ServiceRunJobInfo::LABEL,
}
}
}