izta 0.1.2

Izta is a drop-in job queue for Rust
Documentation
//! Runnable instantiations of jobs on the task queue
pub mod task_req;

use crate::cron::Cron;
use crate::unix_time::get_cur_unix_time;
use postgres::rows::Rows;
use postgres::Connection;

#[derive(Clone, Debug)]
pub struct Task {
    pub id: i64,

    /// UUID of associated `Job`
    pub job_uuid: String,

    /// Among:
    /// * `pending` - Task is not running nor completed
    /// * `running` - Currently being executed
    /// * `failing` - Task failed, but has some retries remaining
    /// * `failed` - Task failed and won't be restarted
    pub status: String,

    /// Stores result of last run
    pub result: Option<String>,

    /// When to run this task (milliseconds since epoch)
    pub run_at: Option<i64>,

    /// Name of queue to which this task belongs
    pub queue: Option<String>,

    /// Number of attempts made to execute this task
    pub attempts: i32,

    /// Maximum number of attempts allowed
    pub max_attempts: i32,

    pub created_at: i64,
    pub updated_at: i64,

    /// Cron schedule for task
    pub cron: Option<Cron>,

    /// How often (in milliseconds) to execute task
    pub interval: Option<i64>,

    /// Serialized `Job`
    pub job: String,

    /// Task table to which this task belongs
    pub table_name: String,
}

impl Task {
    /// Pop next task from the queue
    pub fn pop_queue(conn: &Connection, queues: &[String], table_name: &str) -> Option<Task> {
        let current_time = get_cur_unix_time();

        let transaction = conn.transaction().expect("Error creating transaction");
        let rows = if queues.is_empty() {
            transaction.query(
                &format!(
                    "with top as (
select id from {table_name} where status = 'pending'
AND (run_at IS NULL OR run_at <= $1)
AND attempts < max_attempts
OR (status = 'failing' AND updated_at + 5000 + (attempts ^ 4) * 1000 <= $1)
order by id limit 1
)
update {table_name}
set status = 'running', attempts = attempts + 1, updated_at = $1
from top
where {table_name}.id = top.id
RETURNING {table_name}.*",
                    table_name = table_name
                ),
                &[&current_time],
            )
        } else {
            transaction.query(
                format!(
                    "with top as (
select id from {table_name} where status = 'pending'
AND (run_at IS NULL OR run_at <= $1) AND queue IN ('{queues}')
AND attempts < max_attempts
OR (status = 'failing' AND updated_at + 5000 + (attempts ^ 4) * 1000 <= $1)
order by id limit 1
)
update {table_name}
set status = 'running', attempts = attempts + 1, updated_at = $1
from top
where {table_name}.id = top.id
RETURNING {table_name}.*",
                    table_name = table_name,
                    queues = queues.join("','")
                )
                .as_str(),
                &[&current_time],
            )
        }
        .expect("Error popping queue");
        transaction.commit().expect("Error committing transaction");
        Task::rows_to_tasks(rows, table_name).first().cloned()
    }

    /// Return all pending tasks
    pub fn get_pending(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
        Task::get_by_status(conn, queues, "pending", table_name)
    }

    /// Return all finished tasks
    pub fn get_finished(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
        Task::get_by_status(conn, queues, "finished", table_name)
    }

    /// Return all failed tasks
    pub fn get_failed(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
        Task::get_by_status(conn, queues, "failed", table_name)
    }

    /// Return task with the provided ID
    pub fn get_task_by_id(id: i64, conn: &Connection, table_name: &str) -> Option<Task> {
        Task::rows_to_tasks(
            conn.query(
                &format!("SELECT * FROM {} WHERE id = $1", table_name),
                &[&id],
            )
            .expect("error finding task"),
            table_name,
        )
        .first()
        .cloned()
    }

    pub fn delete(&self, conn: &Connection) {
        conn.execute(
            &format!("DELETE FROM {} WHERE id = $1", self.table_name),
            &[&self.id],
        )
        .expect("error deleting task");
    }

    fn get_by_status(
        conn: &Connection,
        queues: &[String],
        status: &str,
        table_name: &str,
    ) -> Vec<Task> {
        let current_time = get_cur_unix_time();
        if queues.is_empty() {
            Task::rows_to_tasks(
                conn.query(
                    &format!(
                        "SELECT * FROM {} WHERE status = $2 AND (run_at IS NULL OR run_at <= $1)",
                        table_name
                    ),
                    &[&current_time, &status],
                )
                .expect("error loading pending tasks"),
                table_name,
            )
        } else {
            let query = format!("SELECT * FROM {} WHERE status = $1 AND (run_at IS NULL OR run_at <= $2) AND queue IN ('{}')", table_name, queues.join("','"));
            Task::rows_to_tasks(
                conn.query(&query, &[&status, &current_time])
                    .expect("error loading pending tasks"),
                table_name,
            )
        }
    }

    /// Finish task
    pub fn finished(&self, res: &str, conn: &Connection) {
        self.set_status(conn, "finished", Some(res.to_owned()));
    }

    /// Reschedule task to run in the future
    pub fn reschedule(&self, res: &str, when: i64, conn: &Connection) {
        let updated_at = get_cur_unix_time();
        conn.execute(
            &format!("UPDATE {} SET status = 'pending', run_at = $1, updated_at = $2, result = $4, attempts = 0 WHERE id = $3", &self.table_name),
            &[&when, &updated_at, &self.id, &res],
        ).expect("Error rescheduling task");
    }

    /// Fail task
    pub fn failed(&self, err: Option<String>, conn: &Connection) {
        self.set_status(conn, "failed", err);
    }

    /// Mark task as failing to be retried again
    pub fn failing(&self, err: Option<String>, conn: &Connection) {
        self.set_status(conn, "failing", err);
    }

    /// Mark task as pending again
    pub fn release(&self, conn: &Connection) {
        self.set_status(conn, "pending", None);
    }

    fn set_status(&self, conn: &Connection, stat: &str, res: Option<String>) {
        let updated_at = get_cur_unix_time();
        conn.execute(
            &format!(
                "UPDATE {} SET status = $1, result = $3, updated_at = $4 WHERE id = $2",
                &self.table_name
            ),
            &[&stat, &self.id, &res, &updated_at],
        )
        .expect("Error updating task status");
    }

    /// Convert Postgres rows to a list of tasks
    pub fn rows_to_tasks(rows: Rows, table_name: &str) -> Vec<Task> {
        rows.iter()
            .map(|row| {
                let cron_str: String = row.get(10);
                Task {
                    id: row.get(0),
                    job_uuid: row.get(1),
                    status: row.get(2),
                    result: row.get(3),
                    run_at: row.get(4),
                    queue: row.get(5),
                    attempts: row.get(6),
                    max_attempts: row.get(7),
                    created_at: row.get(8),
                    updated_at: row.get(9),
                    cron: serde_json::from_str(&cron_str).unwrap(),
                    interval: row.get(11),
                    job: row.get(12),
                    table_name: table_name.to_string(),
                }
            })
            .collect()
    }

    /// Reload task from DB
    pub fn reload(&self, conn: &Connection) -> Task {
        Task::get_task_by_id(self.id, conn, &self.table_name).unwrap()
    }
}