use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use zkstate::ZkState;
use zookeeper::{WatchedEvent, WatchedEventType, ZooKeeper};
use crate::engine::job::job_processing_loop;
use crate::executor::ExecutorState;
use crate::{Queues, Task, TaskStatus};
use std::sync::atomic::AtomicBool;
// Backoff, in milliseconds, added to a task's `schedule_after` timestamp each
// time placement fails, before the task is re-enqueued for another attempt.
const SCHEDULE_RETRY_BACKOFF: i64 = 10000;
// Executor id -> ZooKeeper-replicated executor state.
type ExecutorMap = HashMap<String, ZkState<ExecutorState>>;
// Shared, thread-safe handle to the set of currently known executors.
type ExecutorList = Arc<RwLock<ExecutorMap>>;
/// Orchestrates scheduling for one namespace: tracks registered executors via
/// ZooKeeper watches and spawns the task/job background loops (see `run`).
pub struct Manager {
// unique id of this manager instance (used in log messages)
pub(crate) id: String,
// shared ZooKeeper session handle
pub(crate) zk: Arc<ZooKeeper>,
// jotty namespace this manager is responsible for
pub(crate) namespace: String,
// executor id -> replicated executor state, kept in sync by znode watches
executors: ExecutorList,
// queues of task/job ids awaiting processing
pub(crate) queues: Queues,
// gates `run`; flipped by `toggle`
pub(crate) enabled: AtomicBool,
}
impl Manager {
    /// Build a manager for `namespace`, backed by the given ZooKeeper handle
    /// and queue set. `enabled` gates whether `run` will spawn anything.
    pub(crate) fn new(
        zk: Arc<ZooKeeper>,
        namespace: String,
        queues: Queues,
        id: String,
        enabled: bool,
    ) -> Self {
        let executors = Arc::new(RwLock::new(HashMap::new()));
        Manager {
            id,
            zk,
            namespace,
            executors,
            queues,
            enabled: AtomicBool::new(enabled),
        }
    }

    /// Flip the enabled flag, consuming and returning the manager
    /// (builder-style chaining).
    pub fn toggle(mut self) -> Self {
        let flag = self.enabled.get_mut();
        *flag = !*flag;
        self
    }

