jotty 0.3.1

JoTTy is an embeddable distributed processing framework for both short- and long-running batch jobs.
Documentation
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use crate::executor::{ExecutorState, JoTTyExecutor};
use crate::manager::Manager;
use crate::proto::JoTTyResult;
use crate::rcm::{ResourceChunk, ResourceManager};
use anyhow::Context;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use uuid::Uuid;
use zookeeper::{ZkResult, ZooKeeper, ZooKeeperExt};

pub use anyhow::{bail, Result};
pub use zkstate::ZkState;

mod engine;
pub mod executor;
pub mod manager;
pub mod proto;
pub mod rcm;

/// Default scheduler configuration for jobs created without an explicit policy
/// (`JoTTyClient::add_task`, and `JoTTyClient::add_job` when `JobDef::policy` is `None`):
/// run a limited number of times (`Finite(1)`), as soon as possible (`Immediate`),
/// with `FailurePolicy::Stop`.
const JOB_DEFAULT_POLICY: (SchedulerType, SchedulerPolicy, FailurePolicy) = (
    SchedulerType::Finite(1),
    SchedulerPolicy::Immediate,
    FailurePolicy::Stop,
);

/// Default scheduler configuration attached to the single task created by
/// `JoTTyClient::add_task`. Currently the same values as `JOB_DEFAULT_POLICY`.
const TASK_DEFAULT_POLICY: (SchedulerType, SchedulerPolicy, FailurePolicy) = (
    SchedulerType::Finite(1),
    SchedulerPolicy::Immediate,
    FailurePolicy::Stop,
);

/// Crate-level error type.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be boxed,
/// printed, or converted into an `anyhow::Error` with `?`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoTTyError {
    /// A protobuf message could not be decoded.
    // Kept as an empty tuple variant so existing `JoTTyError::ProtobufDecodeError()`
    // constructor calls and patterns keep compiling.
    ProtobufDecodeError(),
}

impl std::fmt::Display for JoTTyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            JoTTyError::ProtobufDecodeError() => f.write_str("failed to decode protobuf message"),
        }
    }
}

impl std::error::Error for JoTTyError {}

/// Errors returned while waiting on a job (see `JoTTyClient::wait_for_job`).
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be boxed or
/// converted into an `anyhow::Error` with `?`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskError {
    /// The requested job/task could not be found or read.
    NoSuchTask,
    /// The job/task did not finish before the wait deadline.
    WaitTimeout,
}

impl std::fmt::Display for TaskError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TaskError::NoSuchTask => "no such task",
            TaskError::WaitTimeout => "timed out waiting for task to complete",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TaskError {}

/// Queue Object
///
/// The pair of zkmq queues for a namespace. `Builder::build` creates them under
/// `/jotty/<namespace>/queue/` and hands clones to both the client and the manager.
#[derive(Clone)]
struct Queues {
    /// Job queue (`<queue_path>/job`); `JoTTyClient` produces job IDs onto it for evaluation.
    job: zkmq::ZkMQ,
    /// Task queue (`<queue_path>/task`); not used by the code in this file.
    task: zkmq::ZkMQ,
}

/// Builder for a jotty object
///
/// Create with `Builder::new`, configure with the chained setters, then call `build`.
// `executor_enabled` is written by `Builder::executor` but never read by `Builder::build`;
// presumably that is why dead-code warnings are silenced for this struct.
#[allow(dead_code)]
pub struct Builder {
    /// Instance ID; a random UUID unless overridden with `set_id`.
    id: String,
    /// Namespace; every znode used by the instance lives under `/jotty/<namespace>/`.
    namespace: String,
    /// Set by `executor()`. NOTE(review): not consulted by `build` — confirm intended use.
    executor_enabled: bool,
    /// Number of executor channels passed to `JoTTyExecutor::new` (default 32).
    executor_channels: usize,
    /// Resource manager passed to `JoTTyExecutor::new`.
    executor_resources: ResourceManager,
    /// Passed to `Manager::new` to enable or disable the manager functionality.
    manager_enabled: bool,
}
impl Builder {
    /// Creates a builder for a jotty instance under the given namespace.
    ///
    /// The instance ID is a freshly generated UUID (override it with [`Builder::set_id`]).
    /// The executor and manager both start disabled, and the executor defaults to 32 channels.
    pub fn new(namespace: String) -> Self {
        Builder {
            id: Uuid::new_v4().to_string(),
            namespace,
            executor_enabled: false,
            executor_channels: 32,
            executor_resources: Default::default(),
            manager_enabled: false,
        }
    }

