//! effectum 0.7.0 — an embeddable task queue based on SQLite.
//! (Header recovered from docs.rs residue; see the crate documentation for details.)
use std::{
    rc::Rc,
    sync::{
        atomic::{AtomicI64, Ordering},
        Arc,
    },
};

use rusqlite::{named_params, types::Value, Connection};
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tracing::{event, Level};
use uuid::Uuid;

use super::DbOperationResult;
use crate::{
    shared_state::SharedState, worker::RunningJobs, Error, Result, RunningJob, RunningJobData,
};

/// A job that has been claimed for a worker, paired with the channel used to
/// observe its completion.
pub(crate) struct ReadyJob {
    /// The claimed job, ready to be handed to a worker task.
    pub job: RunningJob,
    /// Completion signal; the matching sender is stored in
    /// `RunningJobData::done` on the job itself. The channel starts at
    /// `false` — presumably flipped to `true` when the job finishes; confirm
    /// against the worker code.
    pub done_rx: tokio::sync::watch::Receiver<bool>,
}

/// Arguments for a "get ready jobs" database operation.
pub(crate) struct GetReadyJobsArgs {
    /// Job types this worker can run, as SQLite values for `rarray()` binding.
    pub job_types: Vec<Value>,
    /// Maximum number of jobs to claim in this call.
    pub max_jobs: u32,
    /// Maximum total weight of concurrently-running jobs for this worker.
    pub max_concurrency: u32,
    /// Shared counters tracking this worker's running jobs.
    pub running_jobs: Arc<RunningJobs>,
    /// The time to treat as "now" for readiness and expiration calculations.
    pub now: OffsetDateTime,
    /// Channel on which the claimed jobs (or an error) are sent back.
    pub result_tx: tokio::sync::oneshot::Sender<Result<Vec<ReadyJob>>>,
}

