job_queue 0.0.20

A simple, efficient Rust library for handling asynchronous job processing and task queuing.
Documentation
use crate::{DBType, Error};

pub(crate) struct PoolOptions {
    pub(crate) max_connections: u32,
    pub(crate) min_connections: u32,
}

pub(crate) async fn get_pool(
    database_url: &str,
    options: PoolOptions,
) -> Result<(sqlx::Pool<sqlx::Any>, DBType), Error> {
    sqlx::any::install_default_drivers();

    let pool = sqlx::any::AnyPoolOptions::new()
        .max_connections(options.max_connections)
        .min_connections(options.min_connections)
        .connect(database_url)
        .await
        .map_err(Error::DatabaseError)?;

    let db_type = if database_url.starts_with("mysql") {
        DBType::Mysql
    } else if database_url.starts_with("postgres") {
        DBType::Postgres
    } else {
        return Err(Error::UnsupportedDatabaseUrl);
    };

    if database_url.starts_with("mysql") {
        sqlx::query(
            r" CREATE TABLE IF NOT EXISTS `jobs` (
                `id` bigint unsigned NOT NULL AUTO_INCREMENT,
                `uuid` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
                `queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
                `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL,
                `attempts` int unsigned NOT NULL,
                `reserved_at` int unsigned DEFAULT NULL,
                `available_at` int unsigned NOT NULL,
                `created_at` int unsigned NOT NULL,
                PRIMARY KEY (`id`),
                KEY `jobs_queue_index` (`queue`)
              ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci",
        )
        .execute(&pool)
        .await
        .map_err(Error::DatabaseError)?;

        sqlx::query(
            r" CREATE TABLE IF NOT EXISTS `failed_jobs` (
            `id` bigint unsigned NOT NULL AUTO_INCREMENT,
            `uuid` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
            `queue` text COLLATE utf8mb4_unicode_ci NOT NULL,
            `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL,
            `exception` longtext COLLATE utf8mb4_unicode_ci NOT NULL,
            `failed_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (`id`),
            UNIQUE KEY `failed_jobs_uuid_unique` (`uuid`)
          ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci",
        )
        .execute(&pool)
        .await
        .map_err(Error::DatabaseError)?;
    } else if database_url.starts_with("postgres") {
        let mut transaction = pool.begin().await?;

        sqlx::query("CREATE SEQUENCE IF NOT EXISTS jobs_id_seq")
            .execute(&mut *transaction)
            .await
            .map_err(Error::DatabaseError)?;
        sqlx::query(
            r#"
                CREATE TABLE IF NOT EXISTS public.jobs (
                    id int8 NOT NULL DEFAULT nextval('jobs_id_seq'::regclass),
                    uuid text NOT NULL,
                    queue text NOT NULL,
                    payload text NOT NULL,
                    attempts int2 NOT NULL,
                    reserved_at int4,
                    available_at int4 NOT NULL,
                    created_at int4 NOT NULL,
                    PRIMARY KEY (id)
                )
                "#,
        )
        .execute(&mut *transaction)
        .await
        .map_err(Error::DatabaseError)?;

        sqlx::query(" CREATE SEQUENCE IF NOT EXISTS failed_jobs_id_seq")
            .execute(&mut *transaction)
            .await
            .map_err(Error::DatabaseError)?;

        sqlx::query(
            r#"
                CREATE TABLE IF NOT EXISTS public.failed_jobs (
                    "id" int8 NOT NULL DEFAULT nextval('failed_jobs_id_seq'::regclass),
                    "uuid" text NOT NULL,
                    "queue" text NOT NULL,
                    "payload" text NOT NULL,
                    "exception" text NOT NULL,
                    "failed_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY ("id")
                )
                "#,
        )
        .execute(&mut *transaction)
        .await
        .map_err(Error::DatabaseError)?;

        transaction.commit().await.map_err(Error::DatabaseError)?;
    } else {
        return Err(Error::UnsupportedDatabaseUrl);
    }

    Ok((pool, db_type))
}