    /// Overrides the generated ID.
    pub fn set_id(mut self, id: String) -> Self {
        self.id = id;
        self
    }

    /// Enables or disables the executor functionality for the jotty object.
    ///
    /// NOTE(review): `build` does not currently read this flag, so it has no effect on the
    /// executor that gets constructed. Confirm whether `JoTTyExecutor` should honour it.
    pub fn executor(mut self, enabled: bool) -> Self {
        self.executor_enabled = enabled;
        self
    }

    /// Enables or disables the manager functionality for the jotty object.
    pub fn manager(mut self, enabled: bool) -> Self {
        self.manager_enabled = enabled;
        self
    }

    /// Sets the number of executor channels (default 32).
    pub fn channels(mut self, number: usize) -> Self {
        self.executor_channels = number;
        self
    }

    /// Replaces the Resource Manager object handed to the executor.
    pub fn set_rcm(mut self, rcm: rcm::ResourceManager) -> Self {
        self.executor_resources = rcm;
        self
    }

    /// Declares a single resource on the executor's Resource Manager.
    ///
    /// Forwards `rname`, `amount`, `block_size` and `overcommit` to `ResourceManager::declare`.
    /// The value returned by `declare` is discarded, so a rejected declaration is silently
    /// ignored.
    pub fn set_resource(
        mut self,
        rname: &str,
        amount: usize,
        block_size: usize,
        overcommit: f32,
    ) -> Self {
        let _ = self
            .executor_resources
            .declare(rname, amount, block_size, overcommit);
        self
    }

    /// Builds a jotty object with the given configuration.
    ///
    /// It first ensures these znodes exist under `/jotty/<namespace>/`:
    /// - `queue`: zkmq instances
    /// - `job`: job definitions
    /// - `task`: task state
    /// - `run`: locks
    /// - `executor`: executor data
    /// - `job_state`: per-job zkstate
    ///
    /// It then creates the job and task queues and wires the client, manager and executor
    /// together.
    ///
    /// # Errors
    /// Returns an error if a znode cannot be created or either queue fails to build.
    pub fn build(self, zk: Arc<ZooKeeper>) -> anyhow::Result<JoTTy> {
        let base = format!("/jotty/{}", self.namespace);

        // Created in this order: queue, job, task, run, executor, job_state.
        // NOTE(review): `JoTTyClient::get_resources` reads `/jotty/<ns>/executors` (plural),
        // while this ensures `/jotty/<ns>/executor`. Confirm which one executors register under.
        for znode in ["queue", "job", "task", "run", "executor", "job_state"].iter() {
            let path = format!("{}/{}", base, znode);
            zk.ensure_path(&path)
                .with_context(|| format!("ensuring znode {} exists", path))?;
        }

        let queue_path = format!("{}/queue", base);
        let queues = Queues {
            job: zkmq::ZkMQBuilder::new(zk.clone())
                .consumer(true)
                .producer(true)
                .set_base(format!("{}/job", &queue_path))
                .set_id(format!("{}-job", &self.id))
                .build()
                .context("building job queue")?,
            task: zkmq::ZkMQBuilder::new(zk.clone())
                .consumer(true)
                .producer(true)
                .set_base(format!("{}/task", &queue_path))
                .set_id(format!("{}-task", &self.id))
                .build()
                .context("building task queue")?,
        };

        let client = JoTTyClient {
            zk: zk.clone(),
            namespace: self.namespace.clone(),
            queue: queues.clone(),
        };

        Ok(JoTTy {
            _id: self.id.clone(),
            manager: Manager::new(
                zk.clone(),
                self.namespace.clone(),
                queues,
                self.id.clone(),
                self.manager_enabled,
            ),
            executor: JoTTyExecutor::new(
                zk,
                self.namespace,
                self.executor_resources,
                self.executor_channels,
                Some(self.id),
            )
            .attach_client(client.clone()),
            client,
        })
    }
}

