izta 0.1.2

Izta is a drop-in job queue for Rust
Documentation
//! Task-runner and scheduler
use crate::cron::Cron;
use crate::establish_connection::establish_connection;
use crate::job::RunTaskResult;
use crate::supervised_thread::new_supervised_thread;
use crate::task::task_req::TaskReq;
use crate::task::Task;
use crate::unix_time::get_cur_unix_time;
use chrono::Utc;
use postgres::Connection;
use std::thread::JoinHandle;
use std::time::Duration;

/// `Runner` spawns the task executor and allows new tasks to be scheduled.
///
/// The callback type `F` receives a borrowed [`Task`] and reports how the run
/// went as a [`RunTaskResult`]. It has to be `Copy + Send + Sync + 'static`
/// because `start()` copies it into the worker thread and `consume_task()`
/// moves it into a per-task thread.
pub struct Runner<F>
where
    F: Fn(&Task) -> RunTaskResult + Send + Sync + Clone + Copy + 'static,
{
    /// Connection used by the calling thread (`add_task`, `finished`, `failed`).
    conn: Connection,
    /// Name of the Postgres table holding the tasks.
    table_name: String,
    /// Queues this runner is restricted to, as passed to `new()`.
    queues: Vec<String>,
    /// Database URL, kept so the worker thread can open its own connection.
    db_url: String,
    /// Task callback, normally produced by `process_jobs!`.
    f: F,
}

