jotty 0.3.1

JoTTy is an embeddable distributed processing framework for both short- and long-running batch jobs.
Documentation
use crate::{EvalState, FailurePolicy, InnerJobType, Job, Queues, TaskStatus, JobType};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use zookeeper::ZooKeeper;
use anyhow::Context;

/// Outcome of evaluating one node of a job's execution tree.
///
/// `eval_inner_job` returns one of these alongside the updated tree, and `evaluate` uses it to
/// decide whether the job's overall status has to change.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
enum Op {
    /// A task was spawned
    Spawn,
    /// No operation taken
    DoNothing,
    /// The task completed successfully
    Success,
    /// The task failed or was lost
    Failure,
}

pub(crate) fn job_processing_loop(zk: Arc<ZooKeeper>, namespace: String, mut queue: Queues) {
    loop {
        let message = queue.job.consume(None);
        if message.is_err() {
            log::error!("unable to dequeue message. {:?}", message.err().unwrap());
            thread::sleep(Duration::from_secs(3));
            continue;
        }
        let message = message.unwrap();

        let task_id: String = String::from_utf8(message.body).unwrap();

        // check if the znode exists. if it doesn't, drop the message
        if zk.clone().exists(&*format!("/jotty/{}/job/{}", &namespace, &task_id), false).unwrap().is_none() {
            log::warn!("job {} consumed from work queue, but does not exist", &task_id);
            continue;
        }

        log::trace!("invoking jotty::engine::job::evaluate({})", &task_id);

        if let Err(error) = evaluate(zk.clone(), namespace.clone(), task_id.clone(), queue.clone()) {
            log::error!(
                "jotty::engine::job::evaluate({}) failed with {:#?}",
                &task_id,
                error
            );
        } else {
            log::trace!("jotty::engine::job::evaluate({}) succeeded", &task_id);
        }
    }
}

/// Runs one evaluation pass over the job identified by `job_id`.
///
/// The job is loaded from ZooKeeper, its execution tree is advanced by `eval_inner_job`, the
/// job status is adjusted according to the returned [`Op`], and the job is saved back.
///
/// # Errors
///
/// Fails if the job cannot be loaded, if evaluating the tree fails, or if the job cannot be
/// saved. In the first two cases nothing is written back.
pub(crate) fn evaluate(
    zk: Arc<ZooKeeper>,
    namespace: String,
    job_id: String,
    queue: Queues,
) -> anyhow::Result<()> {
    let mut job: Job = Job::load(zk.clone(), namespace.clone(), job_id).context("loading job from zookeeper")?;

    log::trace!("job {} has an inner state: {:?}", &job.id, &job.inner);
    let (next_inner, op) = eval_inner_job(
        zk.clone(),
        namespace.clone(),
        job.id.clone(),
        job.inner.clone(),
        queue,
        job.injected_tasks.clone(),
    )?;

    // `eval_inner_job` was handed the injected tasks, so the pending list is emptied here
    // (clearing an already empty vec is a no-op).
    // NOTE(review): `eval_inner_job` only consumes them when a single task transitions to
    // success during this pass; otherwise they appear to be discarded -- confirm intent.
    job.injected_tasks.clear();

    log::trace!("job {} inner state will become (via {:?}): {:?}", &job.id, &op, &next_inner);

    job.inner = next_inner;

    match op {
        Op::DoNothing => {}
        Op::Spawn => {
            // a pending job starts running as soon as its first task is spawned
            if let TaskStatus::Pending(_, _) = job.status {
                log::info!(
                    "transitioning job {} from '{:?}' to '{:?}'",
                    &job.id, &job.status, TaskStatus::Run
                );
                job.status = TaskStatus::Run;
            }
        }
        Op::Success => {
            // only a running job can be promoted to success
            if let TaskStatus::Run = job.status {
                log::info!(
                    "transitioning job {} from '{:?}' to '{:?}'",
                    &job.id, &job.status, TaskStatus::Success
                );
                job.status = TaskStatus::Success;
            }
        }
        // TODO add logic to re-try here
        Op::Failure => match job.scheduler.2 {
            // if the policy is to stop with a task failure, set the job status to Failure
            FailurePolicy::Stop => job.status = TaskStatus::Failure,
            FailurePolicy::FiniteRetry(i, j) => {
                if i != j {
                    // count this failure: the second field grows by one until it equals the first
                    job.scheduler.2 = FailurePolicy::FiniteRetry(i, j + 1);
                } else {
                    log::trace!("job {} has exhausted failure retries", job.id);
                    job.status = TaskStatus::Failure;
                }
            }
        },
    }

    job.save(zk, namespace)?;
    // TODO we might want to submit a task to re-evaluate this job?
    Ok(())
}

