use crate::cron::Cron;
use crate::establish_connection::establish_connection;
use crate::job::RunTaskResult;
use crate::supervised_thread::new_supervised_thread;
use crate::task::task_req::TaskReq;
use crate::task::Task;
use crate::unix_time::get_cur_unix_time;
use chrono::Utc;
use postgres::Connection;
use std::thread::JoinHandle;
use std::time::Duration;
/// Task runner bound to a Postgres task table and a set of queue names.
///
/// `F` is the handler invoked for every task popped from the queues. It has to
/// be callable through a shared reference, cheap to copy, and safe to move to
/// and share between threads, because the runner hands copies of it to the
/// threads it spawns.
pub struct Runner<F>
where
    F: Fn(&Task) -> RunTaskResult + Send + Sync + Clone + Copy + 'static,
{
    /// Connection used by the synchronous helpers (`add_task`, `finished`, `failed`).
    conn: Connection,
    /// Name of the table that stores the tasks.
    table_name: String,
    /// Queue names this runner works with.
    queues: Vec<String>,
    /// Connection string, kept so that `start` can open a connection of its own.
    db_url: String,
    /// Handler that executes a task and reports the outcome.
    f: F,
}
impl<F> Runner<F>
where
F: Fn(&Task) -> RunTaskResult,
F: Send + 'static + std::marker::Sync + Clone + Copy,
{
pub fn new(f: F, db_url: &str, table_name: &str, queues: Vec<String>) -> Runner<F>
where
F: Fn(&Task) -> RunTaskResult,
F: Send + 'static + std::marker::Sync + Clone + Copy,
{
Runner {
conn: establish_connection(&db_url),
table_name: table_name.to_string(),
queues: queues.clone(),
db_url: db_url.to_string(),
f,
}
}
pub fn start(&self) {
let db_url = self.db_url.clone();
let table_name = self.table_name.clone();
let queues = self.queues.clone();
let f = self.f;
new_supervised_thread(move || {
let conn = establish_connection(&db_url);
loop {
match Task::pop_queue(&conn, &queues, &table_name) {
Some(task) => {
Runner::consume_task(&task, f, &conn);
}
None => {
std::thread::sleep(Duration::from_secs(1));
}
}
}
});
}
fn consume_task(task: &Task, f: F, conn: &Connection)
where
F: Fn(&Task) -> RunTaskResult,
F: Send + 'static + std::marker::Sync + Clone + Copy,
{
let task2 = task.clone();
let handle: JoinHandle<RunTaskResult> = std::thread::spawn(move || f(&task2));
match handle.join() {
Ok(run_task_result) => {
persist_task_result(&task, run_task_result, conn);
}
Err(e) => {
if let Some(e) = e.downcast_ref::<&'static str>() {
task.failed(Some(e.to_string()), &conn);
} else {
task.failed(Some("thread panicked".to_string()), conn);
}
}
}
}
pub fn add_task(&self, req: &TaskReq) -> Task {
let cur = get_cur_unix_time();
let cron = serde_json::to_string(&req.cron).unwrap();
let run_at = req.run_at.map(|ts| ts.timestamp_millis());
let interval = req.interval.map(|i| i.num_milliseconds());
let rows = self.conn.query(
&format!("INSERT INTO {} (job_uuid, status, run_at, queue, attempts, max_attempts, created_at, updated_at, cron, interval, job) VALUES ($1, $2, $3, $4, $5, $6, $7, $7, $8, $9, $10) RETURNING *", &self.table_name),
&[&req.job_uuid, &"pending", &run_at, &req.queue, &0, &(req.max_attempts as i32), &cur, &cron, &interval, &req.job])
.expect("Error saving new task");
Task::rows_to_tasks(rows, &self.table_name)
.first()
.unwrap()
.clone()
}
pub fn finished(&self) -> Vec<Task> {
Task::get_finished(&self.conn, &self.queues, &self.table_name)
}
pub fn failed(&self) -> Vec<Task> {
Task::get_failed(&self.conn, &self.queues, &self.table_name)
}
}
/// Reschedules `task` for the next run time that `cron` reports relative to the
/// current UTC time, storing `result` alongside it.
fn reschedule_cron(task: &Task, cron: &Cron, result: &str, conn: &Connection) {
    task.reschedule(result, cron.next_run(Utc::now()), conn);
}
/// Records the outcome of running `task`.
///
/// * `Finished` - a task with a cron or an interval is rescheduled; any other
///   task is marked as finished.
/// * `Failed` - while attempts remain the task is marked as failing; once they
///   are used up, a task with a cron or an interval is rescheduled and any other
///   task is marked as failed.
/// * `SerdeErr` - the task is marked as failed with the error message.
/// * `None` - the task is marked as failed because no job matched its UUID.
fn persist_task_result(task: &Task, run_task_result: RunTaskResult, conn: &Connection) {
    match run_task_result {
        RunTaskResult::Finished { result } => {
            if !reschedule_if_recurring(task, &result, conn) {
                task.finished(&result, conn);
            }
        }
        RunTaskResult::Failed { result } => {
            // Attempts count as exhausted at `>=` rather than `==`, so a task
            // whose counter has somehow passed the limit (for example a
            // `max_attempts` of 0) still reaches a terminal state instead of
            // being retried indefinitely.
            // NOTE(review): where `attempts` is incremented is not visible here.
            if task.attempts < task.max_attempts {
                task.failing(Some(result), conn);
            } else if !reschedule_if_recurring(task, &result, conn) {
                task.failed(Some(result), conn);
            }
        }
        RunTaskResult::SerdeErr { msg } => {
            task.failed(Some(msg), conn);
        }
        RunTaskResult::None => {
            task.failed(
                Some(format!("Job with UUID {} does not exist", &task.job_uuid)),
                conn,
            );
        }
    }
}

/// Reschedules `task` when it has a cron or an interval, the cron taking
/// precedence; returns whether it was rescheduled.
fn reschedule_if_recurring(task: &Task, result: &str, conn: &Connection) -> bool {
    if let Some(cron) = &task.cron {
        reschedule_cron(task, cron, result, conn);
        true
    } else if let Some(interval) = task.interval {
        task.reschedule(result, get_cur_unix_time() + interval, conn);
        true
    } else {
        false
    }
}
/// Expands to a closure that dispatches a task to one of the listed job types.
///
/// The job types are tried in the order given. For each type whose `UUID`
/// equals the task's `job_uuid`, the task's `job` field is deserialized from
/// JSON into that type and `run_task` is called on it:
///
/// * a deserialization error makes the closure return `RunTaskResult::SerdeErr`
///   with the error message;
/// * a result for which `is_none()` is false is returned right away;
/// * otherwise the next type is tried.
///
/// When no type produced a result, the closure returns `RunTaskResult::None`.
#[macro_export]
macro_rules! process_jobs {
    ($($type_name:ident) ,*) => {
        |task| {
            use izta::job::RunTaskResult;
            use izta::serde_json;
            $(
                if $type_name::UUID == task.job_uuid {
                    match serde_json::from_str::<$type_name>(&task.job) {
                        Err(err) => return RunTaskResult::SerdeErr { msg: err.to_string() },
                        Ok(job) => {
                            let outcome = job.run_task(task);
                            // Only a real outcome ends the dispatch; otherwise
                            // fall through to the next job type.
                            if !outcome.is_none() {
                                return outcome;
                            }
                        }
                    }
                }
            )*
            RunTaskResult::None
        }
    };
}