pub mod task_req;
use crate::cron::Cron;
use crate::unix_time::get_cur_unix_time;
use postgres::rows::Rows;
use postgres::Connection;
/// One row of a task table, plus the name of the table it was loaded from.
///
/// `Task::rows_to_tasks` fills the fields `id` through `job` positionally from
/// result columns 0 to 12, so the field order here mirrors the column order
/// that function expects.
#[derive(Clone, Debug)]
pub struct Task {
/// Row identifier (column 0); used in the `WHERE id = ...` clause of every
/// per-task query in this file.
pub id: i64,
/// Column 1. NOTE(review): presumably the UUID of the job this task belongs
/// to; not used by the code in this file.
pub job_uuid: String,
/// Column 2. The statuses written or queried in this file are `pending`,
/// `running`, `failing`, `finished` and `failed`.
pub status: String,
/// Column 3. Text stored alongside a status change (see `finished`,
/// `failed`, `failing`, `reschedule`); `release` stores NULL.
pub result: Option<String>,
/// Column 4. Time before which the task is not selected; NULL means no
/// restriction. Compared against `get_cur_unix_time()` in the queries.
pub run_at: Option<i64>,
/// Column 5. Optional queue name used to filter tasks per queue.
pub queue: Option<String>,
/// Column 6. Incremented by `pop_queue`, reset to 0 by `reschedule`.
pub attempts: i32,
/// Column 7. `pop_queue` only picks a pending task while
/// `attempts < max_attempts`.
pub max_attempts: i32,
/// Column 8. NOTE(review): presumably the creation time in the same time
/// base as `updated_at`; never written by this file.
pub created_at: i64,
/// Column 9. Set to `get_cur_unix_time()` whenever this file changes the row.
pub updated_at: i64,
/// Column 10, stored as a JSON string and decoded with `serde_json`.
pub cron: Option<Cron>,
/// Column 11. NOTE(review): presumably a repeat interval; not interpreted
/// by the code in this file.
pub interval: Option<i64>,
/// Column 12. NOTE(review): presumably the serialized job payload; not
/// interpreted by the code in this file.
pub job: String,
/// Not a column: the table this task was read from, reused by the instance
/// methods to address the same table.
pub table_name: String,
}
impl Task {
/// Claims the next runnable task in `table_name` and marks it as `running`.
///
/// A task is eligible when either
/// * its status is `pending`, `run_at` is NULL or not later than the current
///   time, and `attempts < max_attempts`; or
/// * its status is `failing` and `updated_at + 5000 + attempts^4 * 1000` is
///   not later than the current time (`^` is exponentiation in PostgreSQL, so
///   retries back off with the fourth power of the attempt count;
///   NOTE(review): the constants suggest millisecond timestamps — confirm
///   against `get_cur_unix_time`).
///
/// When `queues` is non-empty, only tasks whose `queue` equals one of the
/// given names are considered, for both the pending and the failing branch.
/// An empty `queues` applies no queue filter.
///
/// The eligible task with the lowest `id` gets `status = 'running'`,
/// `attempts = attempts + 1` and `updated_at = now`, and is returned.
/// Returns `None` when no task is eligible.
///
/// `table_name` is interpolated into the SQL text as an identifier and must
/// therefore come from trusted configuration, never from user input.
///
/// # Panics
///
/// Panics if the transaction cannot be created or committed, if the query
/// fails, or if the returned row cannot be decoded by `Task::rows_to_tasks`.
pub fn pop_queue(conn: &Connection, queues: &[String], table_name: &str) -> Option<Task> {
    let current_time = get_cur_unix_time();
    let transaction = conn.transaction().expect("Error creating transaction");

    // Queue names are bound as an array parameter instead of being spliced
    // into the SQL text, so names containing quotes cannot alter the query.
    let queue_filter = if queues.is_empty() {
        ""
    } else {
        "AND queue = ANY($2)"
    };

    // The eligibility alternatives are parenthesised as a unit so the queue
    // filter applies to retries of failing tasks as well as to pending tasks.
    // NOTE(review): the inner select takes no row lock; with several workers
    // popping concurrently, consider `FOR UPDATE SKIP LOCKED` — confirm the
    // minimum supported PostgreSQL version first.
    let sql = format!(
        "with top as (
            select id from {table_name}
            where (
                (status = 'pending'
                    AND (run_at IS NULL OR run_at <= $1)
                    AND attempts < max_attempts)
                OR (status = 'failing' AND updated_at + 5000 + (attempts ^ 4) * 1000 <= $1)
            )
            {queue_filter}
            order by id limit 1
        )
        update {table_name}
        set status = 'running', attempts = attempts + 1, updated_at = $1
        from top
        where {table_name}.id = top.id
        RETURNING {table_name}.*",
        table_name = table_name,
        queue_filter = queue_filter
    );

