rust_bus 3.0.6

bus — Lightweight CQRS Library for Rust
Documentation
use crate::init;
use tokio::sync::OnceCell;

static INIT_DONE: OnceCell<()> = OnceCell::const_new();

#[cfg(not(feature = "_db_any"))]
pub async fn setup_bus() {
    INIT_DONE
        .get_or_init(|| async {
            init().await.expect("Failed to init bus queue");
        })
        .await;
}

#[cfg(feature = "_db_any")]
pub async fn setup_bus() {
    INIT_DONE
        .get_or_init(|| async {
            println!("🚀 Starting global one-time initialization...");

            use crate::workers::configuration::{BusQueueConfigurationBuilder, QueueConfigBuilder};

            let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");

            #[cfg(feature = "_db_sqlx")]
            let pool = {
                #[cfg(feature = "sqlx-postgres")]
                let p = sqlx::postgres::PgPoolOptions::new()
                    .max_connections(10)
                    .connect(&db_url)
                    .await
                    .unwrap();

                #[cfg(feature = "sqlx-mysql")]
                let p = sqlx::mysql::MySqlPoolOptions::new()
                    .max_connections(10)
                    .connect(&db_url)
                    .await
                    .unwrap();

                sqlx::query("DELETE FROM bus_jobs_peers")
                    .execute(&p)
                    .await
                    .ok();
                sqlx::query("DELETE FROM bus_jobs").execute(&p).await.ok();
                p
            };

            #[cfg(feature = "_db_sea_orm")]
            let pool = {
                let opt = sea_orm::ConnectOptions::new(db_url)
                    .max_connections(10)
                    .clone();

                use sea_orm::ConnectionTrait;
                let db = sea_orm::Database::connect(opt.clone())
                    .await
                    .expect("Database connection failed");

                db.execute_unprepared("DELETE FROM bus_jobs_peers")
                    .await
                    .ok();
                db.execute_unprepared("DELETE FROM bus_jobs").await.ok();
                opt
            };

            let config_builder = BusQueueConfigurationBuilder::new()
                .connection(pool)
                .add_queue("default", QueueConfigBuilder::new().workers(2));

            init(config_builder)
                .await
                .expect("Failed to init bus queue");

            println!("✅ Initialization finished");
            ()
        })
        .await;
}