rust_bus 3.0.6

bus — Lightweight CQRS Library for Rust
Documentation
use crate::BusError;
use crate::contracts::meta::BusMetadata;
use crate::dispatch::registration::DATABASE_HANDLERS_BY_HASH;
use crate::sql::dto::BusJob;
use crate::workers::configuration::BusQueueConfiguration;
use chrono::Utc;
use futures::future::join_all;
use sea_orm::TransactionTrait;
use sea_orm::{ConnectionTrait, DbBackend, Statement};
use serde_json::Value;
use uuid::Uuid;

pub(crate) async fn fetch_jobs(queue_name: &str, limit: usize) -> Result<Vec<BusJob>, BusError> {
    let db = BusQueueConfiguration::global()?.get_connection();
    let txn = db.begin().await?;
    let now = Utc::now();

    let select_sql = r#"
        SELECT
            id, hash_type_name, attempt + 1 as attempt, max_attempts, execution_timeout_sec,
            type_name_event, type_name_handler, payload, meta, tags, errors
        FROM bus_jobs USE INDEX(idx_bus_jobs_fetch_available)
        WHERE queue = ?
          AND state IN ('available', 'retryable')
          AND scheduled_at <= ?
        ORDER BY priority DESC, scheduled_at ASC, id ASC
        LIMIT ?
        FOR UPDATE SKIP LOCKED
    "#;

    let select_stmt = Statement::from_sql_and_values(
        DbBackend::MySql,
        select_sql,
        vec![queue_name.into(), now.into(), (limit as i64).into()],
    );

    let rows = txn.query_all_raw(select_stmt).await?;

    if rows.is_empty() {
        txn.commit().await?;
        return Ok(vec![]);
    }

    let mut jobs = Vec::with_capacity(rows.len());
    let mut ids = Vec::with_capacity(rows.len());

    for row in rows {
        let id: Uuid = row.try_get("", "id")?;
        ids.push(id);

        let meta_json: Value = row.try_get("", "meta")?;
        let meta: BusMetadata = serde_json::from_value(meta_json)
            .map_err(|e| sea_orm::DbErr::Custom(format!("Meta conversion error: {}", e)))?;

        jobs.push(BusJob {
            id,
            hash_type_name: row.try_get("", "hash_type_name")?,
            attempt: row.try_get("", "attempt")?,
            max_attempts: row.try_get("", "max_attempts")?,
            execution_timeout_sec: row.try_get("", "execution_timeout_sec")?,
            type_name_event: row.try_get("", "type_name_event")?,
            type_name_handler: row.try_get("", "type_name_handler")?,
            payload: row.try_get("", "payload")?,
            meta,
            tags: row.try_get("", "tags")?,
            errors: row.try_get("", "errors")?,
        });
    }

    let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
    let update_sql = format!(
        "UPDATE bus_jobs SET state = 'executing', attempted_at = ?, attempt = attempt + 1 WHERE id IN ({})",
        placeholders
    );

    let mut update_values: Vec<sea_orm::Value> = Vec::with_capacity(ids.len());
    update_values.push(Utc::now().into());
    for id in ids {
        update_values.push(id.into());
    }

    let update_stmt = Statement::from_sql_and_values(DbBackend::MySql, &update_sql, update_values);

    txn.execute_raw(update_stmt).await?;
    txn.commit().await?;

    Ok(jobs)
}

pub(crate) async fn handler_success(id: Uuid) -> Result<(), BusError> {
    let pool = BusQueueConfiguration::global()?.get_connection();

    let stmt = Statement::from_sql_and_values(
        DbBackend::MySql,
        r#"
        UPDATE bus_jobs
            SET state = 'completed',
            completed_at = ?
        WHERE id = ?
          AND state = 'executing'
        "#,
        vec![Utc::now().into(), id.into()],
    );

    let _result = pool
        .execute_raw(stmt)
        .await
        .map_err(|e| BusError::Database(e.to_string()))?;

    #[cfg(feature = "logging")]
    if _result.rows_affected() == 0 {
        log::warn!("[Worker] handler_success skipped (stale job): {}", id);
    }

    Ok(())
}

