qrush_engine/
config.rs

1// src/config.rs
2use std::sync::{Arc, OnceLock};
3use crate::services::runner_service::{start_worker_pool, start_delayed_worker_pool};
4use anyhow::{anyhow, Result};
5use tokio::sync::Notify;
6use tracing::info;
7use redis::{aio::MultiplexedConnection, Client}; // Use MultiplexedConnection, not Connection
8use redis::AsyncCommands;
9
// Cron scheduler, started at the end of `QueueConfig::initialize`.
11use crate::cron::cron_scheduler::CronScheduler;
12use crate::utils::constants::QUEUE_CONFIG_PREFIX;
13
/// Configuration of a single job queue: its name together with the
/// concurrency and priority values registered for it.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Queue name; also used as the suffix of the queue's Redis metadata key.
    pub name: String,
    /// Concurrency value handed to `start_worker_pool` for this queue.
    pub concurrency: usize,
    /// Priority value stored alongside the queue metadata.
    // NOTE(review): whether a higher or lower number wins is not visible in this file.
    pub priority: usize,
}
20
21pub static QUEUE_INITIALIZED: OnceLock<Arc<Notify>> = OnceLock::new();
22pub static REDIS_URL: OnceLock<String> = OnceLock::new();
23pub static GLOBAL_QUEUES: OnceLock<Vec<QueueConfig>> = OnceLock::new();
24pub static QRUSH_SHUTDOWN: OnceLock<Arc<Notify>> = OnceLock::new();
25
26pub fn get_shutdown_notify() -> Arc<Notify> {
27    QRUSH_SHUTDOWN.get_or_init(|| Arc::new(Notify::new())).clone()
28}
29
30/// Notify all running worker loops to stop gracefully.
31///
32/// `qrush-engine` should call this on SIGINT/SIGTERM.
33pub fn trigger_shutdown() {
34    get_shutdown_notify().notify_waiters();
35}
36
37
38async fn store_queue_metadata(queue: &QueueConfig) -> anyhow::Result<()> {
39    let mut conn = get_redis_conn().await?;
40    let redis_key = format!("{QUEUE_CONFIG_PREFIX}:{}", queue.name);
41    conn.hset_multiple::<_, _, _, ()>(&redis_key, &[
42        ("concurrency", queue.concurrency.to_string()),
43        ("priority", queue.priority.to_string()),
44    ]).await?;
45    Ok(())
46}
47
48impl QueueConfig {
49    pub fn new(name: impl Into<String>, concurrency: usize, priority: usize) -> Self {
50        Self {
51            name: name.into(),
52            concurrency,
53            priority,
54        }
55    }
56
57    pub fn from_configs(configs: Vec<(&str, usize, usize)>) -> Vec<Self> {
58        configs
59            .into_iter()
60            .map(|(name, concurrency, priority)| Self::new(name, concurrency, priority))
61            .collect()
62    }
63
64    pub async fn initialize(redis_url: String, queues: Vec<Self>) -> Result<()> {
65        set_redis_url(redis_url)?;
66        set_global_queues(queues.clone())?;
67
68        info!("Worker Pool Started");
69        for queue in &queues {
70            store_queue_metadata(queue).await?;
71            let config_key = format!("{QUEUE_CONFIG_PREFIX}:{}", queue.name);
72            let mut conn = get_redis_conn().await?;
73            let _: () = redis::pipe()
74                .hset(&config_key, "concurrency", queue.concurrency)
75                .hset(&config_key, "priority", queue.priority)
76                .query_async(&mut conn)
77                .await?;
78            start_worker_pool(&queue.name, queue.concurrency).await;
79        }
80        
81        info!("Delayed Worker Pool Started");
82        start_delayed_worker_pool().await;
83        
84        // ADD THIS: Start cron scheduler
85        info!("Cron Scheduler Started");
86        CronScheduler::start().await;
87        
88        Ok(())
89    }
90}
91
92pub fn get_global_queues() -> &'static [QueueConfig] {
93    GLOBAL_QUEUES.get().expect("Queues not initialized")
94}
95
96pub fn set_global_queues(configs: Vec<QueueConfig>) -> Result<()> {
97    GLOBAL_QUEUES
98        .set(configs)
99        .map_err(|_| anyhow!("Queues already initialized"))
100}
101
102pub fn get_redis_url() -> &'static str {
103    REDIS_URL.get().expect("Redis URL is not set")
104}
105
106pub fn set_redis_url(url: String) -> Result<()> {
107    REDIS_URL
108        .set(url)
109        .map_err(|_| anyhow!("Redis URL already set"))
110}
111
/// Username/password pair for HTTP basic authentication.
#[derive(Clone, Debug)]
pub struct QrushBasicAuthConfig {
    /// Expected user name.
    pub username: String,
    /// Expected password.
    pub password: String,
}

/// Global basic-auth setting, written once through `set_basic_auth`.
/// A stored `None` means "explicitly configured without credentials".
pub static QRUSH_BASIC_AUTH: OnceLock<Option<QrushBasicAuthConfig>> = OnceLock::new();
119
120pub fn set_basic_auth(auth: Option<QrushBasicAuthConfig>) {
121    let _ = QRUSH_BASIC_AUTH.set(auth);
122}
123
124pub fn get_basic_auth() -> Option<&'static QrushBasicAuthConfig> {
125    QRUSH_BASIC_AUTH.get().and_then(|opt| opt.as_ref())
126}
127
128// Keep the same connection function - MultiplexedConnection still works in 0.30.0
129pub async fn get_redis_conn() -> redis::RedisResult<MultiplexedConnection> {
130    let redis_url = get_redis_url();
131    let client = Client::open(redis_url)?;
132    client.get_multiplexed_async_connection().await
133}