/// A configured jotty instance, produced by [`Builder::build`].
pub struct JoTTy {
    /// Instance ID (the `Builder`'s ID).
    _id: String,
    /// Executor for this instance; a clone of `client` is attached to it in `Builder::build`.
    pub executor: JoTTyExecutor,
    /// Client for submitting jobs/tasks and inspecting their state.
    pub client: JoTTyClient,
    /// Manager for this namespace (enabled or disabled via `Builder::manager`).
    pub manager: Manager,
}

/// Independent jotty Client
///
/// Submits jobs/tasks into a namespace and reads their state back from ZooKeeper.
#[derive(Clone)]
pub struct JoTTyClient {
    /// ZooKeeper connection.
    zk: Arc<ZooKeeper>,
    /// Namespace; all znodes live under `/jotty/<namespace>/`.
    namespace: String,
    /// Job/task queues (the same queues are handed to the manager in `Builder::build`).
    queue: Queues,
}
impl JoTTyClient {
    /// Create a Job with a single Task
    pub fn add_task(
        &self,
        method: &str,
        arguments: serde_json::Value,
        resources: HashMap<String, usize>,
    ) -> anyhow::Result<String> {
        let start = chrono::Utc::now();

        let id = uuid::Uuid::new_v4().to_string();

        // create a Single Job with a TaskDef with the given inputs
        let task = JobType::Single(TaskDef {
            method: method.to_string(),
            arguments,
            resources,
            // for this, assume this will be a one off that should run ASAP
            policy: Some(TASK_DEFAULT_POLICY.clone()),
        });

        log::debug!("creating {:?} under job {}", &task, &id);

        let job = Job {
            id: id.clone(),
            // Spawn after the current time, this will prevent time travel...
            status: TaskStatus::Pending(chrono::Utc::now(), 0),
            timings: Default::default(),
            shared_data: None,
            inner: task.load(),
            logs: TaskLogger::new(),
            scheduler: JOB_DEFAULT_POLICY.clone(),
            injected_tasks: vec![]
        };

        job.save(self.zk.clone(), self.namespace.clone())
            .context("unable to save job definition")?;

        self.queue
            .job
            .produce(job.id.into_bytes())
            .context("enqueuing job for evaluation")?;

        log::info!(
            "took {}ms to create task {}",
            (chrono::Utc::now() - start).num_milliseconds(),
            &id
        );

        Ok(id)
    }

    pub fn add_job(&self, job: JobDef) -> anyhow::Result<String> {
        let start = chrono::Utc::now();

        let scheduler = job.policy.unwrap_or_else(|| JOB_DEFAULT_POLICY.clone());
        let status = match scheduler.1 {
            SchedulerPolicy::Immediate => TaskStatus::Pending(chrono::Utc::now(), 0),
            SchedulerPolicy::Scheduled(time) => TaskStatus::Pending(time, 0),
        };

        let id = job.name.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
        let job = Job {
            id,
            status,
            timings: Default::default(),
            shared_data: None,
            inner: job.tree.load(),
            logs: Default::default(),
            scheduler,
            injected_tasks: vec![]
        };

        let id = job.id.clone();

        job.save(self.zk.clone(), self.namespace.clone())
            .context("unable to save job definition")?;
        self.queue
            .job
            .produce(job.id.into_bytes())
            .context("enqueuing job for evaluation")?;

        log::info!(
            "took {}ms to create job {}",
            (chrono::Utc::now() - start).num_milliseconds(),
            &id
        );
        Ok(id)
    }

    /// Return a HashMap of resources
    pub fn get_resources(&self) -> anyhow::Result<HashMap<String, HashMap<String, usize>>> {
        // as this is a read once type of operation, we'll skip initialization of the zkstruct
        let zk = self.zk.clone();
        let mut results = HashMap::new();
        for child in
            zk.get_children(&*format!("/jotty/{}/executors", &self.namespace), false)?
        {
            let raw_payload = zk
                .get_data(
                    &*format!(
                        "/jotty/{}/executors/{}/payload",
                        &self.namespace, &child
                    ),
                    false,
                )
                .context(format!("getting state for executor {}", &child))?
                .0;
            let payload: ExecutorState = serde_json::from_slice(&*raw_payload)
                .context("unmarshalling payload into Struct")?;
            results.insert(child, payload.resources.dump()?);
        }

        Ok(results)
    }

