qrush_engine/
config.rs

1// src/config.rs
2use std::sync::{Arc, OnceLock};
3use crate::services::runner_service::{start_worker_pool, start_delayed_worker_pool};
4use anyhow::{anyhow, Result};
5use tokio::sync::Notify;
6use tracing::info;
7use redis::{aio::MultiplexedConnection, Client}; // Use MultiplexedConnection, not Connection
8use redis::AsyncCommands;
9
// Cron scheduler, started at the end of `QueueConfig::initialize`.
11use crate::cron::cron_scheduler::CronScheduler;
12
/// Settings for one named queue.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Queue name; also the suffix of the `snm:queue:config:<name>` Redis key.
    pub name: String,
    /// Value handed to `start_worker_pool` for this queue.
    pub concurrency: usize,
    /// Priority value stored with the queue's metadata.
    pub priority: usize,
}
19
20pub static QUEUE_INITIALIZED: OnceLock<Arc<Notify>> = OnceLock::new();
21pub static REDIS_URL: OnceLock<String> = OnceLock::new();
22pub static GLOBAL_QUEUES: OnceLock<Vec<QueueConfig>> = OnceLock::new();
23pub static QRUSH_SHUTDOWN: OnceLock<Arc<Notify>> = OnceLock::new();
24
25pub fn get_shutdown_notify() -> Arc<Notify> {
26    QRUSH_SHUTDOWN.get_or_init(|| Arc::new(Notify::new())).clone()
27}
28
29/// Notify all running worker loops to stop gracefully.
30///
31/// `qrush-engine` should call this on SIGINT/SIGTERM.
32pub fn trigger_shutdown() {
33    get_shutdown_notify().notify_waiters();
34}
35
36
37async fn store_queue_metadata(queue: &QueueConfig) -> anyhow::Result<()> {
38    let mut conn = get_redis_conn().await?;
39    let redis_key = format!("snm:queue:config:{}", queue.name);
40    conn.hset_multiple::<_, _, _, ()>(&redis_key, &[
41        ("concurrency", queue.concurrency.to_string()),
42        ("priority", queue.priority.to_string()),
43    ]).await?;
44    Ok(())
45}
46
47impl QueueConfig {
48    pub fn new(name: impl Into<String>, concurrency: usize, priority: usize) -> Self {
49        Self {
50            name: name.into(),
51            concurrency,
52            priority,
53        }
54    }
55
56    pub fn from_configs(configs: Vec<(&str, usize, usize)>) -> Vec<Self> {
57        configs
58            .into_iter()
59            .map(|(name, concurrency, priority)| Self::new(name, concurrency, priority))
60            .collect()
61    }
62
63    pub async fn initialize(redis_url: String, queues: Vec<Self>) -> Result<()> {
64        set_redis_url(redis_url)?;
65        set_global_queues(queues.clone())?;
66
67        info!("Worker Pool Started");
68        for queue in &queues {
69            store_queue_metadata(queue).await?;
70            let config_key = format!("snm:queue:config:{}", queue.name);
71            let mut conn = get_redis_conn().await?;
72            let _: () = redis::pipe()
73                .hset(&config_key, "concurrency", queue.concurrency)
74                .hset(&config_key, "priority", queue.priority)
75                .query_async(&mut conn)
76                .await?;
77            start_worker_pool(&queue.name, queue.concurrency).await;
78        }
79        
80        info!("Delayed Worker Pool Started");
81        start_delayed_worker_pool().await;
82        
83        // ADD THIS: Start cron scheduler
84        info!("Cron Scheduler Started");
85        CronScheduler::start().await;
86        
87        Ok(())
88    }
89}
90
91pub fn get_global_queues() -> &'static [QueueConfig] {
92    GLOBAL_QUEUES.get().expect("Queues not initialized")
93}
94
95pub fn set_global_queues(configs: Vec<QueueConfig>) -> Result<()> {
96    GLOBAL_QUEUES
97        .set(configs)
98        .map_err(|_| anyhow!("Queues already initialized"))
99}
100
101pub fn get_redis_url() -> &'static str {
102    REDIS_URL.get().expect("Redis URL is not set")
103}
104
105pub fn set_redis_url(url: String) -> Result<()> {
106    REDIS_URL
107        .set(url)
108        .map_err(|_| anyhow!("Redis URL already set"))
109}
110
/// Basic-auth credentials: a username and a password.
///
/// `Debug` is implemented by hand so the password never appears in debug output,
/// for example when a value is logged with `{:?}`.
#[derive(Clone)]
pub struct QrushBasicAuthConfig {
    /// Account name; shown as-is in debug output.
    pub username: String,
    /// Secret; replaced by a placeholder in debug output.
    pub password: String,
}

impl std::fmt::Debug for QrushBasicAuthConfig {
    /// Formats the struct with the real username and a redacted password.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QrushBasicAuthConfig")
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .finish()
    }
}
116
117pub static QRUSH_BASIC_AUTH: OnceLock<Option<QrushBasicAuthConfig>> = OnceLock::new();
118
119pub fn set_basic_auth(auth: Option<QrushBasicAuthConfig>) {
120    let _ = QRUSH_BASIC_AUTH.set(auth);
121}
122
123pub fn get_basic_auth() -> Option<&'static QrushBasicAuthConfig> {
124    QRUSH_BASIC_AUTH.get().and_then(|opt| opt.as_ref())
125}
126
127// Keep the same connection function - MultiplexedConnection still works in 0.30.0
128pub async fn get_redis_conn() -> redis::RedisResult<MultiplexedConnection> {
129    let redis_url = get_redis_url();
130    let client = Client::open(redis_url)?;
131    client.get_multiplexed_async_connection().await
132}