rust_bus 3.0.6

bus — Lightweight CQRS Library for Rust
Documentation
use crate::error::BusError;
use std::collections::HashMap;
use std::sync::OnceLock;

static BUS_QUEUE_CONFIG: OnceLock<BusQueueConfiguration> = OnceLock::new();

impl BusQueueConfiguration {
    #[allow(dead_code)]
    pub(crate) fn set_global(config: BusQueueConfiguration) -> Result<(), BusError> {
        BUS_QUEUE_CONFIG.set(config).map_err(|_| {
            BusError::Configuration(
                "BusQueueConfiguration has already been initialized".to_string(),
            )
        })
    }

    pub(crate) fn global() -> Result<&'static BusQueueConfiguration, BusError> {
        BUS_QUEUE_CONFIG.get().ok_or_else(|| {
            BusError::Configuration(
                "BusQueueConfiguration must be initialized before use. Call BusQueueConfiguration::set_global first.".to_string(),
            )
        })
    }

    #[cfg(feature = "_db_sea_orm")]
    pub(crate) fn get_connection(&self) -> &sea_orm::DatabaseConnection {
        &self.connection
    }

    #[cfg(feature = "sqlx-postgres")]
    pub(crate) fn get_connection(&self) -> &sqlx::PgPool {
        &self.connection
    }

    #[cfg(feature = "sqlx-mysql")]
    pub(crate) fn get_connection(&self) -> &sqlx::MySqlPool {
        &self.connection
    }

    pub(crate) fn get_queue(&self, name: &str) -> Result<&QueueConfiguration, BusError> {
        self.queues.get(name).ok_or_else(|| {
            BusError::Configuration(format!("Queue configuration for '{}' not found", name))
        })
    }

    pub(crate) fn queues(&self) -> &HashMap<String, QueueConfiguration> {
        &self.queues
    }

    pub(crate) fn srv_name(&self) -> &str {
        &self.srv_name
    }
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct BusQueueConfiguration {
    srv_name: String,

    #[cfg(feature = "_db_sea_orm")]
    connection: sea_orm::DatabaseConnection,

    #[cfg(feature = "sqlx-postgres")]
    connection: sqlx::PgPool,

    #[cfg(feature = "sqlx-mysql")]
    connection: sqlx::MySqlPool,

    queues: HashMap<String, QueueConfiguration>,
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct QueueConfiguration {
    pub(crate) workers: u32,
    pub(crate) max_attempts: u32,
    pub(crate) execution_timeout: chrono::Duration,
    pub(crate) empty_queue_delay: std::time::Duration,
}

pub struct BusQueueConfigurationBuilder {
    srv_name: String,

    #[cfg(feature = "_db_sea_orm")]
    connection: Option<sea_orm::ConnectOptions>,

    #[cfg(feature = "sqlx-postgres")]
    connection: Option<sqlx::Pool<sqlx::Postgres>>,

    #[cfg(feature = "sqlx-mysql")]
    connection: Option<sqlx::Pool<sqlx::MySql>>,

    pub(crate) queues: HashMap<String, QueueConfiguration>,
}

impl Default for BusQueueConfigurationBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl BusQueueConfigurationBuilder {
    pub fn new() -> Self {
        Self {
            srv_name: uuid::Uuid::now_v7().to_string(),
            #[cfg(feature = "_db_any")]
            connection: None,
            queues: HashMap::new(),
        }
    }

    pub fn srv_name<S: Into<String>>(mut self, srv_name: S) -> Self {
        self.srv_name = srv_name.into();
        self
    }

    #[cfg(feature = "_db_sea_orm")]
    pub fn connection(mut self, opts: sea_orm::ConnectOptions) -> Self {
        self.connection = Some(opts);
        self
    }

    #[cfg(feature = "_db_sqlx")]
    pub fn connection(
        mut self,
        #[cfg(feature = "sqlx-postgres")] pool: sqlx::Pool<sqlx::Postgres>,
        #[cfg(feature = "sqlx-mysql")] pool: sqlx::Pool<sqlx::MySql>,
    ) -> Self {
        self.connection = Some(pool);
        self
    }

    pub fn add_queue(mut self, name: impl Into<String>, config: QueueConfigBuilder) -> Self {
        let name_str = name.into();
        if self.queues.contains_key(&name_str) {
            panic!("Queue '{}' is already configured!", name_str);
        }
        self.queues.insert(name_str, config.build());
        self
    }

    pub async fn build(self) -> Result<BusQueueConfiguration, BusError> {
        if self.srv_name.is_empty() {
            return Err(BusError::Configuration(
                "srv_name cannot be empty".to_string(),
            ));
        }

        if self.srv_name.len() > 128 {
            return Err(BusError::Configuration(format!(
                "srv_name is too long ({} chars). Maximum is 128.",
                self.srv_name.len()
            )));
        }

        let connection_val = self.connection.ok_or_else(|| {
            BusError::Configuration(
                "Database connection must be provided. \
                Use .connection(opts) on the BusQueueConfigurationBuilder."
                    .to_string(),
            )
        })?;

        #[cfg(feature = "_db_sea_orm")]
        let connection = {
            use sea_orm::{ConnectionTrait, DatabaseBackend, Statement};

            let db = sea_orm::Database::connect(connection_val.clone())
                .await
                .map_err(|e| BusError::Configuration(format!("SeaORM connection failed: {}", e)))?;

            let backend = db.get_database_backend();

            let sql = match backend {
                DatabaseBackend::Postgres => {
                    "SELECT COUNT(*) FROM information_schema.tables WHERE table_name IN ('bus_jobs', 'bus_jobs_peers') AND table_schema = ANY(current_schemas(false))"
                }
                DatabaseBackend::MySql => {
                    "SELECT COUNT(*) FROM information_schema.tables WHERE table_name IN ('bus_jobs', 'bus_jobs_peers') AND table_schema = DATABASE()"
                }
                _ => {
                    return Err(BusError::Configuration(
                        "Unsupported database backend".to_string(),
                    ));
                }
            };

            let res = db
                .query_one_raw(Statement::from_string(backend, sql))
                .await
                .map_err(|e| BusError::Configuration(format!("Schema check failed: {}", e)))?;

            let tables_found: i64 = match res {
                Some(row) => row.try_get_by_index(0).unwrap_or(0),
                None => 0,
            };

            if tables_found < 2 {
                let ext = match backend {
                    DatabaseBackend::Postgres => "postgres.sql",
                    DatabaseBackend::MySql => "mysql.sql",
                    _ => "sql",
                };
                return Err(BusError::Configuration(format!(
                    "Required tables missing. Please run migrations from: ../../../migrations/{}",
                    ext
                )));
            }

            db
        };

        #[cfg(feature = "sqlx-postgres")]
        let connection = {
            let tables_exist: (i64,) = sqlx::query_as(
                "SELECT COUNT(*) FROM information_schema.tables
                     WHERE table_name IN ('bus_jobs', 'bus_jobs_peers') AND table_schema = ANY(current_schemas(false))"
            )
                .fetch_one(&connection_val)
                .await
                .map_err(|e| BusError::Configuration(e.to_string()))?;

            if tables_exist.0 < 2 {
                return Err(BusError::Configuration(
                    "Required tables 'bus_jobs' or 'bus_jobs_peers' are missing. \
                            Please run migrations from: ../../../migrations/postgres.sql"
                        .to_string(),
                ));
            }
            connection_val
        };

        #[cfg(feature = "sqlx-mysql")]
        let connection = {
            let tables_exist: (i64,) = sqlx::query_as(
                "SELECT COUNT(*) FROM information_schema.tables 
                     WHERE table_name IN ('bus_jobs', 'bus_jobs_peers') AND table_schema = DATABASE()"
            )
                .fetch_one(&connection_val)
                .await
                .map_err(|e| BusError::Configuration(e.to_string()))?;

            if tables_exist.0 < 2 {
                return Err(BusError::Configuration(
                    "Required tables 'bus_jobs' or 'bus_jobs_peers' are missing. \
                            Please run migrations from: ../../../migrations/mysql.sql"
                        .to_string(),
                ));
            }
            connection_val
        };

        if self.queues.is_empty() {
            return Err(BusError::Configuration(
                "At least one queue must be configured. \
                Use BusQueueConfigurationBuilder::add_queue(\"queue_name\", QueueConfigBuilder::new().build()) \
                to define your worker queues before calling init().".to_string(),
            ));
        }

        Ok(BusQueueConfiguration {
            srv_name: self.srv_name,
            connection,
            queues: self.queues,
        })
    }
}

pub struct QueueConfigBuilder {
    workers: u32,
    max_attempts: u32,
    execution_timeout: chrono::Duration,
    empty_queue_delay: std::time::Duration,
}

impl Default for QueueConfigBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl QueueConfigBuilder {
    pub fn new() -> Self {
        Self {
            workers: 1,
            max_attempts: 3,
            execution_timeout: chrono::Duration::minutes(10),
            empty_queue_delay: std::time::Duration::from_secs(5),
        }
    }

    pub fn workers(mut self, count: u32) -> Self {
        self.workers = count;
        self
    }

    pub fn max_attempts(mut self, count: u32) -> Self {
        self.max_attempts = count;
        self
    }

    pub fn execution_timeout(mut self, duration: chrono::Duration) -> Self {
        self.execution_timeout = duration;
        self
    }

    pub fn empty_queue_delay(mut self, delay: std::time::Duration) -> Self {
        self.empty_queue_delay = delay;
        self
    }

    fn build(self) -> QueueConfiguration {
        QueueConfiguration {
            workers: self.workers,
            max_attempts: self.max_attempts,
            execution_timeout: self.execution_timeout,
            empty_queue_delay: self.empty_queue_delay,
        }
    }
}