    pub fn wait_for_job(
        &self,
        id: String,
        duration: Option<chrono::Duration>,
    ) -> Result<HandlerResult, TaskError> {
        let path = format!("/jotty/{}/job/{}", self.namespace, id);
        log::debug!("loading {}", &path);

        let start_time = chrono::Utc::now();
        let end_time = start_time + duration.unwrap_or_else(|| chrono::Duration::seconds(10));

        // infinite loop where we keep reading the state. using a watcher would also work, but they're
        // not the most reliable and cannot be relied on. This would require us to both wait for a signal and poll.
        // so eh, we'll poll for now
        loop {
            if chrono::Utc::now() > end_time {
                return Err(TaskError::WaitTimeout);
            }

            let req = self.zk.get_data(path.as_str(), false);
            if req.is_err() {
                log::warn!("{} does not exist or cannot be read", &path);
                return Err(TaskError::NoSuchTask);
            }

            let data: Task = serde_json::from_slice(req.unwrap().0.as_slice()).unwrap();

            match &data.status {
                TaskStatus::Failure | TaskStatus::Success => {
                    log::debug!(
                        "job {} completed {:?}. took {}ms",
                        &id,
                        &data.status,
                        (chrono::Utc::now() - start_time).num_milliseconds()
                    );
                    return Ok(HandlerResult::success());
                }
                _ => {
                    log::trace!(
                        "job {} is not in a completed state ({:?}). sleeping",
                        &id,
                        &data.status
                    );
                    std::thread::sleep(Duration::from_secs(1));
                }
            }
        }
    }

    pub fn load_response<T: prost::Message>(&self, id: String) -> JoTTyResult<T> {
        JoTTyResult {
            id,
            inner: None,
            status: TaskStatus::Scheduled,
        }
    }
}

/// Scheduler Type.
///
/// Controls how many times a job (or task) runs.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum SchedulerType {
    /// Job should be run a limited number of times
    Finite(u32),
    /// Job should run forever
    Infinite,
}

/// Scheduler Policy for Tasks & Jobs
///
/// `JoTTyClient::add_job` turns `Immediate` into `TaskStatus::Pending(now, 0)` and
/// `Scheduled(time)` into `TaskStatus::Pending(time, 0)`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum SchedulerPolicy {
    /// Job should be run as soon as possible
    Immediate,
    /// Job should be run only after this point in time
    Scheduled(DateTime<Utc>),
}

/// What to do when a job or task fails.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum FailurePolicy {
    /// Retry for a limited number of times.
    /// First value is total retries, second is current number of retries
    FiniteRetry(u32, u32),
    /// Presumably: do not retry after a failure (TODO confirm against the manager/executor).
    Stop,
}

/// Reason a task must be retried. Carried by `HandlerResult::Retry` and `TaskStatus::Lost`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum RetryReason {
    /// The task was lost (presumably its executor disappeared; TODO confirm).
    Lost,
    /// Presumably no handler is registered for the method; the string is presumably its name.
    MethodNotFound(String),
    /// A (de)serialization problem; the string presumably holds the error message.
    SerdeError(String),
    /// Presumably a resource ("dimension") was exhausted; the string presumably names it.
    DimensionsExhausted(String),
}

/// Outcome reported by a task handler.
///
/// When a `ContextWrapper` is dropped, this becomes the task's final `TaskStatus`, and the JSON
/// payload of `Success`/`Failure` is stored in `Task::results`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum HandlerResult {
    /// The task returned successfully
    Success(serde_json::Value),
    /// The task returned with an error
    Failure(serde_json::Value),
    /// Reschedule the Task according to the reschedule policy. If all retries are exhausted, set as failure.
    /// This is used for pre-execution errors. A retry error is caused by executors being unable to
    /// spawn the task for one reason or another.
    Retry(RetryReason),
}
impl HandlerResult {
    /// Success with no payload (`Value::Null`).
    pub fn success() -> Self {
        Self::success_reason(serde_json::Value::Null)
    }

    /// Success carrying `reason` as its JSON payload.
    pub fn success_reason(reason: serde_json::Value) -> Self {
        HandlerResult::Success(reason)
    }