impl<F> Runner<F>
where
    F: Fn(&Task) -> RunTaskResult,
    F: Send + 'static + std::marker::Sync + Clone + Copy,
{
    /// Returns a new `Runner`. Must call [`start()`] to begin executing tasks.
    ///
    /// A database connection is opened immediately (via `establish_connection`)
    /// and kept for the lifetime of the `Runner`.
    ///
    /// # Arguments
    ///
    /// * `f` - Callback generated by `process_jobs!`. See Examples below.
    /// * `db_url` - Address of Postgres database containing tasks table. Syntax: postgres://username:password@hostname:port/db_name
    /// * `table_name` - The name of the Postgres table containing tasks.
    /// * `queues` - If empty, process any task. If not empty, only process tasks belonging to specified queues.
    ///
    /// # Example
    /// ```no_run
    /// # #[macro_use] extern crate serde;
    /// # use izta::runner::Runner;
    /// # use izta::process_jobs;
    /// # use izta::job::Job;
    /// #
    /// # #[derive(Serialize, Deserialize)]
    /// # struct JobA {}
    /// #
    /// # impl Job for JobA {
    /// #     type R = ();
    /// #     type E = ();
    /// #
    /// #     const UUID: &'static str = "";
    /// #     const MAX_ATTEMPTS: usize = 1;
    /// #
    /// #     fn run(&self) -> Result<Self::R, Self::E> {
    /// #         Ok(())
    /// #     }
    /// # }
    /// #
    /// # #[derive(Serialize, Deserialize)]
    /// # struct JobB {}
    /// #
    /// # impl Job for JobB {
    /// #     type R = ();
    /// #     type E = ();
    /// #
    /// #     const UUID: &'static str = "";
    /// #     const MAX_ATTEMPTS: usize = 1;
    /// #
    /// #     fn run(&self) -> Result<Self::R, Self::E> {
    /// #         Ok(())
    /// #     }
    /// # }
    ///
    /// // Provided that JobA and JobB are two types implementing the Job trait
    /// let runner = Runner::new(
    ///     process_jobs!(JobA, JobB),
    ///     "postgres://izta:password@localhost:5432/izta_test",
    ///     "tasks",
    ///     vec![],
    /// );
    /// ```
    ///
    /// [`start()`]: #method.start
    pub fn new(f: F, db_url: &str, table_name: &str, queues: Vec<String>) -> Runner<F> {
        Runner {
            conn: establish_connection(db_url),
            table_name: table_name.to_string(),
            // `queues` is already owned; move it instead of cloning.
            queues,
            db_url: db_url.to_string(),
            f,
        }
    }

    /// Begin executing tasks.
    ///
    /// Hands a worker closure to `new_supervised_thread`. The worker opens its
    /// own database connection, then loops forever: it pops the next task for
    /// this runner's queues and runs it, or sleeps for one second when no task
    /// is available.
    ///
    /// NOTE(review): this presumably returns as soon as the thread is spawned;
    /// confirm against `new_supervised_thread`.
    pub fn start(&self) {
        // Everything the worker needs is copied/cloned so the closure owns its data.
        let db_url = self.db_url.clone();
        let table_name = self.table_name.clone();
        let queues = self.queues.clone();
        let f = self.f;
        new_supervised_thread(move || {
            let conn = establish_connection(&db_url);
            loop {
                match Task::pop_queue(&conn, &queues, &table_name) {
                    Some(task) => Self::consume_task(&task, f, &conn),

                    // No tasks available for us! Check again later
                    None => std::thread::sleep(Duration::from_secs(1)),
                }
            }
        });
    }

    // Execute the provided task on a dedicated thread and record the outcome.
    //
    // Running the callback on its own thread lets a panicking job be caught
    // (through `join()`) and stored as a failure instead of taking down the
    // worker loop.
    fn consume_task(task: &Task, f: F, conn: &Connection) {
        let task_for_job = task.clone();
        let handle: JoinHandle<RunTaskResult> =
            std::thread::spawn(move || f(&task_for_job));

        match handle.join() {
            Ok(run_task_result) => persist_task_result(task, run_task_result, conn),

            Err(payload) => {
                // Child thread panicked. `panic!("literal")` carries a
                // `&'static str`, while `panic!("fmt {}", x)` carries a
                // `String`; handle both before falling back to a generic
                // message.
                let msg = payload
                    .downcast_ref::<&'static str>()
                    .map(|s| s.to_string())
                    .or_else(|| payload.downcast_ref::<String>().cloned())
                    .unwrap_or_else(|| "thread panicked".to_string());
                task.failed(Some(msg), conn);
            }
        }
    }

    /// Creates and schedules a new `Task` for execution from a `TaskReq`. Returns new task
    ///
    /// The task is inserted with status `"pending"` and zero attempts;
    /// `run_at` and `interval` are stored as milliseconds and `cron` as JSON.
    ///
    /// # Panics
    ///
    /// Panics if `req.cron` cannot be serialized, if the `INSERT` fails, or if
    /// the database does not return the inserted row.
    pub fn add_task(&self, req: &TaskReq) -> Task {
        let cur = get_cur_unix_time();
        let cron = serde_json::to_string(&req.cron).unwrap();
        let run_at = req.run_at.map(|ts| ts.timestamp_millis());
        let interval = req.interval.map(|i| i.num_milliseconds());
        // Only the table name is interpolated; every value is a bound parameter.
        let rows = self.conn.query(
            &format!("INSERT INTO {} (job_uuid, status, run_at, queue, attempts, max_attempts, created_at, updated_at, cron, interval, job) VALUES ($1, $2, $3, $4, $5, $6, $7, $7, $8, $9, $10) RETURNING *", &self.table_name),
            &[&req.job_uuid, &"pending", &run_at, &req.queue, &0, &(req.max_attempts as i32), &cur, &cron, &interval, &req.job])
            .expect("Error saving new task");
        Task::rows_to_tasks(rows, &self.table_name)
            .first()
            .cloned()
            .expect("INSERT ... RETURNING did not return the new task")
    }

    /// Return all finished tasks managed by this `Runner`
    pub fn finished(&self) -> Vec<Task> {
        Task::get_finished(&self.conn, &self.queues, &self.table_name)
    }

    /// Return all failed tasks managed by this `Runner`
    pub fn failed(&self) -> Vec<Task> {
        Task::get_failed(&self.conn, &self.queues, &self.table_name)
    }
}

