use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use crate::executor::{ExecutorState, JoTTyExecutor};
use crate::manager::Manager;
use crate::proto::JoTTyResult;
use crate::rcm::{ResourceChunk, ResourceManager};
use anyhow::Context;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use uuid::Uuid;
use zookeeper::{ZkResult, ZooKeeper, ZooKeeperExt};
pub use anyhow::{bail, Result};
pub use zkstate::ZkState;
mod engine;
pub mod executor;
pub mod manager;
pub mod proto;
pub mod rcm;
/// Default scheduling tuple applied to jobs submitted without an explicit
/// policy: run exactly once, immediately, and stop on failure.
const JOB_DEFAULT_POLICY: (SchedulerType, SchedulerPolicy, FailurePolicy) = (
    SchedulerType::Finite(1),
    SchedulerPolicy::Immediate,
    FailurePolicy::Stop,
);
/// Default scheduling tuple applied to single tasks created via
/// `JoTTyClient::add_task`; currently identical to the job default.
const TASK_DEFAULT_POLICY: (SchedulerType, SchedulerPolicy, FailurePolicy) = (
    SchedulerType::Finite(1),
    SchedulerPolicy::Immediate,
    FailurePolicy::Stop,
);
/// Library-level error conditions.
pub enum JoTTyError {
    /// A protobuf payload could not be decoded.
    ProtobufDecodeError(),
}
/// Errors returned when querying or waiting on a job/task.
#[derive(Debug)]
pub enum TaskError {
    /// The requested node does not exist or could not be read.
    NoSuchTask,
    /// The wait deadline elapsed before the job reached a terminal state.
    WaitTimeout,
}
/// The pair of distributed queues a node produces to and consumes from.
#[derive(Clone)]
struct Queues {
    // job ids awaiting evaluation (see `JoTTyClient::add_task`/`add_job`)
    job: zkmq::ZkMQ,
    // presumably task ids awaiting execution — consumed elsewhere; confirm
    // against the executor/manager
    task: zkmq::ZkMQ,
}
/// Fluent builder for a [`JoTTy`] node (client plus optional manager and
/// executor roles).
#[allow(dead_code)]
pub struct Builder {
    // unique node id; random UUID unless overridden via `set_id`
    id: String,
    // namespace all ZooKeeper paths are rooted under ("/jotty/<namespace>/…")
    namespace: String,
    // whether to run the executor role — NOTE(review): not read in this
    // file; confirm it is consumed elsewhere
    executor_enabled: bool,
    // number of parallel executor channels (default 32)
    executor_channels: usize,
    // resource pools advertised by the executor
    executor_resources: ResourceManager,
    // whether this node participates as a manager
    manager_enabled: bool,
}
impl Builder {
    /// Starts a builder for a JoTTy node in `namespace`, with a random
    /// UUID id, 32 executor channels, and both roles disabled.
    pub fn new(namespace: String) -> Self {
        Builder {
            id: Uuid::new_v4().to_string(),
            namespace,
            executor_enabled: false,
            executor_channels: 32,
            executor_resources: Default::default(),
            manager_enabled: false,
        }
    }
    /// Overrides the auto-generated node id.
    pub fn set_id(mut self, id: String) -> Self {
        self.id = id;
        self
    }
    /// Enables or disables the executor role on this node.
    pub fn executor(mut self, enabled: bool) -> Self {
        self.executor_enabled = enabled;
        self
    }
    /// Enables or disables the manager role on this node.
    pub fn manager(mut self, enabled: bool) -> Self {
        self.manager_enabled = enabled;
        self
    }
    /// Sets the number of executor channels (parallel task slots).
    pub fn channels(mut self, number: usize) -> Self {
        self.executor_channels = number;
        self
    }
    /// Replaces the executor's resource manager wholesale.
    pub fn set_rcm(mut self, rcm: rcm::ResourceManager) -> Self {
        self.executor_resources = rcm;
        self
    }
    /// Declares one resource dimension on the executor's resource manager.
    /// Declaration errors are deliberately ignored here; use `set_rcm`
    /// with a pre-built manager if you need to handle them.
    pub fn set_resource(
        mut self,
        rname: &str,
        amount: usize,
        block_size: usize,
        overcommit: f32,
    ) -> Self {
        let _ = self
            .executor_resources
            .declare(rname, amount, block_size, overcommit);
        self
    }
    /// Ensures the namespace's ZooKeeper tree exists, builds the job and
    /// task queues, and assembles the client, manager, and executor.
    ///
    /// # Errors
    /// Fails if any ZooKeeper path cannot be ensured or either queue
    /// cannot be built.
    pub fn build(self, zk: Arc<ZooKeeper>) -> anyhow::Result<JoTTy> {
        let queue_path = format!("/jotty/{}/queue", &self.namespace);
        zk.ensure_path(queue_path.as_str())?;
        // NOTE(review): the previous revision appended "/<id>" to job_path
        // after ensuring it, but the result was never read; that dead
        // mutation has been removed.
        let job_path = format!("/jotty/{}/job", self.namespace);
        zk.ensure_path(job_path.as_str())?;
        let task_path = format!("/jotty/{}/task", &self.namespace);
        zk.ensure_path(task_path.as_str())?;
        let run_path = format!("/jotty/{}/run", &self.namespace);
        zk.ensure_path(run_path.as_str())?;
        let exec_path = format!("/jotty/{}/executor", self.namespace);
        zk.ensure_path(exec_path.as_str())?;
        let state_path = format!("/jotty/{}/job_state", self.namespace);
        zk.ensure_path(state_path.as_str())?;
        // The two queues share the queue root but use distinct bases and
        // consumer ids so the node can drain them independently.
        let queues = Queues {
            job: zkmq::ZkMQBuilder::new(zk.clone())
                .consumer(true)
                .producer(true)
                .set_base(format!("{}/job", &queue_path))
                .set_id(format!("{}-job", &self.id))
                .build()
                .context("building job queue")?,
            task: zkmq::ZkMQBuilder::new(zk.clone())
                .consumer(true)
                .producer(true)
                .set_base(format!("{}/task", &queue_path))
                .set_id(format!("{}-task", &self.id))
                .build()
                .context("building task queue")?,
        };
        let client = JoTTyClient {
            zk: zk.clone(),
            namespace: self.namespace.clone(),
            queue: queues.clone(),
        };
        Ok(JoTTy {
            _id: self.id.clone(),
            manager: Manager::new(
                zk.clone(),
                self.namespace.clone(),
                queues,
                self.id.clone(),
                self.manager_enabled,
            ),
            executor: JoTTyExecutor::new(
                zk,
                self.namespace,
                self.executor_resources,
                self.executor_channels,
                Some(self.id),
            )
            .attach_client(client.clone()),
            client,
        })
    }
}
/// A fully-assembled node: always a client, plus manager and executor
/// components (whose activity depends on the builder's role flags).
pub struct JoTTy {
    // retained for identification; not read in this file
    _id: String,
    pub executor: JoTTyExecutor,
    pub client: JoTTyClient,
    pub manager: Manager,
}
/// Cheaply cloneable handle for submitting jobs/tasks and inspecting
/// cluster state.
#[derive(Clone)]
pub struct JoTTyClient {
    zk: Arc<ZooKeeper>,
    // namespace all ZooKeeper paths are rooted under
    namespace: String,
    queue: Queues,
}
impl JoTTyClient {
pub fn add_task(
&self,
method: &str,
arguments: serde_json::Value,
resources: HashMap<String, usize>,
) -> anyhow::Result<String> {
let start = chrono::Utc::now();
let id = uuid::Uuid::new_v4().to_string();
let task = JobType::Single(TaskDef {
method: method.to_string(),
arguments,
resources,
policy: Some(TASK_DEFAULT_POLICY.clone()),
});
log::debug!("creating {:?} under job {}", &task, &id);
let job = Job {
id: id.clone(),
status: TaskStatus::Pending(chrono::Utc::now(), 0),
timings: Default::default(),
shared_data: None,
inner: task.load(),
logs: TaskLogger::new(),
scheduler: JOB_DEFAULT_POLICY.clone(),
injected_tasks: vec![]
};
job.save(self.zk.clone(), self.namespace.clone())
.context("unable to save job definition")?;
self.queue
.job
.produce(job.id.into_bytes())
.context("enqueuing job for evaluation")?;
log::info!(
"took {}ms to create task {}",
(chrono::Utc::now() - start).num_milliseconds(),
&id
);
Ok(id)
}
pub fn add_job(&self, job: JobDef) -> anyhow::Result<String> {
let start = chrono::Utc::now();
let scheduler = job.policy.unwrap_or_else(|| JOB_DEFAULT_POLICY.clone());
let status = match scheduler.1 {
SchedulerPolicy::Immediate => TaskStatus::Pending(chrono::Utc::now(), 0),
SchedulerPolicy::Scheduled(time) => TaskStatus::Pending(time, 0),
};
let id = job.name.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let job = Job {
id,
status,
timings: Default::default(),
shared_data: None,
inner: job.tree.load(),
logs: Default::default(),
scheduler,
injected_tasks: vec![]
};
let id = job.id.clone();
job.save(self.zk.clone(), self.namespace.clone())
.context("unable to save job definition")?;
self.queue
.job
.produce(job.id.into_bytes())
.context("enqueuing job for evaluation")?;
log::info!(
"took {}ms to create job {}",
(chrono::Utc::now() - start).num_milliseconds(),
&id
);
Ok(id)
}
pub fn get_resources(&self) -> anyhow::Result<HashMap<String, HashMap<String, usize>>> {
let zk = self.zk.clone();
let mut results = HashMap::new();
for child in
zk.get_children(&*format!("/jotty/{}/executors", &self.namespace), false)?
{
let raw_payload = zk
.get_data(
&*format!(
"/jotty/{}/executors/{}/payload",
&self.namespace, &child
),
false,
)
.context(format!("getting state for executor {}", &child))?
.0;
let payload: ExecutorState = serde_json::from_slice(&*raw_payload)
.context("unmarshalling payload into Struct")?;
results.insert(child, payload.resources.dump()?);
}
Ok(results)
}
pub fn wait_for_job(
&self,
id: String,
duration: Option<chrono::Duration>,
) -> Result<HandlerResult, TaskError> {
let path = format!("/jotty/{}/job/{}", self.namespace, id);
log::debug!("loading {}", &path);
let start_time = chrono::Utc::now();
let end_time = start_time + duration.unwrap_or_else(|| chrono::Duration::seconds(10));
loop {
if chrono::Utc::now() > end_time {
return Err(TaskError::WaitTimeout);
}
let req = self.zk.get_data(path.as_str(), false);
if req.is_err() {
log::warn!("{} does not exist or cannot be read", &path);
return Err(TaskError::NoSuchTask);
}
let data: Task = serde_json::from_slice(req.unwrap().0.as_slice()).unwrap();
match &data.status {
TaskStatus::Failure | TaskStatus::Success => {
log::debug!(
"job {} completed {:?}. took {}ms",
&id,
&data.status,
(chrono::Utc::now() - start_time).num_milliseconds()
);
return Ok(HandlerResult::success());
}
_ => {
log::trace!(
"job {} is not in a completed state ({:?}). sleeping",
&id,
&data.status
);
std::thread::sleep(Duration::from_secs(1));
}
}
}
}
pub fn load_response<T: prost::Message>(&self, id: String) -> JoTTyResult<T> {
JoTTyResult {
id,
inner: None,
status: TaskStatus::Scheduled,
}
}
}
/// How many times a job/task may run.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum SchedulerType {
    /// Run at most this many times.
    Finite(u32),
    /// No run limit — presumably rescheduled indefinitely; the semantics
    /// live in the manager/engine.
    Infinite,
}
/// When a job/task becomes eligible to run.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum SchedulerPolicy {
    /// Eligible as soon as it is submitted.
    Immediate,
    /// Eligible at the given instant (submission maps this to
    /// `TaskStatus::Pending(time, 0)` — see `add_job`).
    Scheduled(DateTime<Utc>),
}
/// What to do when an execution fails.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum FailurePolicy {
    /// Retry a bounded number of times — TODO confirm what each u32 means
    /// (likely attempt count and delay); semantics live in the manager.
    FiniteRetry(u32, u32),
    /// Do not retry.
    Stop,
}
/// Why a task was lost / should be retried. Variant meanings are inferred
/// from their names — the producers live in the executor/manager; confirm
/// there before relying on them.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum RetryReason {
    /// The task went missing mid-flight.
    Lost,
    /// No handler registered for the named method.
    MethodNotFound(String),
    /// A (de)serialization failure, with detail.
    SerdeError(String),
    /// A resource dimension could not be satisfied, with detail.
    DimensionsExhausted(String),
}
/// Outcome reported by a task handler.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum HandlerResult {
    /// Completed successfully; payload is `Null` when there is no result.
    Success(serde_json::Value),
    /// Completed unsuccessfully; payload is `Null` when there is no detail.
    Failure(serde_json::Value),
    /// Did not complete; the task should be rescheduled (the Drop impl of
    /// ContextWrapper maps this to `TaskStatus::Lost`).
    Retry(RetryReason),
}
impl HandlerResult {
pub fn success() -> Self {
Self::Success(serde_json::Value::Null)
}
pub fn success_reason(reason: serde_json::Value) -> Self {
Self::Success(reason)
}
pub fn failure() -> Self {
Self::Failure(serde_json::Value::Null)
}
pub fn failure_reason(reason: serde_json::Value) -> Self {
Self::Failure(reason)
}
pub fn retry(reason: RetryReason) -> Self {
Self::Retry(reason)
}
}
/// Lifecycle states shared by jobs and tasks.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum TaskStatus {
    /// Waiting to run, not before the given instant. TODO confirm the
    /// u16's meaning (submission always passes 0; likely an attempt count
    /// maintained by the manager).
    Pending(DateTime<Utc>, u16),
    /// Accepted by the scheduler but not yet running.
    Scheduled,
    /// Currently executing.
    Run,
    /// Terminal: completed successfully.
    Success,
    /// Terminal: completed unsuccessfully.
    Failure,
    /// Went missing mid-flight; carries the reason so it can be retried.
    Lost(RetryReason),
}
/// Declarative definition of a single invokable task.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct TaskDef {
    /// Name of the handler method to invoke.
    pub method: String,
    /// JSON-encoded arguments passed to the handler.
    pub arguments: serde_json::Value,
    /// Required resource amounts, keyed by resource name.
    pub resources: HashMap<String, usize>,
    /// Optional scheduling override; callers substitute a default (e.g.
    /// TASK_DEFAULT_POLICY in `add_task`) when None.
    pub policy: Option<(SchedulerType, SchedulerPolicy, FailurePolicy)>,
}
impl TaskDef {
pub fn new<T: prost::Message + serde::Serialize>(
method: String,
arguments: T,
) -> Result<TaskDef, serde_json::Error> {
Ok(TaskDef {
method,
arguments: serde_json::to_value(arguments)?,
resources: Default::default(),
policy: None,
})
}
}
/// A concrete, runnable instance of a task, persisted under
/// "/jotty/<ns>/task/<id>" (see `Task::save`).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Task {
    pub status: TaskStatus,
    /// Execution count — TODO confirm: maintained outside this file.
    pub executions: u16,
    pub id: String,
    /// Id of the job this task belongs to.
    pub parent_job: String,
    /// Handler method name.
    pub method: String,
    /// JSON arguments handed to the handler.
    pub args: serde_json::Value,
    /// Required resource amounts, keyed by resource name.
    pub resources: HashMap<String, usize>,
    /// JSON payload produced by the handler on completion (set by the
    /// ContextWrapper Drop impl).
    pub results: serde_json::Value,
    /// Id of the executor that claimed this task (see `set_executor`).
    pub executor: String,
    /// Per-task append-only log.
    pub logs: TaskLogger,
    /// (type, policy, failure) scheduling tuple.
    pub scheduler: (SchedulerType, SchedulerPolicy, FailurePolicy),
}
impl Task {
pub fn load(zk: Arc<ZooKeeper>, namespace: String, id: String) -> ZkResult<Task> {
let path = format!("/jotty/{}/task/{}", &namespace, &id);
log::trace!("loading task {} from {}", &id, &path);
Ok(serde_json::from_slice(zk.get_data(path.as_str(), false)?.0.as_slice()).unwrap())
}
pub fn save(&self, zk: Arc<ZooKeeper>, namespace: String) -> ZkResult<()> {
let path = format!("/jotty/{}/task/{}", namespace, self.id);
if zk.exists(path.as_str(), false)?.is_none() {
zk.create(
path.as_str(),
serde_json::to_vec(self).unwrap(),
zookeeper::Acl::open_unsafe().clone(),
zookeeper::CreateMode::Persistent,
)?;
} else {
zk.set_data(path.as_str(), serde_json::to_vec(self).unwrap(), None)?;
}
Ok(())
}
pub fn set_status(&mut self, status: TaskStatus) {
let m = format!("info - state change. {:?} -> {:?}", &self.status, &status);
log::debug!("{}", &m);
self.logs.info(m);
self.status = status;
}
pub fn set_executor(&mut self, id: &str) {
let m = format!("info - executor set to {}", &id);
log::debug!("{}", &m);
self.logs.info(m);
self.executor = id.to_string();
}
}
impl From<Vec<u8>> for Task {
    /// Deserializes a task from its JSON wire form (as produced by
    /// `Task::save`).
    ///
    /// # Panics
    /// Panics with a descriptive message if `inner` is not valid Task
    /// JSON. `From` cannot report errors, so only pass payloads written
    /// by this crate; use `serde_json::from_slice` directly for untrusted
    /// bytes.
    fn from(inner: Vec<u8>) -> Task {
        serde_json::from_slice(inner.as_slice())
            .expect("byte payload is not a JSON-serialized Task")
    }
}
/// A submitted job: its evaluation tree plus status, persisted under
/// "/jotty/<ns>/job/<id>" (see `Job::save`).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct Job {
    pub id: String,
    pub status: TaskStatus,
    /// Named timing measurements — TODO confirm producers (not written in
    /// this file).
    pub timings: HashMap<String, usize>,
    /// Live handle to the job's shared key/value state. Skipped during
    /// serialization; reconstructed in `Job::load` and None on freshly
    /// built values.
    #[serde(skip)]
    pub shared_data: Option<ZkState<HashMap<String, serde_json::Value>>>,
    /// Evaluation-state form of the job tree (see `JobType::load`).
    pub inner: Box<InnerJobType>,
    /// Extra tasks injected at runtime; always empty on submission.
    pub injected_tasks: Vec<JobType>,
    /// Job-level append-only log.
    pub logs: TaskLogger,
    /// (type, policy, failure) scheduling tuple.
    pub scheduler: (SchedulerType, SchedulerPolicy, FailurePolicy),
}
impl Job {
    /// Loads a job from "/jotty/<ns>/job/<id>" and attaches a live
    /// ZkState handle for its shared data (which is `#[serde(skip)]`ed
    /// and must be reconstructed on every load).
    ///
    /// # Errors
    /// Fails if the node cannot be read or parsed, or the shared-state
    /// object cannot be created.
    pub fn load(zk: Arc<ZooKeeper>, namespace: String, id: String) -> anyhow::Result<Self> {
        log::trace!("loading /jotty/{}/job/{}", &namespace, &id);
        let inner = zk
            .get_data(&format!("/jotty/{}/job/{}", &namespace, &id), false)
            .context(format!("loading job state for job {}", &id))?;
        let mut job: Job = serde_json::from_slice(&inner.0).context("parsing saved job")?;
        job.shared_data = Some(
            ZkState::new(
                zk,
                format!("/jotty/{}/job_state/{}", &namespace, &id),
                HashMap::new(),
            )
            .context("creating zkstate object for job shared_state")?,
        );
        Ok(job)
    }
    /// Creates or overwrites this job's node at "/jotty/<ns>/job/<id>".
    ///
    /// # Errors
    /// Fails if the node cannot be created or updated, or the job fails
    /// to serialize.
    pub fn save(&self, zk: Arc<ZooKeeper>, namespace: String) -> anyhow::Result<()> {
        let path = format!("/jotty/{}/job/{}", &namespace, &self.id);
        log::trace!("saving {}", &path);
        // Reuse the already-formatted `path` — the previous revision
        // rebuilt the identical string for each ZooKeeper call.
        if zk.exists(path.as_str(), false)?.is_some() {
            log::trace!("{} exists, updating contents", &path);
            zk.set_data(path.as_str(), serde_json::to_vec(self)?, None)
                .context(format!("unable to set_data on {}", &path))?;
        } else {
            zk.create(
                path.as_str(),
                serde_json::to_vec(self)?,
                zookeeper::Acl::open_unsafe().clone(),
                zookeeper::CreateMode::Persistent,
            )
            .context(format!("unable to create {}", &path))?;
        }
        Ok(())
    }
}
/// Declarative job tree, as supplied by clients in a [`JobDef`].
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub enum JobType {
    /// A single task.
    Single(TaskDef),
    /// A sequence of subtrees — presumably evaluated in order; confirm
    /// against the engine.
    Branch/Linear semantics live in the engine.
    Linear(Vec<JobType>),
    /// A subtree with two optional follow-ups — presumably (condition,
    /// on-true, on-false); confirm against the engine.
    Branch(Box<JobType>, Option<Box<JobType>>, Option<Box<JobType>>),
}
impl JobType {
    /// Converts this declarative tree into its evaluation counterpart,
    /// attaching fresh evaluation state (`EvalState::None("unset_2")`)
    /// and an empty task-id list to every leaf.
    pub fn load(self) -> Box<InnerJobType> {
        Box::new(match self {
            JobType::Single(inner) => {
                InnerJobType::Single(inner, EvalState::None("unset_2".to_string()), vec![])
            }
            JobType::Linear(inner) => {
                InnerJobType::Linear(inner.into_iter().map(JobType::load).collect())
            }
            JobType::Branch(a, b, c) => InnerJobType::Branch(
                a.load(),
                // `load` already returns a Box — the previous revision's
                // `Box::new(*r.load())` unboxed and re-boxed the result,
                // a needless extra allocation.
                b.map(|r| r.load()),
                c.map(|r| r.load()),
            ),
        })
    }
}
/// Client-facing job submission payload (see `JoTTyClient::add_job`).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct JobDef {
    /// Optional job id; a UUID is generated when None.
    pub name: Option<String>,
    /// The job's task tree.
    pub tree: JobType,
    /// Optional scheduling override; JOB_DEFAULT_POLICY is used when None.
    pub policy: Option<(SchedulerType, SchedulerPolicy, FailurePolicy)>,
    /// Optional placement/resource constraints — TODO confirm: not read
    /// in this file.
    pub constraints: Option<HashMap<String, usize>>,
}
/// Evaluation progress marker attached to each leaf task. New leaves
/// start as `None("unset_2")` (see `JobType::load`); the String payload's
/// meaning and the None/Some/Done transitions live in the engine —
/// confirm there before relying on them.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum EvalState {
    None(String),
    Some(String),
    Done(String),
}
/// Runtime (evaluation) counterpart of [`JobType`], produced by
/// `JobType::load`; leaves carry evaluation state and a task-id list.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[allow(clippy::vec_box)]
pub enum InnerJobType {
    /// A leaf: the task, its evaluation state, and associated strings —
    /// TODO confirm the Vec<String>'s contents (populated by the engine;
    /// always empty at load time).
    Single(TaskDef, EvalState, Vec<String>),
    /// An ordered sequence of subtrees.
    Linear(Vec<Box<InnerJobType>>),
    /// Mirrors `JobType::Branch`.
    Branch(
        Box<InnerJobType>,
        Option<Box<InnerJobType>>,
        Option<Box<InnerJobType>>,
    ),
}
/// RAII guard around a running task: when dropped it records the
/// handler's `exit` result onto the task and persists it (see the Drop
/// impl), so the outcome is saved however the handler returns.
struct ContextWrapper {
    task: Task,
    // handler-facing context — TODO confirm consumers: not read in this file
    ctx: TaskContext,
    // final handler outcome; starts as success() and is overwritten later
    exit: HandlerResult,
    // id of the executor running the task — TODO confirm: not read here
    executor: String,
    zk: Arc<ZooKeeper>,
    namespace: String,
}
impl ContextWrapper {
pub fn new(
zk: Arc<ZooKeeper>,
namespace: String,
ctx: TaskContext,
executor: String,
task: Task,
) -> Self {
Self {
zk,
namespace,
ctx,
exit: HandlerResult::success(),
task,
executor,
}
}
}
impl Drop for ContextWrapper {
fn drop(&mut self) {
self.task.status = match &self.exit {
HandlerResult::Success(reason) => {
self.task.results = reason.clone();
TaskStatus::Success
}
HandlerResult::Failure(reason) => {
self.task.results = reason.clone();
TaskStatus::Failure
}
HandlerResult::Retry(reason) => TaskStatus::Lost(reason.clone()),
};
let _ = self.task.save(self.zk.clone(), self.namespace.clone());
}
}
/// Everything a task handler receives about its invocation.
#[derive(Clone, Debug)]
pub struct TaskContext {
    pub id: String,
    /// Handler method name being invoked.
    pub method: String,
    /// JSON arguments for the handler.
    pub inputs: serde_json::Value,
    /// Shared run flag — presumably cleared to request cooperative
    /// cancellation; TODO confirm in the executor.
    pub running: Arc<AtomicBool>,
    /// Concrete resource allocations granted to this run, by name.
    pub resources: HashMap<String, ResourceChunk>,
    /// Follow-up tasks the handler wants injected — TODO confirm
    /// consumers: not read in this file.
    pub next_steps: Vec<JobType>,
}
/// Append-only in-memory log, serialized along with its owning task/job.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TaskLogger {
    // (timestamp, level, message) triples; only "info" is produced here
    messages: Vec<(chrono::DateTime<chrono::Utc>, String, String)>,
}
impl Default for TaskLogger {
fn default() -> Self {
Self::new()
}
}
impl TaskLogger {
    /// Creates an empty log.
    pub fn new() -> Self {
        Self { messages: Vec::new() }
    }
    /// Appends an info-level message stamped with the current UTC time.
    pub fn info(&mut self, message: String) {
        let entry = (chrono::Utc::now(), "info".to_string(), message);
        self.messages.push(entry);
    }
}