    /// Failure with no payload (`Value::Null`).
    pub fn failure() -> Self {
        Self::failure_reason(serde_json::Value::Null)
    }

    /// Failure carrying `reason` as its JSON payload.
    pub fn failure_reason(reason: serde_json::Value) -> Self {
        HandlerResult::Failure(reason)
    }

    /// Request a retry for the given `reason`.
    pub fn retry(reason: RetryReason) -> Self {
        HandlerResult::Retry(reason)
    }
}

/// Lifecycle status, used both for jobs (`Job::status`) and for tasks (`Task::status`).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum TaskStatus {
    /// Task needs to be evaluated (Evaluate After / Current Evaluation)
    ///
    /// The timestamp is the earliest evaluation time. `JoTTyClient` always creates jobs with
    /// `0` as the second value; presumably it counts evaluations (TODO confirm).
    Pending(DateTime<Utc>, u16),

    /// Task is scheduled on executor
    Scheduled,

    /// Task has started on the executor
    Run,

    /// Task has exited successfully
    Success,

    /// Task has failed
    Failure,

    /// Task was lost. Carries the reason from `HandlerResult::Retry`, and is set when a
    /// `ContextWrapper` is dropped with a retry result.
    Lost(RetryReason),
}

/// Task Definition, contains all the information required to run the task
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct TaskDef {
    /// Task Method to execute
    pub method: String,
    /// Arguments passed to the given method
    pub arguments: serde_json::Value,
    /// Resources to be allocated to this task (presumably keyed by resource name)
    pub resources: HashMap<String, usize>,
    /// Scheduler Policy
    ///
    /// TODO we might want to add a way where a job can have tasks that are scheduled in the future
    pub policy: Option<(SchedulerType, SchedulerPolicy, FailurePolicy)>,
}
impl TaskDef {
    /// Creates a task definition for `method`, with `arguments` converted to a JSON value.
    ///
    /// `resources` starts empty and `policy` is `None`. `arguments` only needs to implement
    /// `serde::Serialize`. Protobuf messages that also derive `Serialize` are still accepted,
    /// and so are plain serde types.
    ///
    /// # Errors
    /// Returns the `serde_json` error if `arguments` cannot be converted to a JSON value.
    pub fn new<T: serde::Serialize>(
        method: String,
        arguments: T,
    ) -> Result<TaskDef, serde_json::Error> {
        Ok(TaskDef {
            method,
            arguments: serde_json::to_value(arguments)?,
            resources: Default::default(),
            policy: None,
        })
    }
}

/// Represents a singular Task
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Task {
    /// The status of the task
    pub status: TaskStatus,
    /// number of executions left
    pub executions: u16,

    /// Task ID; the task is stored at `/jotty/<namespace>/task/<id>`
    pub id: String,

    /// Parent Job ID
    pub parent_job: String,

    /// Method to invoke
    pub method: String,
    /// Arguments passed to the method
    pub args: serde_json::Value,
    /// Resources requested by the task (presumably keyed by resource name)
    pub resources: HashMap<String, usize>,
    /// Payload of the handler's `Success`/`Failure` result, recorded when the task finishes
    pub results: serde_json::Value,

    /// Which Executor has this task run on
    pub executor: String,
    /// Messages recorded for this task (e.g. by `set_status` and `set_executor`)
    pub logs: TaskLogger,
    /// Scheduler configuration for this task
    pub scheduler: (SchedulerType, SchedulerPolicy, FailurePolicy),
}
impl Task {
    /// Load a task object from Zookeeper.
    ///
    /// # Errors
    /// Returns the ZooKeeper error if the znode cannot be read. Returns
    /// `ZkError::MarshallingError` if its contents are not a serialized `Task`; the parse
    /// error is logged, since `ZkError` cannot carry it.
    pub fn load(zk: Arc<ZooKeeper>, namespace: String, id: String) -> ZkResult<Task> {
        let path = format!("/jotty/{}/task/{}", &namespace, &id);
        log::trace!("loading task {} from {}", &id, &path);
        let (raw, _stat) = zk.get_data(path.as_str(), false)?;
        serde_json::from_slice(&raw).map_err(|e| {
            log::warn!("unable to parse task {} stored at {}: {}", &id, &path, e);
            zookeeper::ZkError::MarshallingError
        })
    }