/// This function can be recursively called until a InnerJobType::Single is reached
#[allow(clippy::mut_range_bound)]
#[allow(clippy::boxed_local)]
fn eval_inner_job(
    zk: Arc<ZooKeeper>,
    namespace: String,
    job_id: String,
    inner: Box<InnerJobType>,
    queue: Queues,
    job_tasks: Vec<JobType>
) -> anyhow::Result<(Box<InnerJobType>, Op)> {
    match *inner {
        InnerJobType::Single(task_def, state, failures) => match state {
            EvalState::None(_) => Ok((
                Box::new(InnerJobType::Single(
                    task_def.clone(),
                    EvalState::Some(crate::engine::task::run_task(zk, namespace, &task_def, job_id, queue)?),
                    failures,
                )),
                Op::Spawn,
            )),
            EvalState::Some(inner) => {
                match crate::engine::task::get_task(zk, namespace, &inner)?.status {
                    TaskStatus::Pending(_, _) | TaskStatus::Scheduled | TaskStatus::Run => Ok((
                        Box::new(InnerJobType::Single(
                            task_def,
                            EvalState::Some(inner),
                            failures,
                        )),
                        Op::DoNothing,
                    )),
                    TaskStatus::Success => Ok(inject_jobs(InnerJobType::Single(
                            task_def,
                            EvalState::Done(inner),
                            failures,
                        ), job_tasks)
                    ),
                    TaskStatus::Failure => Ok((
                        Box::new(InnerJobType::Single(
                            task_def,
                            EvalState::Done(inner.clone()),
                            append_n_return(failures, inner),
                        )),
                        Op::Failure,
                    )),
                    TaskStatus::Lost(_) => Ok((
                        // reset the evaluation state to None, so it is re-evaluated
                        Box::new(InnerJobType::Single(
                            task_def,
                            EvalState::None("unset".to_string()),
                            append_n_return(failures, inner),
                        )),
                        Op::Failure,
                    )),
                }
            }
            EvalState::Done(inner) => {
                match crate::engine::task::get_task(zk, namespace, &inner)?.status {
                    // at this point, the task should not be in another state other than completed or failed
                    TaskStatus::Success => Ok((Box::from(InnerJobType::Single(
                        task_def,
                        EvalState::Done(inner),
                        failures,
                    )), Op::Success)),
                    TaskStatus::Failure => Ok((Box::new(InnerJobType::Single(
                        task_def,
                        EvalState::Done(inner),
                        failures,
                    )), Op::Failure)),
                    _ => panic!("job is in an invalid state"),
                }
            }
        },
        InnerJobType::Linear(tasks) => {
            log::trace!("[eval_inner_job] InnerJobType::Linear({:?})", &tasks);

            let mut new_tasks = vec![];

            let mut i = 0;
            let mut status = Op::DoNothing;
            let mut successes = 0;

            // start looping over tasks, evaluating each one in order
            // if the operation returned is not Done, then break
            for _ in 0..tasks.len() {
                let eval = eval_inner_job(
                    zk.clone(),
                    namespace.clone(),
                    job_id.clone(),
                    tasks[i].clone(),
                    queue.clone(),
                    job_tasks.clone()
                )?;

                i += 1;
                new_tasks.push(eval.0);

                match &eval.1 {
                    // If DoNothing, then the task is still running and there is nothing to do here
                    Op::DoNothing => break,
                    Op::Spawn => {
                        status = Op::Spawn;
                        break;
                    }
                    // if Failure, break but note the failure
                    Op::Failure => {
                        status = Op::Failure;
                        break;
                    }
                    // do nothing and continue onto the next one
                    Op::Success => successes += 1,
                }
            }

            // now, append everything else onto the new tasks vec
            for _ in i..tasks.len() {
                new_tasks.push(tasks[i].clone());

                i += 1;
            }

            // if the number of successes we have is equal to the number of tasks, all the tasks
            // completed successfully, and return as such
            if tasks.len() == successes {
                status = Op::Success;
            }

            Ok((Box::new(InnerJobType::Linear(new_tasks)), status))
        }
        InnerJobType::Branch(root, left, right) => {
            log::trace!("[eval_inner_job] InnerJobType::Branch({:?}, {:?}, {:?})", &root, &left, &right);
            let root_job = eval_inner_job(
                zk.clone(),
                namespace.clone(),
                job_id.clone(),
                root,
                queue.clone(),
                job_tasks.clone()
            )?;
            match root_job.1 {
                // nothing to do here, because the job is either stated or running
                Op::DoNothing | Op::Spawn => Ok((
                    Box::new(InnerJobType::Branch(root_job.0, left, right)),
                    root_job.1,
                )),
                // the root element was successful, we move onto the left element
                Op::Success => {
                    if left.is_none() {
                        // there is no job here, so just return the parent job
                        return Ok((
                            Box::new(InnerJobType::Branch(root_job.0, left, right)),
                            root_job.1,
                        ));
                    }
                    let left_job = eval_inner_job(zk, namespace, job_id, left.unwrap(), queue, job_tasks)?;
                    Ok((
                        Box::new(InnerJobType::Branch(root_job.0, Some(left_job.0), right)),
                        left_job.1,
                    ))
                }
                // the root element was not successful, we move on to the right element
                Op::Failure => {
                    if right.is_none() {
                        // there is no job here, so just return the parent job
                        return Ok((
                            Box::new(InnerJobType::Branch(root_job.0, left, right)),
                            root_job.1,
                        ));
                    }
                    let right_job = eval_inner_job(zk, namespace, job_id, right.unwrap(), queue, job_tasks)?;
                    Ok((
                        Box::new(InnerJobType::Branch(root_job.0, left, Some(right_job.0))),
                        right_job.1,
                    ))
                }
            }
        }
    }
}