// Reschedule `task` for the next time `cron` fires after the current UTC
// instant, storing `result` as the outcome of the run that just ended.
fn reschedule_cron(task: &Task, cron: &Cron, result: &str, conn: &Connection) {
    task.reschedule(result, cron.next_run(Utc::now()), conn);
}

// Save the result of executing task to the DB, possibly rescheduling it to run again.
//
// * `Finished`  - repeating tasks (cron / interval) are rescheduled, one-offs
//                 are marked finished.
// * `Failed`    - while attempts remain the task is marked `failing`
//                 (presumably leaving it eligible for a retry - confirm in
//                 `Task::failing`); once attempts are exhausted, repeating
//                 tasks are rescheduled and one-offs are marked failed.
// * `SerdeErr`  - marked failed with the deserialization message.
// * `None`      - no job type matched the task's UUID; marked failed.
fn persist_task_result(task: &Task, run_task_result: RunTaskResult, conn: &Connection) {
    match run_task_result {
        RunTaskResult::Finished { result } => {
            if !reschedule_if_repeating(task, &result, conn) {
                task.finished(&result, conn);
            }
        }

        RunTaskResult::Failed { result } => {
            // Compare with `<` rather than testing `==` for exhaustion: if
            // `attempts` ever exceeds `max_attempts` (e.g. a job declared with
            // zero max attempts) the task must still be treated as exhausted
            // instead of being marked `failing` again.
            if task.attempts < task.max_attempts {
                task.failing(Some(result), conn);
            } else if !reschedule_if_repeating(task, &result, conn) {
                // Reschedule repeating jobs, fail one-offs
                task.failed(Some(result), conn);
            }
        }

        RunTaskResult::SerdeErr { msg } => {
            // Unlikely to be recoverable
            task.failed(Some(msg), conn);
        }

        RunTaskResult::None => {
            // Task in DB with no associated Job
            task.failed(
                Some(format!("Job with UUID {} does not exist", &task.job_uuid)),
                conn,
            );
        }
    }
}

// Reschedule `task` if it repeats (cron takes precedence over interval).
// Returns `true` when the task was rescheduled, `false` for one-off tasks.
fn reschedule_if_repeating(task: &Task, result: &str, conn: &Connection) -> bool {
    if let Some(cron) = &task.cron {
        reschedule_cron(task, cron, result, conn);
        true
    } else if let Some(interval) = task.interval {
        task.reschedule(result, get_cur_unix_time() + interval, conn);
        true
    } else {
        false
    }
}

// Use: `cargo expand --lib --tests` in the izta_tests crate
// to debug this macro.
/// This macro returns a callback function that accepts a `Task` and evaluates the appropriate `Job`
/// type's `run()` method on its payload. The callback is intended to be provided to [`Runner::new()`].
/// See documentation for [`Runner::new()`] for example usage.
///
/// The listed types are tried in order. For the first type whose `UUID`
/// equals the task's `job_uuid`, the task's `job` payload is deserialized from
/// JSON and run. A deserialization error is returned as
/// `RunTaskResult::SerdeErr`; if no listed type produces a result the callback
/// returns `RunTaskResult::None`.
///
/// [`Runner::new()`]: runner/struct.Runner.html#method.new
#[macro_export]
macro_rules! process_jobs {
    ($($type_name:ident) ,*) => {
        |task| {
            // `$crate` keeps these paths valid even if the caller renamed the
            // dependency or the macro is expanded inside this crate itself.
            use $crate::job::RunTaskResult;
            use $crate::serde_json;

            $(
                if $type_name::UUID == task.job_uuid {
                    let job: $type_name = match serde_json::from_str(&task.job) {
                        Ok(job) => job,

                        Err(e) => return RunTaskResult::SerdeErr { msg: e.to_string() },
                    };

                    let res = job.run_task(task);
                    // A "none" result falls through so later types get a chance.
                    if !res.is_none() {
                        return res;
                    }
                }
            )*

            RunTaskResult::None
        }
    };
}