    /// Save the current task object to Zookeeper, creating the znode if it does not exist.
    ///
    /// # Errors
    /// Returns the ZooKeeper error from `exists`/`create`/`set_data`. Returns
    /// `ZkError::MarshallingError` if the task cannot be serialized.
    ///
    /// TODO Save the epoch of the zookeeper object to make sure the task has not changed under us,
    ///     or implement a lock to make sure only one executor can mutate a task at a time
    pub fn save(&self, zk: Arc<ZooKeeper>, namespace: String) -> ZkResult<()> {
        let path = format!("/jotty/{}/task/{}", namespace, self.id);
        // Serialize once, used by either branch below.
        let payload = serde_json::to_vec(self).map_err(|e| {
            log::warn!("unable to serialize task {}: {}", &self.id, e);
            zookeeper::ZkError::MarshallingError
        })?;
        if zk.exists(path.as_str(), false)?.is_none() {
            zk.create(
                path.as_str(),
                payload,
                zookeeper::Acl::open_unsafe().clone(),
                zookeeper::CreateMode::Persistent,
            )?;
        } else {
            zk.set_data(path.as_str(), payload, None)?;
        }
        Ok(())
    }

    /// Changes the status and records the transition in the task's log.
    pub fn set_status(&mut self, status: TaskStatus) {
        let m = format!("info - state change. {:?} -> {:?}", &self.status, &status);
        log::debug!("{}", &m);
        self.logs.info(m);
        self.status = status;
    }

    /// Records which executor runs this task, and logs the change.
    pub fn set_executor(&mut self, id: &str) {
        let m = format!("info - executor set to {}", &id);
        log::debug!("{}", &m);
        self.logs.info(m);
        self.executor = id.to_string();
    }
}
/// Deserializes a task from its JSON representation.
///
/// # Panics
/// Panics if `inner` is not a valid JSON-serialized `Task`. When the bytes may be stale or
/// malformed, prefer `serde_json::from_slice`, which reports the error instead.
impl From<Vec<u8>> for Task {
    fn from(inner: Vec<u8>) -> Task {
        serde_json::from_slice(&inner)
            .expect("byte buffer does not contain a valid JSON-serialized Task")
    }
}
// impl<T: prost::Message> From<ProtoTaskStub<T>> for Task {
//     fn from(inner: ProtoTaskStub<T>) -> Task {
//         Task {
//             status: TaskStatus::,
//             executions: 0,
//             id: "".to_string(),
//             parent_job: "".to_string(),
//             method: "".to_string(),
//             args: Default::default(),
//             resources: Default::default(),
//             results: Default::default(),
//             executor: "".to_string(),
//             logs: Default::default(),
//             scheduler: (SchedulerType::Infinite, SchedulerPolicy::Immediate)
//         }
//     }
// }

/// A Job
///
/// This object governs how individual tasks are executed. It is stored as JSON at
/// `/jotty/<namespace>/job/<id>`.
///
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct Job {
    /// Job ID
    pub id: String,

    /// Evaluation status of the Job
    pub status: TaskStatus,

    /// HashMap of each task ID associated with job, and execution time
    /// TODO Refactor this to better reflect the job tree?
    pub timings: HashMap<String, usize>,

    /// HashMap that is shared between all tasks spawned under the job, tied together using
    /// [`zkstate`]. All tasks are given a reference to this object, and can add or remove data as
    /// the job progresses.
    ///
    /// Keys are arbitrary strings, and the values are [`serde_json::Value`].
    ///
    /// Not serialized; `Job::load` re-attaches it from `/jotty/<namespace>/job_state/<id>`.
    ///
    /// TODO investigate possibility of replacing `Value` with something else that allows for more flexibility
    #[serde(skip)]
    pub shared_data: Option<ZkState<HashMap<String, serde_json::Value>>>,

    /// Task State Machine
    pub inner: Box<InnerJobType>,

    /// List of tasks to be injected into the InnerJobType tree on next evaluation
    pub injected_tasks: Vec<JobType>,

    /// Generated Log Messages
    pub logs: TaskLogger,

    /// Scheduler Configuration for the job as a whole
    pub scheduler: (SchedulerType, SchedulerPolicy, FailurePolicy),
}
impl Job {
    /// Loads the given job from ZooKeeper and attaches its shared-state object.
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - the job znode cannot be read;
    /// - its contents cannot be parsed;
    /// - the zkstate at `/jotty/<namespace>/job_state/<id>` cannot be created.
    pub fn load(zk: Arc<ZooKeeper>, namespace: String, id: String) -> anyhow::Result<Self> {
        let path = format!("/jotty/{}/job/{}", &namespace, &id);
        log::trace!("loading {}", &path);
        let (raw, _stat) = zk
            .get_data(&path, false)
            .with_context(|| format!("loading job state for job {}", &id))?;

        let mut job: Job = serde_json::from_slice(&raw).context("parsing saved job")?;
        // `shared_data` is `#[serde(skip)]`, so it has to be re-attached after parsing.
        job.shared_data = Some(
            ZkState::new(
                zk,
                format!("/jotty/{}/job_state/{}", &namespace, &id),
                HashMap::new(),
            )
            .context("creating zkstate object for job shared_state")?,
        );
        Ok(job)
    }

