//! jotty 0.3.1
//!
//! JoTTy is an embeddable distributed processing framework for both short and long
//! running batch jobs.
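//!
//! A minimal sketch of wiring up a manager, assuming a ZooKeeper ensemble reachable at
//! `127.0.0.1:2181`; `LoggingWatcher` and the pre-built `queues` value stand in for
//! pieces constructed elsewhere in the crate:
//!
//! ```ignore
//! let zk = Arc::new(ZooKeeper::connect("127.0.0.1:2181", Duration::from_secs(5), LoggingWatcher)?);
//! let mut manager = Manager::new(zk, "default".to_string(), queues, "manager-1".to_string(), true);
//! manager.run();
//! ```
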
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

use zkstate::ZkState;
use zookeeper::{WatchedEvent, WatchedEventType, ZooKeeper};

use crate::engine::job::job_processing_loop;
use crate::executor::ExecutorState;
use crate::{Queues, Task, TaskStatus};
use std::sync::atomic::AtomicBool;

const SCHEDULE_RETRY_BACKOFF: i64 = 10000; // 10s in ms

type ExecutorMap = HashMap<String, ZkState<ExecutorState>>;
type ExecutorList = Arc<RwLock<ExecutorMap>>;

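/// Schedules work for a single namespace: tracks executor registrations in ZooKeeper
/// and drives the task, job, and walk loops spawned by [`Manager::run`].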
pub struct Manager {
    pub(crate) id: String,
    pub(crate) zk: Arc<ZooKeeper>,
    pub(crate) namespace: String,
    executors: ExecutorList,
    pub(crate) queues: Queues,
    pub(crate) enabled: AtomicBool,
}

impl Manager {
    pub(crate) fn new(
        zk: Arc<ZooKeeper>,
        namespace: String,
        queues: Queues,
        id: String,
        enabled: bool,
    ) -> Self {
        Manager {
            id,
            zk,
            namespace,
            executors: Arc::new(RwLock::new(HashMap::new())),
            queues,
            enabled: AtomicBool::new(enabled),
        }
    }

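    /// Flip the `enabled` flag, consuming and returning the manager (builder style).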
    pub fn toggle(mut self) -> Self {
        *self.enabled.get_mut() = !*self.enabled.get_mut();
        self
    }

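    /// Spawn the executor watcher and the task, job, and walk loops on background
    /// threads. Returns immediately; nothing is spawned if the manager is disabled.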
    pub fn run(&mut self) {
        log::info!("starting manager {}", &self.id);

        if !*self.enabled.get_mut() {
            log::error!("Manager is set to not enabled. nothing to spawn");
            return;
        }

        // list all executors in the znode, adding a watcher to handle adding and removing
        handle_executor_znode_change(
            self.zk.clone(),
            format!("/jotty/{}/executors", &self.namespace),
            self.executors.clone(),
        );

        // task processing loop
        let zk = self.zk.clone();
        let queue = self.queues.task.clone();
        let executors = self.executors.clone();
        let namespace = self.namespace.clone();
        thread::spawn(move || {
            task_processing_loop(zk.clone(), namespace.clone(), queue.clone(), executors)
        });

        // job evaluation loop
        let zk = self.zk.clone();
        let queue = self.queues.clone();
        let namespace = self.namespace.clone();
        thread::spawn(move || job_processing_loop(zk.clone(), namespace.clone(), queue));

        // job walk loop
        let zk = self.zk.clone();
        let queue = self.queues.clone();
        let namespace = self.namespace.clone();
        thread::spawn(move || {
            crate::engine::job::job_walk(zk.clone(), namespace.clone(), queue.job)
        });

        // task walk loop
        let zk = self.zk.clone();
        let queue = self.queues.clone();
        let namespace = self.namespace.clone();
        thread::spawn(move || {
            crate::engine::task::task_walk(zk.clone(), namespace.clone(), queue.task)
        });
    }
}

/// Consume task ids from the task queue and act on each task according to its status.
fn task_processing_loop(
    zk: Arc<ZooKeeper>,
    namespace: String,
    mut queue: zkmq::ZkMQ,
    executors: ExecutorList,
) {
    loop {
        // block until the next task id arrives on the queue
        let message = queue.consume(None).unwrap();
        let task_id = String::from_utf8(message.body).unwrap();

        let mut task = Task::load(zk.clone(), namespace.clone(), task_id).unwrap();

        log::trace!(target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(), "processing task {}", &task.id);

        match task.status {
            TaskStatus::Pending(schedule_after, attempt) => {
                // make sure the schedule_after time is in the past; if not, re-enqueue it and check again later
                if schedule_after > chrono::Utc::now() {
                    log::trace!(
                        target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                        "skipping evaluation of task {}, set to be evaluated in {}s",
                        &task.id,
                        (schedule_after - chrono::Utc::now()).num_seconds()
                    );
                    let op = queue.produce(task.id.as_bytes());
                    if op.is_err() {
                        log::error!(
                            target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                            "unable to re-enqueue task {}. task is lost!", &task.id);
                    }
                    std::thread::sleep(Duration::from_secs(3));
                    continue;
                }
                log::trace!(
                    target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                    "starting scheduling of task {}", &task.id);

                // attempt to locate an eligible executor for this task
                let handle = executors.read().unwrap();

                let schedule_op = dense_pack(&task, handle.clone());
                log::debug!(
                    target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                    "packing returned {:?}", &schedule_op);

                match schedule_op {
                    Err(err) => {
                        // packing failed, so re-queue the task to be retried at a point in the future
                        log::warn!(
                            target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                            "unable to place task {}. placement failed with {:?}",
                            &task.id,
                            &err
                        );

                        task.status = TaskStatus::Pending(
                            schedule_after + chrono::Duration::milliseconds(SCHEDULE_RETRY_BACKOFF),
                            attempt + 1,
                        );

                        let _ = update_and_enqueue_task(
                            zk.clone(),
                            namespace.clone(),
                            task,
                            queue.clone(),
                        );
                    }
                    Ok(target) => {
                        // find the selected executor and update its state so it contains the task
                        let _ = place_task(executors.clone(), target, task);
                    }
                }
            }
            // TaskStatus::Failure => {
            //     match task.scheduler.2 {
            //         FailurePolicy::FiniteRetry(total, current) => {
            //             if total > current {
            //
            //             }
            //         }
            //         FailurePolicy::Stop => {}
            //     }
            //     // if we're rescheduling the task, update the state and re-insert the id into `queue`
            //     task.status = TaskStatus::Pending(Utc::now(), 0);
            //     let _ = update_and_enqueue_task(zk.clone(), namespace.clone(), task, queue.clone());
            // }
            TaskStatus::Run => {
                // look up the executor this task claims to be running on
                let handle = executors.read().unwrap();
                let executor = handle.get(&task.executor);

                // if the task points to an executor we don't know about, assume it is lost and mark it as such.
                // executors may be de-registering and re-registering underneath us, but that churn is not
                // something we handle here- we depend on Zookeeper for it.
                if executor.is_none() {
                    log::warn!(
                        target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                        "task {} is registered to executor {}, which is not known to us",
                        &task.id,
                        &task.executor
                    );
                    task.set_status(TaskStatus::Failure);
                    if let Err(error) = task.save(zk.clone(), namespace.clone()) {
                        log::error!("unable to save task. {:?}", error);
                    }
                }
                drop(handle);
            }
            _ => {}
        }
    }
}

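/// Persist an updated task to ZooKeeper, then push its id back onto the queue for
/// re-evaluation. Fails if either the write or the enqueue fails.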
fn update_and_enqueue_task(
    zk: Arc<ZooKeeper>,
    namespace: String,
    task: Task,
    queue: zkmq::ZkMQ,
) -> Result<(), ()> {
    log::debug!("{:?}", &task);

    // write the updated task
    if task.save(zk, namespace).is_err() {
        log::error!("unable to commit task update for task {}!", &task.id);
        return Err(());
    }
    log::debug!("updated task {} successfully", &task.id);

    // push the id back onto the queue so the task is evaluated again
    if queue.produce(task.id.as_bytes()).is_err() {
        log::error!("unable to re-enqueue task {}. task is lost!", &task.id);
        return Err(());
    }
    log::debug!("re-enqueued task {} for evaluation successfully", &task.id);

    Ok(())
}

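/// Record the task in the chosen executor's shared ZkState so that executor picks it up.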
fn place_task(executors: ExecutorList, target: String, task: Task) -> Result<(), ()> {
    let handle = executors.read().unwrap();
    let id = task.id.clone();

    let executor = match handle.get(&target) {
        Some(executor) => executor,
        None => {
            log::error!(
                "unable to place task {} with candidate {} - candidate not found",
                id,
                target
            );
            return Err(());
        }
    };

    let _ = executor.update(|inner| {
        inner.tasks.insert(id.clone(), task.id.clone());
    });
    log::debug!("task {} successfully placed onto {}", &id, &target);

    Ok(())
}

/// Register a watcher on `dir_path` (the znode containing all registered executors) and
/// reconcile the local executor map with the children currently present in ZooKeeper.
fn handle_executor_znode_change(zk: Arc<ZooKeeper>, dir_path: String, lake: ExecutorList) {
    // get all the children in the directory- attaching the watcher
    let inner_zk = zk.clone();
    let inner_path = dir_path.clone();
    let inner_lake = lake.clone();
    // get all the executors that are publishing state
    let registered_executors = zk
        .get_children_w(dir_path.as_str(), move |ev: WatchedEvent| {
            // only membership changes matter here; re-run the handler to re-arm
            // the watch and re-sync the executor map
            if let WatchedEventType::NodeChildrenChanged = ev.event_type {
                handle_executor_znode_change(
                    inner_zk.clone(),
                    inner_path.clone(),
                    inner_lake.clone(),
                )
            }
        })
        .unwrap();

    log::trace!(
        "got {} executors to search through",
        registered_executors.len()
    );
    let mut handle = lake.write().unwrap();

    // compare the executors we know about locally to the registered executors, dropping
    // local state for nodes that are no longer present in zookeeper
    let stale: Vec<String> = handle
        .keys()
        .filter(|id| !registered_executors.contains(*id))
        .cloned()
        .collect();
    for id in stale {
        log::trace!(
            "removing managed state for executor {0} at path {1}/{0}",
            &id,
            &dir_path
        );
        handle.remove(&id);
    }

    // look for new executors that we don't know about
    for registered_executor in registered_executors {
        if handle.contains_key(&registered_executor) {
            log::trace!("executor {} is already known to us", &registered_executor);
        } else {
            match zkstate::ZkState::expect(
                zk.clone(),
                format!("{}/{}", &dir_path, &registered_executor),
                Some(Duration::from_secs(1)),
            ) {
                Ok(zks) => {
                    handle.insert(registered_executor, zks);
                }
                Err(err) => log::warn!(
                    "executor {} is invalid or has a malformed state. {:?}",
                    &registered_executor,
                    err
                ),
            }
        }
    }

    log::debug!("now tracking {} state(s)", handle.len());
    drop(handle);
}

#[derive(Debug)]
enum ScheduleError {
    NoEligibleNodes,
}

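/// Arithmetic mean of `numbers`, rounded to the nearest integer. Expects a non-empty
/// slice; an empty slice yields NaN, which saturates to 0 when cast.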
fn average(numbers: &[usize]) -> u32 {
    (numbers.iter().sum::<usize>() as f32 / numbers.len() as f32).round() as u32
}

/// Pack resources on as few nodes as possible
fn dense_pack(task: &Task, nodes: ExecutorMap) -> Result<String, ScheduleError> {
    // If we are given resources, we should calculate based on density of resources
    // if not, do based on number of free channels vs running tasks
    let mut candidates = vec![];

    log::trace!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "picking executor using dense_pack");

    if !task.resources.is_empty() {
        log::debug!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "[dense_pack] packing based on requested resources");

        for (id, node) in nodes {
            // calculate the integer percentage of each requested resource this task would consume
            let mut pcts = vec![];
            for (resource, amount) in &task.resources {
                if let Some(total) = node.read().unwrap().resources.get(resource.as_str()) {
                    pcts.push((amount * 100) / total);
                }
            }
            // if the node is missing any of the requested resources, it is not a candidate
            if pcts.len() != task.resources.len() {
                continue;
            }

            candidates.push((id, average(&pcts)))
        }
    } else {
        log::debug!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "[dense_pack] packing based on number of used channels");

        for (id, node) in nodes {
            let n = node.read().unwrap();
            // skip executors with no free channels
            if n.tasks.len() >= n.channels {
                continue;
            }
            // score by channels already in use so the busiest executor that still has
            // capacity is preferred, packing tasks onto as few nodes as possible
            candidates.push((id, n.tasks.len() as u32))
        }
    }

    if candidates.is_empty() {
        return Err(ScheduleError::NoEligibleNodes);
    }

    // sort highest score first and take the winner
    candidates.sort_by(|a, b| b.1.cmp(&a.1));

    log::trace!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "candidates: {:#?}", &candidates);

    Ok(candidates[0].0.clone())
}

#[cfg(test)]
mod tests {
    use super::average;

    // dense_pack operates on ZkState-backed executor state, which needs a live
    // ZooKeeper ensemble, so it is left to integration tests.

    #[test]
    fn test_average() {
        assert_eq!(average(&[0, 50, 100]), 50);
        assert_eq!(average(&[1, 2]), 2); // 1.5 rounds away from zero
    }
}