rust_bus 3.0.6

bus — Lightweight CQRS Library for Rust
Documentation
use crate::contracts::bus_event::IBusEvent;
use crate::contracts::enums::{Replace, ScheduleIn};
use crate::contracts::meta::BusMetadata;
use crate::dispatch::registration::{DATABASE_HANDLERS, EventDatabaseHandlerRegistration};
use crate::error::BusError;
use crate::workers::configuration::BusQueueConfiguration;
use chrono::{DateTime, Utc};

#[cfg(feature = "sea-orm-mysql")]
use crate::dispatch::dispatch_db_sea_mysql::insert_sea_mysql;
#[cfg(feature = "sea-orm-postgres")]
use crate::dispatch::dispatch_db_sea_postgres::insert_sea_postgres;
#[cfg(feature = "sqlx-mysql")]
use crate::dispatch::dispatch_db_sqlx_mysql::insert_sqlx_mysql;
#[cfg(feature = "sqlx-postgres")]
use crate::dispatch::dispatch_db_sqlx_postgres::insert_sqlx_postgres;

#[cfg(not(feature = "sea-orm-mysql"))]
pub(crate) const INSERT_SQL: &str = r#"
INSERT INTO bus_jobs (
    id,
    hash_type_name,

    inserted_at,
    scheduled_at,
    attempted_at,
    completed_at,
    cancelled_at,
    discarded_at,

    state,
    priority,
    attempt,
    max_attempts,
    execution_timeout_sec,

    queue,
    type_name_event,
    type_name_handler,

    payload,
    meta,
    tags,
    errors,
    attempted_by
) "#;

pub(crate) async fn dispatch_db<'a, TEvent>(
    #[cfg(feature = "sqlx-postgres")] txn: &mut sqlx::Transaction<'a, sqlx::Postgres>,
    #[cfg(feature = "sqlx-mysql")] txn: &mut sqlx::Transaction<'a, sqlx::MySql>,
    #[cfg(feature = "_db_sea_orm")] txn: &'a sea_orm::DatabaseTransaction,
    event: &TEvent,
    metadata: &BusMetadata,
) -> Result<usize, BusError>
where
    TEvent: IBusEvent,
{
    let event_identity = TEvent::EVENT_IDENTITY;

    #[cfg(feature = "logging")]
    log::debug!("Dispatching database event: {}", event_identity);

    let handlers_map = DATABASE_HANDLERS.get().ok_or_else(|| {
        BusError::Configuration(
            "Bus not initialized. Call Bus::init() before dispatching.".to_string(),
        )
    })?;

    let handlers = match handlers_map.get(event_identity) {
        None => return Ok(0),
        Some(h) if h.is_empty() => return Ok(0),
        Some(h) => h,
    };

    let global_config = BusQueueConfiguration::global()?;
    let jobs = get_validated_handlers(handlers, global_config)?;

    #[cfg(feature = "sea-orm-postgres")]
    insert_sea_postgres(txn, jobs, event, metadata).await?;

    #[cfg(feature = "sea-orm-mysql")]
    insert_sea_mysql(txn, jobs, event, metadata).await?;

    #[cfg(feature = "sqlx-postgres")]
    insert_sqlx_postgres(txn, jobs, event, metadata).await?;

    #[cfg(feature = "sqlx-mysql")]
    insert_sqlx_mysql(txn, jobs, event, metadata).await?;

    Ok(handlers.len())
}

pub(crate) struct ValidatedJob<'a> {
    pub(crate) reg: &'a EventDatabaseHandlerRegistration,
    pub(crate) max_attempts: i32,
    pub(crate) timeout_sec: i32,
    pub(crate) priority: i32,
    pub(crate) tags_json: serde_json::Value,
    pub(crate) schedule_in: DateTime<Utc>,
}

pub(crate) fn get_validated_handlers<'a>(
    handlers: &Vec<&'a EventDatabaseHandlerRegistration>,
    global_config: &'a BusQueueConfiguration,
) -> Result<Vec<ValidatedJob<'a>>, BusError> {
    let now = Utc::now();

    handlers
        .iter()
        .map(|reg| {
            let q_conf = global_config.get_queue(reg.queue)?;

            let max_attempts: i32 = reg
                .max_attempts
                .unwrap_or(q_conf.max_attempts)
                .try_into()
                .map_err(|_| BusError::Configuration("Attempts too large".into()))?;

            let timeout_sec: i32 = reg
                .execution_timeout
                .unwrap_or(q_conf.execution_timeout)
                .num_seconds()
                .try_into()
                .map_err(|_| BusError::Configuration("Timeout too large".into()))?;

            let priority: i32 = reg
                .priority
                .try_into()
                .map_err(|_| BusError::Configuration("priority too large".into()))?;

            let tags_json = serde_json::to_value(reg.tags)
                .map_err(|_| BusError::Configuration("Tags has incorrect value".into()))?;

            let schedule_in = match reg.schedule_in {
                ScheduleIn::Immediately => now,
                ScheduleIn::Duration(duration) => {
                    Utc::now()
                        + chrono::TimeDelta::from_std(duration).map_err(|_| {
                            BusError::Configuration("schedule_in has incorrect value".into())
                        })?
                }
            };

            if let Some(ref unique) = reg.unique {
                let mut seen = std::collections::HashSet::new();
                for r in unique.replace.iter() {
                    let state_id = match r {
                        Replace::Available(_) => "available",
                        Replace::Scheduled(_) => "scheduled",
                        Replace::Executing(_) => "executing",
                        Replace::Retryable(_) => "retryable",
                        Replace::Completed(_) => "completed",
                        Replace::Cancelled(_) => "cancelled",
                        Replace::Discarded(_) => "discarded",
                    };
                    if !seen.insert(state_id) {
                        return Err(BusError::Configuration(format!(
                            "replace has non unique value '{}'",
                            state_id
                        )));
                    }
                }
            }

            Ok(ValidatedJob {
                reg,
                max_attempts,
                timeout_sec,
                priority,
                tags_json,
                schedule_in,
            })
        })
        .collect()
}