/// Injects tasks into a job's execution tree right after a freshly completed task.
///
/// Called when an `InnerJobType::Single` has just completed. If `new_jobs` is empty the node
/// is returned unchanged with `Op::Success`. Otherwise the node is converted into an
/// `InnerJobType::Linear` whose first element is the completed task, followed by every
/// injected job (loaded via `JobType::load`), and `Op::Spawn` is returned so that the new
/// task(s) run during the next evaluation loop.
///
/// # Panics
///
/// Panics if `current_job` is not an `InnerJobType::Single`; calling it with anything else is
/// a programming error.
fn inject_jobs(current_job: InnerJobType, new_jobs: Vec<JobType>) -> (Box<InnerJobType>, Op) {
    match current_job {
        InnerJobType::Single(_, _, _) => {}
        _ => panic!("inject_jobs may only be called with an InnerJobType::Single"),
    }

    if new_jobs.is_empty() {
        return (Box::new(current_job), Op::Success);
    }

    // completed task first, injected jobs after it in their original order
    let mut contents = Vec::with_capacity(new_jobs.len() + 1);
    contents.push(Box::new(current_job));
    contents.extend(new_jobs.into_iter().map(|job| job.load()));

    (Box::new(InnerJobType::Linear(contents)), Op::Spawn)
}

/// Iterate over all known job
pub(crate) fn job_walk(zk: Arc<ZooKeeper>, namespace: String, queue: zkmq::ZkMQ) {
    let last_run = format!("/jotty/{}/run/job_walk_last", &namespace);
    let job_path = format!("/jotty/{}/job", &namespace);
    loop {
        thread::sleep(Duration::from_secs(5)); // TODO make this dynamic
        if zk.exists(last_run.as_str(), false).unwrap().is_none() {
            zk.create(
                last_run.as_str(),
                serde_json::to_vec(&Utc::now()).unwrap(),
                zookeeper::Acl::open_unsafe().clone(),
                zookeeper::CreateMode::Persistent,
            )
            .unwrap();
        } else {
            let run_time: DateTime<Utc> =
                serde_json::from_slice(zk.get_data(last_run.as_str(), false).unwrap().0.as_slice())
                    .unwrap();
            if run_time + chrono::Duration::seconds(10) > Utc::now() {
                // TODO make this dynamic as well
                log::trace!("not time to do another job walk");
                continue;
            }
            zk.set_data(
                last_run.as_str(),
                serde_json::to_vec(&Utc::now()).unwrap(),
                None,
            )
            .unwrap();
        }

        // TODO add latch to prevent this from running more than once at the same time at /jotty/{}/run/task_walk/latch
        let children = zk.get_children(job_path.as_str(), false).unwrap();
        for task in &children {
            if let Err(error) = queue.produce(task.as_bytes()) {
                log::error!("unable to re-enqueue job {} due to error: {:?}. task is lost!", &task, error);
            } else {
                log::trace!("enqueued job {} for re-evaluation successfully", &task);
            }
        }

        log::trace!("evaluated {} jobs", children.len());
    }
}

/// Returns a new vector with `element` first, followed by every item of `parent` in order.
///
/// Despite the name, the element is placed at the *front* of the result. `parent` is consumed,
/// so its items are moved rather than cloned.
fn append_n_return<T: Sized + Clone>(parent: Vec<T>, element: T) -> Vec<T> {
    let mut new = Vec::with_capacity(parent.len() + 1);
    new.push(element);
    new.extend(parent);
    new
}