    let query_result = if queues.is_empty() {
        transaction.query(&sql, &[&current_time])
    } else {
        transaction.query(&sql, &[&current_time, &queues])
    };
    let rows = query_result.expect("Error popping queue");

    transaction.commit().expect("Error committing transaction");
    Task::rows_to_tasks(rows, table_name).first().cloned()
}
/// Returns the tasks in `table_name` whose status is `pending`.
///
/// Thin wrapper around `Task::get_by_status`; `conn`, `queues` and
/// `table_name` are forwarded unchanged, so the same `run_at` and queue
/// filtering applies.
pub fn get_pending(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
    let wanted_status = "pending";
    Task::get_by_status(conn, queues, wanted_status, table_name)
}
/// Returns the tasks in `table_name` whose status is `finished`.
///
/// Thin wrapper around `Task::get_by_status`; `conn`, `queues` and
/// `table_name` are forwarded unchanged, so the same `run_at` and queue
/// filtering applies.
pub fn get_finished(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
    let wanted_status = "finished";
    Task::get_by_status(conn, queues, wanted_status, table_name)
}
/// Returns the tasks in `table_name` whose status is `failed`.
///
/// Thin wrapper around `Task::get_by_status`; `conn`, `queues` and
/// `table_name` are forwarded unchanged, so the same `run_at` and queue
/// filtering applies.
pub fn get_failed(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
    let wanted_status = "failed";
    Task::get_by_status(conn, queues, wanted_status, table_name)
}
/// Looks up a single task by its `id` in `table_name`.
///
/// Returns the first matching row as a `Task`, or `None` when no row has
/// that id. `table_name` is interpolated into the SQL text.
///
/// # Panics
///
/// Panics with "error finding task" if the query fails.
pub fn get_task_by_id(id: i64, conn: &Connection, table_name: &str) -> Option<Task> {
    let sql = format!("SELECT * FROM {} WHERE id = $1", table_name);
    let rows = conn.query(&sql, &[&id]).expect("error finding task");
    let found = Task::rows_to_tasks(rows, table_name);
    found.into_iter().next()
}
/// Deletes this task's row (matched by `id`) from the table it was loaded from.
///
/// # Panics
///
/// Panics with "error deleting task" if the statement fails.
pub fn delete(&self, conn: &Connection) {
    let sql = format!("DELETE FROM {} WHERE id = $1", self.table_name);
    let outcome = conn.execute(&sql, &[&self.id]);
    outcome.expect("error deleting task");
}
/// Loads every task in `table_name` that has the given `status` and whose
/// `run_at` is NULL or not later than the current time.
///
/// When `queues` is non-empty, only tasks whose `queue` equals one of the
/// given names are returned; an empty `queues` applies no queue filter.
///
/// `table_name` is interpolated into the SQL text as an identifier and must
/// therefore come from trusted configuration, never from user input.
///
/// # Panics
///
/// Panics if the query fails or a row cannot be decoded by
/// `Task::rows_to_tasks`.
fn get_by_status(
    conn: &Connection,
    queues: &[String],
    status: &str,
    table_name: &str,
) -> Vec<Task> {
    let current_time = get_cur_unix_time();

    // Queue names are bound as an array parameter instead of being spliced
    // into the SQL text, so names containing quotes cannot alter the query.
    let queue_filter = if queues.is_empty() {
        ""
    } else {
        " AND queue = ANY($3)"
    };
    let query = format!(
        "SELECT * FROM {} WHERE status = $1 AND (run_at IS NULL OR run_at <= $2){}",
        table_name, queue_filter
    );

    let query_result = if queues.is_empty() {
        conn.query(&query, &[&status, &current_time])
    } else {
        conn.query(&query, &[&status, &current_time, &queues])
    };
    // This helper serves every status, so the message is status-neutral.
    let rows = query_result.expect("error loading tasks by status");

    Task::rows_to_tasks(rows, table_name)
}
/// Marks this task as `finished` and stores `res` as its result.
///
/// Delegates to `set_status`, which also refreshes `updated_at`.
pub fn finished(&self, res: &str, conn: &Connection) {
    let outcome = Some(String::from(res));
    self.set_status(conn, "finished", outcome);
}
/// Puts this task back into the `pending` state to run again at `when`.
///
/// In one UPDATE the row gets `status = 'pending'`, `run_at = when`,
/// `updated_at` set to the current time, `result = res`, and `attempts`
/// reset to 0.
///
/// # Panics
///
/// Panics with "Error rescheduling task" if the statement fails.
pub fn reschedule(&self, res: &str, when: i64, conn: &Connection) {
    let now = get_cur_unix_time();
    let sql = format!(
        "UPDATE {} SET status = 'pending', run_at = $1, updated_at = $2, result = $4, attempts = 0 WHERE id = $3",
        &self.table_name
    );
    // Parameter order follows the placeholder numbers, not their position in
    // the statement: $1 = when, $2 = now, $3 = id, $4 = res.
    let outcome = conn.execute(&sql, &[&when, &now, &self.id, &res]);
    outcome.expect("Error rescheduling task");
}
/// Marks this task as `failed`, storing `err` (or NULL when `None`) as its
/// result.
///
/// Delegates to `set_status`, which also refreshes `updated_at`.
pub fn failed(&self, err: Option<String>, conn: &Connection) {
    let new_status = "failed";
    self.set_status(conn, new_status, err);
}
/// Marks this task as `failing`, storing `err` (or NULL when `None`) as its
/// result.
///
/// Delegates to `set_status`, which also refreshes `updated_at`; `pop_queue`
/// uses that timestamp to decide when a failing task may be retried.
pub fn failing(&self, err: Option<String>, conn: &Connection) {
    let new_status = "failing";
    self.set_status(conn, new_status, err);
}
/// Returns this task to the `pending` state and clears its result (NULL).
///
/// Delegates to `set_status`; `attempts` and `run_at` are left untouched.
pub fn release(&self, conn: &Connection) {
    let no_result: Option<String> = None;
    self.set_status(conn, "pending", no_result);
}
/// Writes a new status and result for this task and refreshes `updated_at`.
///
/// * `stat` – value stored in the `status` column.
/// * `res` – value stored in the `result` column; `None` stores NULL.
///
/// # Panics
///
/// Panics with "Error updating task status" if the statement fails.
fn set_status(&self, conn: &Connection, stat: &str, res: Option<String>) {
    let now = get_cur_unix_time();
    let sql = format!(
        "UPDATE {} SET status = $1, result = $3, updated_at = $4 WHERE id = $2",
        &self.table_name
    );
    // Parameter order follows the placeholder numbers:
    // $1 = status, $2 = id, $3 = result, $4 = updated_at.
    let outcome = conn.execute(&sql, &[&stat, &self.id, &res, &now]);
    outcome.expect("Error updating task status");
}
pub fn rows_to_tasks(rows: Rows, table_name: &str) -> Vec<Task> {
rows.iter()
.map(|row| {
let cron_str: String = row.get(10);
Task {
id: row.get(0),
job_uuid: row.get(1),
status: row.get(2),
result: row.get(3),
run_at: row.get(4),
queue: row.get(5),
attempts: row.get(6),
max_attempts: row.get(7),
created_at: row.get(8),
updated_at: row.get(9),
cron: serde_json::from_str(&cron_str).unwrap(),
interval: row.get(11),
job: row.get(12),
table_name: table_name.to_string(),
}
})
.collect()
}
/// Fetches a fresh copy of this task from its table, looked up by `id`.
///
/// # Panics
///
/// Panics if the row no longer exists (the lookup returns `None`) or if the
/// underlying query fails.
pub fn reload(&self, conn: &Connection) -> Task {
    let refreshed = Task::get_task_by_id(self.id, conn, &self.table_name);
    refreshed.unwrap()
}
}