pub(crate) async fn handler_error(job: &BusJob, err_display: String) -> Result<(), BusError> {
    let pool = BusQueueConfiguration::global()?.get_connection();

    let err_json = serde_json::json!({
        "attempt": job.attempt,
        "error": err_display,
        "at": Utc::now().to_rfc3339()
    });

    let (sql, values) = if job.attempt < job.max_attempts {
        let database_handlers = DATABASE_HANDLERS_BY_HASH.get().ok_or_else(|| {
            BusError::Configuration("Bus not initialized. Call Bus::init().".into())
        })?;

        let reg = database_handlers.get(&job.hash_type_name).ok_or_else(|| {
            BusError::Configuration(format!("Handler not found: {}", job.hash_type_name))
        })?;

        let delay = (reg.next_attempt_at)(job.attempt as u32);
        let scheduled_at = Utc::now() + delay;

        let sql = r#"
            UPDATE bus_jobs
            SET state = 'retryable',
                scheduled_at = ?,
                errors = JSON_ARRAY_APPEND(COALESCE(errors, CAST('[]' AS JSON)), '$', CAST(? AS JSON))
            WHERE id = ?
              AND state = 'executing'
        "#;

        (
            sql,
            vec![
                scheduled_at.into(),
                err_json.to_string().into(),
                job.id.into(),
            ],
        )
    } else {
        let sql = r#"
            UPDATE bus_jobs
            SET state = 'discarded',
                discarded_at = ?,
                errors = JSON_ARRAY_APPEND(COALESCE(errors, CAST('[]' AS JSON)), '$', CAST(? AS JSON))
            WHERE id = ?
              AND state = 'executing'
        "#;

        (
            sql,
            vec![
                Utc::now().into(),
                err_json.to_string().into(),
                job.id.into(),
            ],
        )
    };

    let stmt = Statement::from_sql_and_values(DbBackend::MySql, sql, values);
    let _result = pool
        .execute_raw(stmt)
        .await
        .map_err(|e| BusError::Database(e.to_string()))?;

    #[cfg(feature = "logging")]
    if _result.rows_affected() == 0 {
        log::warn!("[Worker] handler_error skipped (stale job): {}", job.id);
    }

    Ok(())
}

pub(crate) async fn handler_error_shutdown(job: &BusJob) -> Result<(), BusError> {
    let db = BusQueueConfiguration::global()?.get_connection();

    let sql = r#"
        UPDATE bus_jobs
        SET
            state = CASE
                WHEN (attempt - 1) > 0 THEN 'retryable'
                ELSE 'available'
            END,
            attempt = attempt - 1,
            errors = JSON_ARRAY_APPEND(IFNULL(errors, CAST('[]' AS JSON)), '$', CAST(? AS JSON))
        WHERE id = ?
          AND state = 'executing'
    "#;

    let err_json = serde_json::json!({
        "attempt": job.attempt,
        "error": "Worker shutdown: job interrupted",
        "at": Utc::now().to_rfc3339()
    });

    db.execute_raw(Statement::from_sql_and_values(
        DbBackend::MySql,
        sql,
        vec![err_json.into(), job.id.into()],
    ))
    .await
    .map_err(|e| BusError::Database(e.to_string()))?;

    Ok(())
}

pub async fn recover_timed_out_jobs() -> Result<(), BusError> {
    let db = BusQueueConfiguration::global()?.get_connection();

    let select_stmt = Statement::from_sql_and_values(
        DbBackend::MySql,
        r#"
            SELECT
                id, hash_type_name, attempt, max_attempts,
                execution_timeout_sec, type_name_event, type_name_handler,
                payload, meta, tags, errors
            FROM bus_jobs
            WHERE state = 'executing'
              AND DATE_ADD(attempted_at, INTERVAL execution_timeout_sec SECOND) < ?
            LIMIT 100
        "#,
        vec![Utc::now().into()],
    );

    let rows = db.query_all_raw(select_stmt).await?;

    if rows.is_empty() {
        return Ok(());
    }

    let mut futures = Vec::new();

    for row in rows {
        let id: Uuid = row.try_get("", "id")?;
        let meta_json: Value = row.try_get("", "meta")?;
        let meta: BusMetadata = serde_json::from_value(meta_json)
            .map_err(|e| sea_orm::DbErr::Custom(format!("Meta conversion error: {}", e)))?;

        let job = BusJob {
            id,
            hash_type_name: row.try_get("", "hash_type_name")?,
            attempt: row.try_get("", "attempt")?,
            max_attempts: row.try_get("", "max_attempts")?,
            execution_timeout_sec: row.try_get("", "execution_timeout_sec")?,
            type_name_event: row.try_get("", "type_name_event")?,
            type_name_handler: row.try_get("", "type_name_handler")?,
            payload: row.try_get("", "payload")?,
            meta,
            tags: row.try_get("", "tags")?,
            errors: row.try_get("", "errors")?,
        };

        let fut = async move {
            let timeout_sec = job.execution_timeout_sec;
            if let Err(_e) = crate::sql::handler_error(
                &job,
                format!("Job execution timed out after {}s", timeout_sec),
            )
            .await
            {
                #[cfg(feature = "logging")]
                log::error!("[Recovery] Failed to recover job {}: {:?}", job.id, _e);
            }
        };

        futures.push(fut);
    }

    join_all(futures).await;
    Ok(())
}