    /// Start the manager: install the executor-registry watch, then spawn the
    /// task-scheduling, job-processing, job-walk and task-walk loops. Returns
    /// immediately; the spawned threads are detached. No-op when not enabled.
    pub fn run(&mut self) {
        log::info!("starting manager {}", &self.id);
        if !*self.enabled.get_mut() {
            log::error!("Manager is set to not enabled. nothing to spawn");
            return;
        }

        // Seed executor tracking and register the ZooKeeper child watch.
        handle_executor_znode_change(
            self.zk.clone(),
            format!("/jotty/{}/executors", &self.namespace),
            self.executors.clone(),
        );

        // task scheduling / placement loop
        {
            let zk = self.zk.clone();
            let namespace = self.namespace.clone();
            let task_queue = self.queues.task.clone();
            let executors = self.executors.clone();
            thread::spawn(move || task_processing_loop(zk, namespace, task_queue, executors));
        }

        // job processing loop
        {
            let zk = self.zk.clone();
            let namespace = self.namespace.clone();
            let queues = self.queues.clone();
            thread::spawn(move || job_processing_loop(zk, namespace, queues));
        }

        // periodic job tree walk
        {
            let zk = self.zk.clone();
            let namespace = self.namespace.clone();
            let queues = self.queues.clone();
            thread::spawn(move || crate::engine::job::job_walk(zk, namespace, queues.job));
        }

        // periodic task tree walk
        {
            let zk = self.zk.clone();
            let namespace = self.namespace.clone();
            let queues = self.queues.clone();
            thread::spawn(move || crate::engine::task::task_walk(zk, namespace, queues.task));
        }
    }
}
/// Background loop: consumes task ids from the task queue and drives each
/// task through scheduling (`Pending`) or executor-liveness checking (`Run`).
///
/// NOTE(review): the `unwrap()`s on consume/parse/load kill this thread on
/// the first queue or ZooKeeper error — presumably intentional fail-fast,
/// but worth confirming.
fn task_processing_loop(
    zk: Arc<ZooKeeper>,
    namespace: String,
    mut queue: zkmq::ZkMQ,
    executors: ExecutorList,
) {
    loop {
        let message = queue.consume(None).unwrap();
        let task_id: String = String::from_utf8(message.body).unwrap();
        let mut task = Task::load(zk.clone(), namespace.clone(), task_id).unwrap();
        log::trace!(target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(), "processing task {}", &task.id);
        match task.status {
            TaskStatus::Pending(schedule_after, attempt) => {
                // Not due yet: push it back onto the queue and throttle.
                if schedule_after > chrono::Utc::now() {
                    log::trace!(
                        target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                        "skipping evaluation of task {}, set to be evaluated in {}s",
                        &task.id,
                        (schedule_after - chrono::Utc::now()).num_seconds()
                    );
                    let op = queue.produce(task.id.as_bytes());
                    if op.is_err() {
                        log::error!(
                            target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                            "unable to re-enqueue task {}. task is lost!", &task.id);
                    }
                    std::thread::sleep(Duration::from_secs(3));
                    continue;
                }
                log::trace!(
                    target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                    "starting scheduling of task {}", &task.id);
                // Scope the read guard so it is released BEFORE place_task()
                // re-acquires a read lock on the same RwLock. Holding one read
                // guard while taking another can deadlock if a writer queues
                // in between (std::sync::RwLock gives no re-entrancy
                // guarantee); the original held the guard across place_task.
                let schedule_op = {
                    let handle = executors.read().unwrap();
                    dense_pack(&task, handle.clone())
                };
                log::debug!(
                    target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                    "packing returned {:?}", &schedule_op);
                match schedule_op {
                    Err(err) => {
                        log::warn!(
                            target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                            "unable to place task {}. placement failed with {:?}",
                            &task.id,
                            &err
                        );
                        // Push the schedule window out and count the attempt.
                        task.status = TaskStatus::Pending(
                            schedule_after + chrono::Duration::milliseconds(SCHEDULE_RETRY_BACKOFF),
                            attempt + 1,
                        );
                        let _ = update_and_qnueue_task(
                            zk.clone(),
                            namespace.clone(),
                            task,
                            queue.clone(),
                        );
                        continue;
                    }
                    Ok(target) => {
                        let _ = place_task(executors.clone(), target, task);
                    }
                }
            }
            TaskStatus::Run => {
                // A running task whose executor vanished can never complete:
                // mark it failed and persist that.
                let handle = executors.read().unwrap();
                if handle.get(&task.executor).is_none() {
                    log::warn!(
                        target: format!("jotty::manager::task_processing_loop::{}", &task.id).as_str(),
                        "task {} is registered to executor {}, which is not known to us",
                        &task.id,
                        &task.executor
                    );
                    task.set_status(TaskStatus::Failure);
                    if let Err(error) = task.save(zk.clone(), namespace.clone()) {
                        log::error!("unable to save task. {:?}", error);
                    }
                }
                drop(handle);
            }
            // Other statuses require no manager action here.
            _ => {}
        }
    }
}
/// Persist `task` and push its id back onto `queue` for re-evaluation.
///
/// Returns `Err(())` if either the save or the re-enqueue fails. Fix: the
/// original returned `Ok(())` and logged "re-enqueued ... successfully" even
/// after the enqueue had already failed with "task is lost!".
/// NOTE: the `qnueue` typo in the name is kept so existing callers still work.
fn update_and_qnueue_task(
    zk: Arc<ZooKeeper>,
    namespace: String,
    task: Task,
    queue: zkmq::ZkMQ,
) -> Result<(), ()> {
    log::debug!("{:?}", &task);
    if task.save(zk, namespace).is_err() {
        log::error!("unable to commit task update for task {}!", &task.id);
        return Err(());
    }
    log::debug!("updated task {} successfully", &task.id);
    if queue.produce(task.id.as_bytes()).is_err() {
        log::error!("unable to re-enqueue task {}. task is lost!", &task.id);
        return Err(());
    }
    log::debug!("re-enqueued task {} for evaluation successfully", &task.id);
    Ok(())
}
/// Record `task` onto the chosen executor's replicated state.
///
/// Returns `Err(())` when `target` is no longer present in the executor map
/// (it may have deregistered between packing and placement).
fn place_task(executors: ExecutorList, target: String, task: Task) -> Result<(), ()> {
    let task_id = task.id.clone();
    let registry = executors.read().unwrap();
    match registry.get(&target) {
        None => {
            log::error!(
                "unable to place task {} with candidate {}- candidate not found",
                task_id,
                target
            );
            Err(())
        }
        Some(executor) => {
            // NOTE(review): key and value are both the task id here — looks
            // like the value was meant to be something richer; confirm.
            let _ = executor.update(|inner| {
                inner.tasks.insert(task_id.clone(), task.id.clone());
            });
            log::debug!("task {} successfully placed onto {}", &task_id, &target);
            Ok(())
        }
    }
}
fn handle_executor_znode_change(zk: Arc<ZooKeeper>, dir_path: String, lake: ExecutorList) {
let inner_zk = zk.clone();
let inner_path = dir_path.clone();
let inner_lake = lake.clone();
let registered_executors = zk
.get_children_w(dir_path.as_str(), move |ev: WatchedEvent| {
match ev.event_type {
WatchedEventType::None => {}
WatchedEventType::NodeCreated => {}
WatchedEventType::NodeDeleted => {}
WatchedEventType::NodeDataChanged => {}
WatchedEventType::NodeChildrenChanged => handle_executor_znode_change(
inner_zk.clone(),
inner_path.clone(),
inner_lake.clone(),
),
WatchedEventType::DataWatchRemoved => {}
WatchedEventType::ChildWatchRemoved => {}
}
})
.unwrap();
log::trace!(
"got {} executors to search through",
registered_executors.len()
);
let mut handle = lake.write().unwrap();
for known_executor in handle.clone().into_iter() {
if !registered_executors.contains(&known_executor.0) {
log::trace!(
"removing managed state for executor {0} at path {1}/{0}",
&known_executor.0,
&dir_path
);
handle.remove(&known_executor.0);
}
}
for registered_executor in registered_executors {
if handle.contains_key(®istered_executor) {
log::trace!("executor {} is already known to us", ®istered_executor);
} else {
match zkstate::ZkState::expect(
zk.clone(),
format!("{}/{}", &dir_path, ®istered_executor),
Some(Duration::from_secs(1))
) {
Ok(zks) => { handle.insert(registered_executor, zks); },
Err(err) => log::warn!("executor {} is invalid or has a malformed state. {:?}", ®istered_executor, err)
}
}
}
log::debug!("now tracking {} state(s)", handle.len());
drop(handle);
}
/// Errors produced by the packing/scheduling functions (e.g. `dense_pack`).
#[derive(Debug)]
enum ScheduleError {
// no registered executor could satisfy the task's requirements
NoEligibleNodes,
}
/// Arithmetic mean of `numbers`, rounded half-away-from-zero to the nearest
/// integer.
///
/// Returns 0 for an empty slice. (The original divided by zero, producing
/// `NaN`, and only saturated to 0 via the `as u32` cast — now explicit.)
/// Uses `f64` to avoid `f32` precision loss on large sums.
fn average(numbers: &[usize]) -> u32 {
    if numbers.is_empty() {
        return 0;
    }
    (numbers.iter().sum::<usize>() as f64 / numbers.len() as f64).round() as u32
}
/// Pick an executor for `task`, returning the chosen executor id.
///
/// When the task requests resources, candidates are ranked by the average
/// requested/available ratio per resource; otherwise by free channel count.
/// The highest-ranked candidate wins.
///
/// Fixes vs. original: an executor missing a requested resource is now
/// skipped gracefully — previously `total.unwrap()` panicked, which also made
/// the `pcts.len()` skip-check below unreachable; the state read is hoisted
/// out of the per-resource loop; the channel subtraction saturates instead of
/// underflow-panicking when an executor somehow holds more tasks than
/// channels; the pointless pre-average sort of `pcts` was removed (the mean
/// is order-independent).
fn dense_pack(task: &Task, nodes: ExecutorMap) -> Result<String, ScheduleError> {
    let mut candidates = vec![];
    log::trace!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "picking executor using dense_pack");
    if !task.resources.is_empty() {
        log::debug!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "[dense_pack] packing based on requested resources");
        for (id, node) in nodes {
            // Read the replicated state once per node, not once per resource.
            let state = node.read().unwrap();
            let mut pcts = vec![];
            for resource in &task.resources {
                match state.resources.get(resource.0.as_str()) {
                    Some(total) => pcts.push(resource.1 / total),
                    // Node does not advertise this resource at all: stop
                    // early; the length check below rejects the node.
                    None => break,
                }
            }
            if pcts.len() != task.resources.len() {
                continue;
            }
            candidates.push((id, average(&pcts)))
        }
    } else {
        log::debug!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "[dense_pack] packing based on number of used channels");
        for (id, node) in nodes {
            let n = node.read().unwrap();
            // NOTE(review): ranking by FREE channels favors the least-loaded
            // node, which reads more like spread than dense packing — confirm
            // the intended semantics.
            candidates.push((id, n.channels.saturating_sub(n.tasks.len()) as u32))
        }
    }
    if candidates.is_empty() {
        return Err(ScheduleError::NoEligibleNodes);
    }
    // Highest score first; the winner is the head of the list.
    candidates.sort_by(|a, b| b.1.cmp(&a.1));
    log::trace!(target: format!("jotty::manager::dense_pack::{}", &task.id).as_str(), "candidates: {:#?}", &candidates);
    Ok(candidates.swap_remove(0).0)
}