#[cfg(test)]
mod tests {
    // NOTE: every test in this module is currently commented out, so the module is empty.
    //
    // The tests below connect to a live ZooKeeper (servers taken from the ZOOKEEPER_SERVERS
    // environment variable, falling back to localhost:2181), and they were written against
    // older shapes of the code: `eval_inner_job` is called with fewer arguments than it takes
    // now, and `InnerJobType::Single` / `EvalState::None` are used without the extra fields
    // they carry now. They have to be updated before they can be re-enabled.

    // use crate::engine::job::{eval_inner_job, Op};
    // use std::time::Duration;
    // use std::env;
    // use std::sync::Arc;
    // use crate::{TaskDef, Queues};

    // #[test]
    // fn test_eval_job_single() {
    //     struct NoopWatcher;
    //
    //     impl zookeeper::Watcher for NoopWatcher {
    //         fn handle(&self, _ev: zookeeper::WatchedEvent) {}
    //     }
    //
    //     fn zk_server_urls() -> String {
    //         let key = "ZOOKEEPER_SERVERS";
    //         match env::var(key) {
    //             Ok(val) => val,
    //             Err(_) => "localhost:2181".to_string(),
    //         }
    //     }
    //     let zk_urls = zk_server_urls();
    //
    //     let zk = zookeeper::ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap();
    //     let zka = Arc::new(zk);
    //
    //     let task = TaskDef {
    //         method: "numm.method".to_string(),
    //         arguments: Default::default(),
    //         resources: Default::default(),
    //         policy: None
    //     };
    //
    //     let r = eval_inner_job(zka, "testing".to_string(), Box::new(InnerJobType::Single(task.clone(), EvalState::None)), Queues{ work: (), task: () });
    //
    //     assert_eq!(r.is_ok(), true);
    //     let inner = r.unwrap();
    //     assert_eq!(
    //         inner.1,
    //         Op::DoNothing
    //     );
    //     match *inner.0 {
    //         InnerJobType::Single(innertask, state) => {
    //             assert_eq!(task, innertask);
    //             match state {
    //                 EvalState::Some(_) => assert_eq!(true, true),
    //                 _ => panic!("invalid job state")
    //             }
    //         },
    //         _ => panic!("invalid job type")
    //     }
    // }
    //
    // #[test]
    // fn test_eval_job_branch() {
    //     struct NoopWatcher;
    //
    //     impl zookeeper::Watcher for NoopWatcher {
    //         fn handle(&self, _ev: zookeeper::WatchedEvent) {}
    //     }
    //
    //     fn zk_server_urls() -> String {
    //         let key = "ZOOKEEPER_SERVERS";
    //         match env::var(key) {
    //             Ok(val) => val,
    //             Err(_) => "localhost:2181".to_string(),
    //         }
    //     }
    //     let zk_urls = zk_server_urls();
    //
    //     let zk = zookeeper::ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap();
    //     let zka = Arc::new(zk);
    //
    //     let task = TaskDef {
    //         method: "numm.method".to_string(),
    //         arguments: Default::default(),
    //         resources: Default::default(),
    //         policy: None
    //     };
    //     let task_left = TaskDef {
    //         method: "left.method".to_string(),
    //         arguments: Default::default(),
    //         resources: Default::default(),
    //         policy: None
    //     };
    //
    //     let job = JobType::Branch(
    //         Box::new(JobType::Single(task.clone())),
    //         Some(Box::new(JobType::Single(task_left.clone()))),
    //         None
    //     );
    //
    //     let r = eval_inner_job(zka, "testing".to_string(), job.load());
    //
    //     assert_eq!(r.is_ok(), true);
    //     let inner = r.unwrap();
    //     assert_eq!(
    //         inner.1,
    //         Op::DoNothing
    //     );
    //     match *inner.0 {
    //         InnerJobType::Branch(orig, left, right) => {
    //             // make sure right is empty
    //             assert_eq!(right, None);
    //
    //             // make sure orig value is correct
    //             match *orig {
    //                 InnerJobType::Single(itask, state) => {
    //                     assert_eq!(itask, task);
    //
    //                     assert_eq!(match state {
    //                         EvalState::Some(_) => true,
    //                         _ => false
    //                     }, true);
    //                 }
    //                 _ => panic!("unexpected start job in branch job")
    //             }
    //
    //             // make sure the left value is right
    //             assert_eq!(left.is_some(), true);
    //             match *left.unwrap() {
    //                 InnerJobType::Single(itask, state) => {
    //                     assert_eq!(itask, task_left);
    //
    //                     assert_eq!(match state {
    //                         EvalState::None => true,
    //                         _ => false
    //                     }, true);
    //                 }
    //                 _ => panic!("unexpected left job in branch job")
    //             }
    //         },
    //         _ => panic!("invalid job type")
    //     }
    // }
    //
}