use crate::{EvalState, FailurePolicy, InnerJobType, Job, Queues, TaskStatus, JobType};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use zookeeper::ZooKeeper;
use anyhow::Context;
/// Outcome reported by evaluating a job's execution tree (or one of its sub-trees).
///
/// `eval_inner_job` returns one of these together with the updated tree, and `evaluate`
/// uses it to decide how the job's overall `TaskStatus` should change.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
enum Op {
/// A task was started during this pass, or new tasks were injected into the tree and will
/// be started on a later pass.
Spawn,
/// Nothing changed: the current task is still pending, scheduled, or running.
DoNothing,
/// The evaluated (sub-)tree completed successfully.
Success,
/// The evaluated (sub-)tree failed: a task failed or was lost.
Failure,
}
pub(crate) fn job_processing_loop(zk: Arc<ZooKeeper>, namespace: String, mut queue: Queues) {
loop {
let message = queue.job.consume(None);
if message.is_err() {
log::error!("unable to dequeue message. {:?}", message.err().unwrap());
thread::sleep(Duration::from_secs(3));
continue;
}
let message = message.unwrap();
let task_id: String = String::from_utf8(message.body).unwrap();
// check if the znode exists. if it doesn't, drop the message
if zk.clone().exists(&*format!("/jotty/{}/job/{}", &namespace, &task_id), false).unwrap().is_none() {
log::warn!("job {} consumed from work queue, but does not exist", &task_id);
continue;
}
log::trace!("invoking jotty::engine::job::evaluate({})", &task_id);
if let Err(error) = evaluate(zk.clone(), namespace.clone(), task_id.clone(), queue.clone()) {
log::error!(
"jotty::engine::job::evaluate({}) failed with {:#?}",
&task_id,
error
);
} else {
log::trace!("jotty::engine::job::evaluate({}) succeeded", &task_id);
}
}
}
/// Loads job `job_id` from ZooKeeper, advances its execution tree by one step, and saves it.
///
/// The tree is evaluated with `eval_inner_job`, and the resulting `Op` drives the job-level
/// status transition:
///
/// * `Op::Spawn` – a `Pending` job becomes `Run`.
/// * `Op::Success` – a `Run` job becomes `Success`.
/// * `Op::Failure` – handled according to the job's `FailurePolicy` (`job.scheduler.2`):
///   - `Stop` fails the job immediately.
///   - `FiniteRetry(max, used)` records one more used retry, and fails the job once `used`
///     has reached `max`.
/// * `Op::DoNothing` – the status is left untouched.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// * the job cannot be loaded;
/// * evaluating its tree fails;
/// * the updated job cannot be saved.
///
/// Nothing is written back on error.
pub(crate) fn evaluate(
    zk: Arc<ZooKeeper>,
    namespace: String,
    job_id: String,
    queue: Queues,
) -> anyhow::Result<()> {
    // load job definition from zookeeper
    let mut job: Job = Job::load(zk.clone(), namespace.clone(), job_id)
        .context("loading job from zookeeper")?;
    log::trace!("job {} has an inner state: {:?}", &job.id, &job.inner);

    // Hand the pending injected tasks to the tree evaluator and clear them on the job in one
    // step. `job` is only persisted below, after a successful evaluation, so an early error
    // leaves the stored job (including its injected tasks) untouched.
    // NOTE(review): `eval_inner_job` only splices these tasks in when a task transitions to
    // `Success` during this pass; on any other pass they are still dropped here. The behavior
    // is unchanged from before; confirm whether that is intended.
    let injected_tasks = std::mem::take(&mut job.injected_tasks);
    let (inner, op) = eval_inner_job(
        zk.clone(),
        namespace.clone(),
        job.id.clone(),
        job.inner.clone(),
        queue,
        injected_tasks,
    )?;
    log::trace!("job {} inner state will become (via {:?}): {:?}", &job.id, &op, &inner);
    job.inner = inner;

    match op {
        Op::Spawn => {
            // a task was spawned: a job that was still pending is now running
            if let TaskStatus::Pending(_, _) = job.status {
                log::info!(
                    "transitioning job {} from '{:?}' to '{:?}'",
                    &job.id, &job.status, TaskStatus::Run
                );
                job.status = TaskStatus::Run;
            }
        }
        Op::DoNothing => {}
        Op::Success => {
            if let TaskStatus::Run = job.status {
                log::info!(
                    "transitioning job {} from '{:?}' to '{:?}'",
                    &job.id, &job.status, TaskStatus::Success
                );
                job.status = TaskStatus::Success;
            }
        }
        // TODO: a retry only spends failure budget here; nothing in this file resets the
        // failed sub-tree, so the failed task is not actually re-run.
        Op::Failure => match job.scheduler.2 {
            FailurePolicy::FiniteRetry(max, used) => {
                // `>=` rather than `==`: a counter that is already past the limit must still
                // end the job instead of being incremented forever.
                if used >= max {
                    log::trace!("job {} has exhausted failure retries", job.id);
                    log::info!(
                        "transitioning job {} from '{:?}' to '{:?}'",
                        &job.id, &job.status, TaskStatus::Failure
                    );
                    job.status = TaskStatus::Failure;
                } else {
                    // record one more used retry
                    job.scheduler.2 = FailurePolicy::FiniteRetry(max, used + 1);
                }
            }
            // if the policy is to stop with a task failure, set the job status to Failure
            FailurePolicy::Stop => {
                log::info!(
                    "transitioning job {} from '{:?}' to '{:?}'",
                    &job.id, &job.status, TaskStatus::Failure
                );
                job.status = TaskStatus::Failure;
            }
        },
    }

    job.save(zk, namespace)?;
    // TODO we might want to submit a task to re-evaluate this job?
    Ok(())
}
/// This function can be recursively called until a InnerJobType::Single is reached
#[allow(clippy::mut_range_bound)]
#[allow(clippy::boxed_local)]
fn eval_inner_job(
zk: Arc<ZooKeeper>,
namespace: String,
job_id: String,
inner: Box<InnerJobType>,
queue: Queues,
job_tasks: Vec<JobType>
) -> anyhow::Result<(Box<InnerJobType>, Op)> {
match *inner {
InnerJobType::Single(task_def, state, failures) => match state {
EvalState::None(_) => Ok((
Box::new(InnerJobType::Single(
task_def.clone(),
EvalState::Some(crate::engine::task::run_task(zk, namespace, &task_def, job_id, queue)?),
failures,
)),
Op::Spawn,
)),
EvalState::Some(inner) => {
match crate::engine::task::get_task(zk, namespace, &inner)?.status {
TaskStatus::Pending(_, _) | TaskStatus::Scheduled | TaskStatus::Run => Ok((
Box::new(InnerJobType::Single(
task_def,
EvalState::Some(inner),
failures,
)),
Op::DoNothing,
)),
TaskStatus::Success => Ok(inject_jobs(InnerJobType::Single(
task_def,
EvalState::Done(inner),
failures,
), job_tasks)
),
TaskStatus::Failure => Ok((
Box::new(InnerJobType::Single(
task_def,
EvalState::Done(inner.clone()),
append_n_return(failures, inner),
)),
Op::Failure,
)),
TaskStatus::Lost(_) => Ok((
// reset the evaluation state to None, so it is re-evaluated
Box::new(InnerJobType::Single(
task_def,
EvalState::None("unset".to_string()),
append_n_return(failures, inner),
)),
Op::Failure,
)),
}
}
EvalState::Done(inner) => {
match crate::engine::task::get_task(zk, namespace, &inner)?.status {
// at this point, the task should not be in another state other than completed or failed
TaskStatus::Success => Ok((Box::from(InnerJobType::Single(
task_def,
EvalState::Done(inner),
failures,
)), Op::Success)),
TaskStatus::Failure => Ok((Box::new(InnerJobType::Single(
task_def,
EvalState::Done(inner),
failures,
)), Op::Failure)),
_ => panic!("job is in an invalid state"),
}
}
},
InnerJobType::Linear(tasks) => {
log::trace!("[eval_inner_job] InnerJobType::Linear({:?})", &tasks);
let mut new_tasks = vec![];
let mut i = 0;
let mut status = Op::DoNothing;
let mut successes = 0;
// start looping over tasks, evaluating each one in order
// if the operation returned is not Done, then break
for _ in 0..tasks.len() {
let eval = eval_inner_job(
zk.clone(),
namespace.clone(),
job_id.clone(),
tasks[i].clone(),
queue.clone(),
job_tasks.clone()
)?;
i += 1;
new_tasks.push(eval.0);
match &eval.1 {
// If DoNothing, then the task is still running and there is nothing to do here
Op::DoNothing => break,
Op::Spawn => {
status = Op::Spawn;
break;
}
// if Failure, break but note the failure
Op::Failure => {
status = Op::Failure;
break;
}
// do nothing and continue onto the next one
Op::Success => successes += 1,
}
}
// now, append everything else onto the new tasks vec
for _ in i..tasks.len() {
new_tasks.push(tasks[i].clone());
i += 1;
}
// if the number of successes we have is equal to the number of tasks, all the tasks
// completed successfully, and return as such
if tasks.len() == successes {
status = Op::Success;
}
Ok((Box::new(InnerJobType::Linear(new_tasks)), status))
}
InnerJobType::Branch(root, left, right) => {
log::trace!("[eval_inner_job] InnerJobType::Branch({:?}, {:?}, {:?})", &root, &left, &right);
let root_job = eval_inner_job(
zk.clone(),
namespace.clone(),
job_id.clone(),
root,
queue.clone(),
job_tasks.clone()
)?;
match root_job.1 {
// nothing to do here, because the job is either stated or running
Op::DoNothing | Op::Spawn => Ok((
Box::new(InnerJobType::Branch(root_job.0, left, right)),
root_job.1,
)),
// the root element was successful, we move onto the left element
Op::Success => {
if left.is_none() {
// there is no job here, so just return the parent job
return Ok((
Box::new(InnerJobType::Branch(root_job.0, left, right)),
root_job.1,
));
}
let left_job = eval_inner_job(zk, namespace, job_id, left.unwrap(), queue, job_tasks)?;
Ok((
Box::new(InnerJobType::Branch(root_job.0, Some(left_job.0), right)),
left_job.1,
))
}
// the root element was not successful, we move on to the right element
Op::Failure => {
if right.is_none() {
// there is no job here, so just return the parent job
return Ok((
Box::new(InnerJobType::Branch(root_job.0, left, right)),
root_job.1,
));
}
let right_job = eval_inner_job(zk, namespace, job_id, right.unwrap(), queue, job_tasks)?;
Ok((
Box::new(InnerJobType::Branch(root_job.0, left, Some(right_job.0))),
right_job.1,
))
}
}
}
}
}
/// Splices injected tasks into a job's execution tree.
///
/// Called when an `InnerJobType::Single` has just completed successfully:
///
/// * If `new_jobs` is empty, the completed node is returned unchanged together with
///   `Op::Success`.
/// * Otherwise the node is replaced by an `InnerJobType::Linear`. Its first element is the
///   completed task, followed by every injected task (each converted with `JobType::load`).
///   `Op::Spawn` is returned so the caller sees new work in the tree. The injected tasks
///   themselves are started on a later evaluation pass.
///
/// # Panics
///
/// Panics if `current_job` is not an `InnerJobType::Single`. Callers must only pass the
/// single-task node that just completed.
fn inject_jobs(current_job: InnerJobType, new_jobs: Vec<JobType>) -> (Box<InnerJobType>, Op) {
    assert!(
        matches!(current_job, InnerJobType::Single(..)),
        "inject_jobs must only be called with an InnerJobType::Single node"
    );
    if new_jobs.is_empty() {
        return (Box::new(current_job), Op::Success);
    }
    let mut contents = Vec::with_capacity(new_jobs.len() + 1);
    contents.push(Box::new(current_job));
    contents.extend(new_jobs.into_iter().map(|job| job.load()));
    (Box::new(InnerJobType::Linear(contents)), Op::Spawn)
}
/// Iterate over all known job
pub(crate) fn job_walk(zk: Arc<ZooKeeper>, namespace: String, queue: zkmq::ZkMQ) {
let last_run = format!("/jotty/{}/run/job_walk_last", &namespace);
let job_path = format!("/jotty/{}/job", &namespace);
loop {
thread::sleep(Duration::from_secs(5)); // TODO make this dynamic
if zk.exists(last_run.as_str(), false).unwrap().is_none() {
zk.create(
last_run.as_str(),
serde_json::to_vec(&Utc::now()).unwrap(),
zookeeper::Acl::open_unsafe().clone(),
zookeeper::CreateMode::Persistent,
)
.unwrap();
} else {
let run_time: DateTime<Utc> =
serde_json::from_slice(zk.get_data(last_run.as_str(), false).unwrap().0.as_slice())
.unwrap();
if run_time + chrono::Duration::seconds(10) > Utc::now() {
// TODO make this dynamic as well
log::trace!("not time to do another job walk");
continue;
}
zk.set_data(
last_run.as_str(),
serde_json::to_vec(&Utc::now()).unwrap(),
None,
)
.unwrap();
}
// TODO add latch to prevent this from running more than once at the same time at /jotty/{}/run/task_walk/latch
let children = zk.get_children(job_path.as_str(), false).unwrap();
for task in &children {
if let Err(error) = queue.produce(task.as_bytes()) {
log::error!("unable to re-enqueue job {} due to error: {:?}. task is lost!", &task, error);
} else {
log::trace!("enqueued job {} for re-evaluation successfully", &task);
}
}
log::trace!("evaluated {} jobs", children.len());
}
}
/// Returns a new vector with `element` placed in front of every item of `parent`.
///
/// Despite the name, this *prepends*: `append_n_return(vec![2, 3], 1) == vec![1, 2, 3]`.
/// In this module it is used to put a failed task's reference at the head of a leaf's
/// `failures` list.
///
/// `parent` is consumed and its items are moved, not cloned, so `T` needs no `Clone` bound.
fn append_n_return<T>(parent: Vec<T>, element: T) -> Vec<T> {
    let mut combined = Vec::with_capacity(parent.len() + 1);
    combined.push(element);
    combined.extend(parent);
    combined
}
#[cfg(test)]
mod tests {
// NOTE(review): every test below is commented out. They need a live ZooKeeper
// (ZOOKEEPER_SERVERS, defaulting to localhost:2181). They were also written against older
// signatures and will not compile as-is:
//   - `eval_inner_job` is called with 4 and 3 arguments; it now takes 6.
//   - `InnerJobType::Single` is built and matched with 2 fields; it now has 3.
//   - `EvalState::None` is used without its payload.
//   - `InnerJobType`, `EvalState` and `JobType` are never imported.
// They also expect `Op::DoNothing` right after a task is started, whereas an unstarted
// `Single` (and a `Branch` whose root is unstarted) now reports `Op::Spawn`.
// Update all of the above before re-enabling.
// use crate::engine::job::{eval_inner_job, Op};
// use std::time::Duration;
// use std::env;
// use std::sync::Arc;
// use crate::{TaskDef, Queues};
// #[test]
// fn test_eval_job_single() {
// struct NoopWatcher;
//
// impl zookeeper::Watcher for NoopWatcher {
// fn handle(&self, _ev: zookeeper::WatchedEvent) {}
// }
//
// fn zk_server_urls() -> String {
// let key = "ZOOKEEPER_SERVERS";
// match env::var(key) {
// Ok(val) => val,
// Err(_) => "localhost:2181".to_string(),
// }
// }
// let zk_urls = zk_server_urls();
//
// let zk = zookeeper::ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap();
// let zka = Arc::new(zk);
//
// let task = TaskDef {
// method: "numm.method".to_string(),
// arguments: Default::default(),
// resources: Default::default(),
// policy: None
// };
//
// let r = eval_inner_job(zka, "testing".to_string(), Box::new(InnerJobType::Single(task.clone(), EvalState::None)), Queues{ work: (), task: () });
//
// assert_eq!(r.is_ok(), true);
// let inner = r.unwrap();
// assert_eq!(
// inner.1,
// Op::DoNothing
// );
// match *inner.0 {
// InnerJobType::Single(innertask, state) => {
// assert_eq!(task, innertask);
// match state {
// EvalState::Some(_) => assert_eq!(true, true),
// _ => panic!("invalid job state")
// }
// },
// _ => panic!("invalid job type")
// }
// }
//
// #[test]
// fn test_eval_job_branch() {
// struct NoopWatcher;
//
// impl zookeeper::Watcher for NoopWatcher {
// fn handle(&self, _ev: zookeeper::WatchedEvent) {}
// }
//
// fn zk_server_urls() -> String {
// let key = "ZOOKEEPER_SERVERS";
// match env::var(key) {
// Ok(val) => val,
// Err(_) => "localhost:2181".to_string(),
// }
// }
// let zk_urls = zk_server_urls();
//
// let zk = zookeeper::ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap();
// let zka = Arc::new(zk);
//
// let task = TaskDef {
// method: "numm.method".to_string(),
// arguments: Default::default(),
// resources: Default::default(),
// policy: None
// };
// let task_left = TaskDef {
// method: "left.method".to_string(),
// arguments: Default::default(),
// resources: Default::default(),
// policy: None
// };
//
// let job = JobType::Branch(
// Box::new(JobType::Single(task.clone())),
// Some(Box::new(JobType::Single(task_left.clone()))),
// None
// );
//
// let r = eval_inner_job(zka, "testing".to_string(), job.load());
//
// assert_eq!(r.is_ok(), true);
// let inner = r.unwrap();
// assert_eq!(
// inner.1,
// Op::DoNothing
// );
// match *inner.0 {
// InnerJobType::Branch(orig, left, right) => {
// // make sure right is empty
// assert_eq!(right, None);
//
// // make sure orig value is correct
// match *orig {
// InnerJobType::Single(itask, state) => {
// assert_eq!(itask, task);
//
// assert_eq!(match state {
// EvalState::Some(_) => true,
// _ => false
// }, true);
// }
// _ => panic!("unexpected start job in branch job")
// }
//
// // make sure the left value is right
// assert_eq!(left.is_some(), true);
// match *left.unwrap() {
// InnerJobType::Single(itask, state) => {
// assert_eq!(itask, task_left);
//
// assert_eq!(match state {
// EvalState::None => true,
// _ => false
// }, true);
// }
// _ => panic!("unexpected left job in branch job")
// }
// },
// _ => panic!("invalid job type")
// }
// }
//
}