/// Claim up to `max_jobs` ready jobs for worker `worker_id` and return them
/// as [`ReadyJob`]s.
///
/// A job is "ready" when it is unclaimed (`active_worker_id IS NULL`), its
/// `run_at` time is at or before `now`, its type is in `job_types`, and its
/// weight could fit under `max_concurrency`. Candidates are taken in
/// priority order (highest first), ties broken by `run_at`.
///
/// Each claimed job's row is marked as running (worker id, start time,
/// expiration), and the shared weighted-running counters in `running_jobs`
/// are incremented. Iteration stops as soon as the next candidate's weight
/// would push the weighted running count past `max_concurrency`.
///
/// # Errors
/// Returns any SQLite error, or `Error::TimestampOutOfRange` if a stored
/// `orig_run_at` cannot be converted back to an `OffsetDateTime`. Note that
/// counter increments for jobs claimed before such an error are not rolled
/// back here — presumably the caller's transaction/cleanup handles this;
/// confirm.
fn do_get_ready_jobs(
    tx: &Connection,
    queue: &SharedState,
    worker_id: u64,
    job_types: Vec<Value>,
    max_jobs: u32,
    max_concurrency: u32,
    running_jobs: Arc<RunningJobs>,
    now: OffsetDateTime,
) -> Result<Vec<ReadyJob>> {
    // COALESCE prefers a checkpointed payload over the original payload.
    // The `weight <= $max_concurrency` predicate only filters out jobs that
    // could never fit on this worker; the actual remaining-capacity check
    // happens per row in the loop below.
    let mut stmt = tx.prepare_cached(
        r##"SELECT job_id, external_id, active_jobs.priority, weight,
                job_type, current_try,
                COALESCE(checkpointed_payload, payload) as payload,
                default_timeout,
                heartbeat_increment,
                backoff_multiplier,
                backoff_randomization,
                backoff_initial_interval,
                max_retries,
                orig_run_at,
                jobs.name
            FROM active_jobs
            JOIN jobs USING(job_id)
            WHERE active_worker_id IS NULL
                AND run_at <= $now
                AND job_type in rarray($job_types)
                AND weight <= $max_concurrency
            ORDER BY active_jobs.priority DESC, run_at
            LIMIT $limit"##,
    )?;

    // One row of the query above; fields mirror the SELECT column list.
    #[derive(Debug)]
    struct JobResult {
        job_id: i64,
        external_id: Uuid,
        name: Option<String>,
        priority: i32,
        weight: u16,
        job_type: String,
        current_try: i32,
        payload: Option<Vec<u8>>,
        default_timeout: i32,
        heartbeat_increment: i32,
        backoff_multiplier: f64,
        backoff_randomization: f64,
        backoff_initial_interval: i32,
        max_retries: i32,
        orig_run_at: i64,
    }

    let now_timestamp = now.unix_timestamp();
    let jobs = stmt.query_map(
        named_params! {
            // rusqlite's rarray() table-valued function takes its value
            // list as an `Rc<Vec<Value>>`.
            "$job_types": Rc::new(job_types),
            "$now": now_timestamp,
            "$max_concurrency": max_concurrency,
            "$limit": max_jobs,
        },
        // Column indexes here must stay in sync with the SELECT above.
        |row| {
            let job_id: i64 = row.get(0)?;
            let external_id: Uuid = row.get(1)?;
            let priority: i32 = row.get(2)?;
            let weight: u16 = row.get(3)?;
            let job_type: String = row.get(4)?;
            let current_try: i32 = row.get(5)?;
            let payload: Option<Vec<u8>> = row.get(6)?;
            let default_timeout: i32 = row.get(7)?;
            let heartbeat_increment: i32 = row.get(8)?;
            let backoff_multiplier: f64 = row.get(9)?;
            let backoff_randomization: f64 = row.get(10)?;
            let backoff_initial_interval: i32 = row.get(11)?;
            let max_retries: i32 = row.get(12)?;
            let orig_run_at: i64 = row.get(13)?;
            let name: Option<String> = row.get(14)?;

            Ok(JobResult {
                job_id,
                priority,
                weight,
                job_type,
                current_try,
                payload,
                default_timeout,
                external_id,
                heartbeat_increment,
                backoff_multiplier,
                backoff_randomization,
                backoff_initial_interval,
                max_retries,
                orig_run_at,
            name,
            })
        },
    )?;

    // Claims a single job row for this worker.
    let mut set_running = tx.prepare_cached(
        r##"UPDATE active_jobs
            SET active_worker_id=$worker_id, started_at=$now, expires_at=$expiration
            WHERE job_id=$job_id"##,
    )?;

    let mut ready_jobs = Vec::with_capacity(max_jobs as usize);
    // Local snapshot of the worker's weighted load; kept in sync with the
    // atomic as jobs are claimed below.
    let mut running_count = running_jobs.current_weighted.load(Ordering::Relaxed);
    for job in jobs {
        let job = job?;
        let weight = job.weight as u32;

        event!(Level::DEBUG, running_count, weight, max_concurrency);

        // Stop (rather than skip) once the next candidate doesn't fit:
        // candidates arrive in priority order, and skipping a heavy
        // high-priority job in favor of lighter lower-priority ones would
        // invert that ordering. NOTE(review): assuming this anti-starvation
        // `break` is intentional — confirm.
        if running_count + weight > max_concurrency {
            break;
        }

        // Expiration is `default_timeout` seconds after `now` (both are
        // unix-timestamp seconds).
        let expiration = now_timestamp + job.default_timeout as i64;

        set_running.execute(named_params! {
            "$job_id": job.job_id,
            "$worker_id": worker_id,
            "$now": now_timestamp,
            "$expiration": expiration
        })?;

        // `fetch_add` returns the *previous* value, so add `weight` again to
        // get the post-increment count for the next iteration's check.
        running_count = running_jobs
            .current_weighted
            .fetch_add(weight, Ordering::Relaxed)
            + weight;
        running_jobs.started.fetch_add(1, Ordering::Relaxed);

        // Completion channel: the sender is stashed on the job data; the
        // receiver is handed back to the caller via `ReadyJob`.
        let (done_tx, done_rx) = tokio::sync::watch::channel(false);
        let job = RunningJob(Arc::new(RunningJobData {
            id: job.external_id,
            job_id: job.job_id,
            name: job.name,
            worker_id,
            heartbeat_increment: job.heartbeat_increment,
            job_type: job.job_type,
            // A NULL payload is surfaced as an empty byte vector.
            payload: job.payload.unwrap_or_default(),
            priority: job.priority,
            weight: job.weight,
            start_time: now,
            current_try: job.current_try,
            backoff_multiplier: job.backoff_multiplier,
            backoff_randomization: job.backoff_randomization,
            backoff_initial_interval: job.backoff_initial_interval,
            max_retries: job.max_retries,
            done: Mutex::new(Some(done_tx)),
            queue: queue.clone(),
            expires: AtomicI64::new(expiration),
            orig_run_at: OffsetDateTime::from_unix_timestamp(job.orig_run_at)
                .map_err(|_| Error::TimestampOutOfRange("orig_run_at"))?,
        }));

        ready_jobs.push(ReadyJob { job, done_rx });
    }

    Ok(ready_jobs)
}

/// Run a "get ready jobs" operation and wrap its outcome, together with the
/// caller's reply channel, in a [`DbOperationResult`].
///
/// This is a thin adapter: it forwards the fields of `args` to
/// [`do_get_ready_jobs`] and packages the result for the DB-operation
/// dispatch machinery.
pub(super) fn get_ready_jobs(
    tx: &Connection,
    queue: &SharedState,
    worker_id: u64,
    args: GetReadyJobsArgs,
) -> DbOperationResult {
    // Move the fields out of `args` directly; `result_tx` is taken last so
    // it stays available after the call.
    let result = do_get_ready_jobs(
        tx,
        queue,
        worker_id,
        args.job_types,
        args.max_jobs,
        args.max_concurrency,
        args.running_jobs,
        args.now,
    );

    DbOperationResult::GetReadyJobs(super::OperationResult {
        result,
        result_tx: args.result_tx,
    })
}