    /// Writes the current object into ZooKeeper, creating the znode if it does not exist yet.
    ///
    /// # Errors
    /// Fails if the job cannot be serialized, or if the znode cannot be checked, created or
    /// updated.
    pub fn save(&self, zk: Arc<ZooKeeper>, namespace: String) -> anyhow::Result<()> {
        let path = format!("/jotty/{}/job/{}", &namespace, &self.id);
        log::trace!("saving {}", &path);
        let payload = serde_json::to_vec(self)?;
        if zk.exists(&path, false)?.is_some() {
            log::trace!("{} exists, updating contents", &path);
            zk.set_data(&path, payload, None)
                .with_context(|| format!("unable to set_data on {}", &path))?;
        } else {
            zk.create(
                &path,
                payload,
                zookeeper::Acl::open_unsafe().clone(),
                zookeeper::CreateMode::Persistent,
            )
            .with_context(|| format!("unable to create {}", &path))?;
        }
        Ok(())
    }
}

/// User-facing description of a job's task tree. Converted into an `InnerJobType` by
/// [`JobType::load`].
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub enum JobType {
    /// execute a single task, and done
    Single(TaskDef),
    /// execute each task (presumably in sequence; the ordering is enforced outside this file)
    Linear(Vec<JobType>),
    /// Executes the first Task. On success, the 2nd Task will be executed. On failure, the 3rd will be.
    ///
    /// `JobType::Branch(Base, Success, Failure)`
    Branch(Box<JobType>, Option<Box<JobType>>, Option<Box<JobType>>),
    // /// Run all the given tasks in parallel
    // Parallel(Vec<JobType>)
}
impl JobType {
    /// Converts this definition into the evaluation tree stored on a job.
    ///
    /// Every `Single` node starts with `EvalState::None("unset_2")` and an empty `Vec<String>`.
    /// `Linear` and `Branch` nodes are converted recursively.
    pub fn load(self) -> Box<InnerJobType> {
        Box::new(match self {
            JobType::Single(inner) => {
                InnerJobType::Single(inner, EvalState::None("unset_2".to_string()), vec![])
            }
            JobType::Linear(steps) => {
                InnerJobType::Linear(steps.into_iter().map(JobType::load).collect())
            }
            // `load` already returns a `Box`, so optional branches are used as-is instead of
            // being unboxed and re-boxed.
            JobType::Branch(base, on_success, on_failure) => InnerJobType::Branch(
                base.load(),
                on_success.map(|step| step.load()),
                on_failure.map(|step| step.load()),
            ),
        })
    }
}

/// Job definition accepted by `JoTTyClient::add_job`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct JobDef {
    /// Job ID to use; `add_job` generates a random UUID when this is `None`.
    pub name: Option<String>,
    /// Task tree to run.
    pub tree: JobType,
    /// Scheduler configuration; `add_job` falls back to `JOB_DEFAULT_POLICY` when `None`.
    pub policy: Option<(SchedulerType, SchedulerPolicy, FailurePolicy)>,
    /// NOTE(review): not read by `JoTTyClient::add_job` in this file. Confirm where it is used.
    pub constraints: Option<HashMap<String, usize>>,
}

/// Evaluation state of an `InnerJobType::Single` node.
///
/// The meaning of the string payload is not visible in this file. `JobType::load` initialises
/// it to `"unset_2"`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum EvalState {
    /// No evaluation yet
    None(String),
    /// evaluation is done, task has been spawned
    Some(String),
    /// evaluation is done, task has been concluded and decision has been made
    Done(String),
}

/// Evaluation tree stored on a job (`Job::inner`), built from a [`JobType`] by `JobType::load`.
///
/// Mirrors `JobType`, but each `Single` node also carries an [`EvalState`] and a `Vec<String>`.
/// The vector starts empty; presumably it holds the IDs of spawned tasks (TODO confirm).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
// `Linear` holds `Vec<Box<InnerJobType>>` because `JobType::load` returns boxed nodes. The
// boxed layout is part of this public type, so the lint is silenced instead of changing it.
#[allow(clippy::vec_box)]
pub enum InnerJobType {
    Single(TaskDef, EvalState, Vec<String>),
    Linear(Vec<Box<InnerJobType>>),
    Branch(
        Box<InnerJobType>,
        Option<Box<InnerJobType>>,
        Option<Box<InnerJobType>>,
    ),
}

/// Struct passed into the thread that will run the task's callback
struct ContextWrapper {
    task: Task,
    ctx: TaskContext,
    exit: HandlerResult,
    executor: String,
    zk: Arc<ZooKeeper>,
    namespace: String,
}

impl ContextWrapper {
    pub fn new(
        zk: Arc<ZooKeeper>,
        namespace: String,
        ctx: TaskContext,
        executor: String,
        task: Task,
    ) -> Self {
        Self {
            zk,
            namespace,
            ctx,
            exit: HandlerResult::success(),
            task,
            executor,
        }
    }
}
impl Drop for ContextWrapper {
    fn drop(&mut self) {
        self.task.status = match &self.exit {
            HandlerResult::Success(reason) => {
                self.task.results = reason.clone();
                TaskStatus::Success
            }
            HandlerResult::Failure(reason) => {
                self.task.results = reason.clone();
                TaskStatus::Failure
            }
            HandlerResult::Retry(reason) => TaskStatus::Lost(reason.clone()),
        };
        let _ = self.task.save(self.zk.clone(), self.namespace.clone());
    }
}

/// Struct passed to a method handler when it is invoked
#[derive(Clone, Debug)]
pub struct TaskContext {
    /// ID of the task being run (presumably `Task::id`; TODO confirm).
    pub id: String,
    /// Name of the method being invoked.
    pub method: String,
    /// Arguments for the method, as JSON.
    pub inputs: serde_json::Value,
    /// Boolean indicating if the handler should continue running.
    /// This will always be true when the method is invoked, but could change to false if the task
    /// is told to shutdown
    pub running: Arc<AtomicBool>,
    /// ResourceChunks assigned to this Task (a map; presumably keyed by resource name)
    pub resources: HashMap<String, ResourceChunk>,

    /// Allows the step to inject additional steps after its execution.
    ///
    /// If the current job is defined as `JobType::Single(MyFooBar::...)` and `next_steps`
    /// contains `JobType::Single(EchoHandler::...)`, the resulting job state will become:
    /// ```text
    ///   JobType::Linear(vec![
    ///      JobType::Single(MyFooBar::...),
    ///      JobType::Single(EchoHandler::...)
    ///   ])
    /// ```
    ///
    pub next_steps: Vec<JobType>,
}

/// Log of messages attached to a task (`Task::logs`) or job (`Job::logs`).
///
/// It is serialized together with its owner. Each entry is `(timestamp, kind, message)`;
/// `kind` is `"info"` for entries added via [`TaskLogger::info`].
// `Default` is derived: an empty message list, the same as `TaskLogger::new`.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct TaskLogger {
    messages: Vec<(chrono::DateTime<chrono::Utc>, String, String)>,
}
impl TaskLogger {
    /// Creates an empty logger.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `message` with kind `"info"`, timestamped with the current UTC time.
    pub fn info(&mut self, message: String) {
        self.messages
            .push((chrono::Utc::now(), "info".to_string(), message))
    }
}