1use std::sync::{Arc, OnceLock};
3use crate::services::runner_service::{start_worker_pool, start_delayed_worker_pool};
4use anyhow::{anyhow, Result};
5use tokio::sync::Notify;
6use tracing::info;
7use redis::{aio::MultiplexedConnection, Client}; use redis::AsyncCommands;
9
10use crate::cron::cron_scheduler::CronScheduler;
12use crate::utils::constants::QUEUE_CONFIG_PREFIX;
13
14#[derive(Clone, Debug)]
15pub struct QueueConfig {
16 pub name: String,
17 pub concurrency: usize,
18 pub priority: usize,
19}
20
21pub static QUEUE_INITIALIZED: OnceLock<Arc<Notify>> = OnceLock::new();
22pub static REDIS_URL: OnceLock<String> = OnceLock::new();
23pub static GLOBAL_QUEUES: OnceLock<Vec<QueueConfig>> = OnceLock::new();
24pub static QRUSH_SHUTDOWN: OnceLock<Arc<Notify>> = OnceLock::new();
25
26pub fn get_shutdown_notify() -> Arc<Notify> {
27 QRUSH_SHUTDOWN.get_or_init(|| Arc::new(Notify::new())).clone()
28}
29
30pub fn trigger_shutdown() {
34 get_shutdown_notify().notify_waiters();
35}
36
37
38async fn store_queue_metadata(queue: &QueueConfig) -> anyhow::Result<()> {
39 let mut conn = get_redis_conn().await?;
40 let redis_key = format!("{QUEUE_CONFIG_PREFIX}:{}", queue.name);
41 conn.hset_multiple::<_, _, _, ()>(&redis_key, &[
42 ("concurrency", queue.concurrency.to_string()),
43 ("priority", queue.priority.to_string()),
44 ]).await?;
45 Ok(())
46}
47
48impl QueueConfig {
49 pub fn new(name: impl Into<String>, concurrency: usize, priority: usize) -> Self {
50 Self {
51 name: name.into(),
52 concurrency,
53 priority,
54 }
55 }
56
57 pub fn from_configs(configs: Vec<(&str, usize, usize)>) -> Vec<Self> {
58 configs
59 .into_iter()
60 .map(|(name, concurrency, priority)| Self::new(name, concurrency, priority))
61 .collect()
62 }
63
64 pub async fn initialize(redis_url: String, queues: Vec<Self>) -> Result<()> {
65 set_redis_url(redis_url)?;
66 set_global_queues(queues.clone())?;
67
68 info!("Worker Pool Started");
69 for queue in &queues {
70 store_queue_metadata(queue).await?;
71 let config_key = format!("{QUEUE_CONFIG_PREFIX}:{}", queue.name);
72 let mut conn = get_redis_conn().await?;
73 let _: () = redis::pipe()
74 .hset(&config_key, "concurrency", queue.concurrency)
75 .hset(&config_key, "priority", queue.priority)
76 .query_async(&mut conn)
77 .await?;
78 start_worker_pool(&queue.name, queue.concurrency).await;
79 }
80
81 info!("Delayed Worker Pool Started");
82 start_delayed_worker_pool().await;
83
84 info!("Cron Scheduler Started");
86 CronScheduler::start().await;
87
88 Ok(())
89 }
90}
91
92pub fn get_global_queues() -> &'static [QueueConfig] {
93 GLOBAL_QUEUES.get().expect("Queues not initialized")
94}
95
96pub fn set_global_queues(configs: Vec<QueueConfig>) -> Result<()> {
97 GLOBAL_QUEUES
98 .set(configs)
99 .map_err(|_| anyhow!("Queues already initialized"))
100}
101
102pub fn get_redis_url() -> &'static str {
103 REDIS_URL.get().expect("Redis URL is not set")
104}
105
106pub fn set_redis_url(url: String) -> Result<()> {
107 REDIS_URL
108 .set(url)
109 .map_err(|_| anyhow!("Redis URL already set"))
110}
111
112#[derive(Debug, Clone)]
113pub struct QrushBasicAuthConfig {
114 pub username: String,
115 pub password: String,
116}
117
118pub static QRUSH_BASIC_AUTH: OnceLock<Option<QrushBasicAuthConfig>> = OnceLock::new();
119
120pub fn set_basic_auth(auth: Option<QrushBasicAuthConfig>) {
121 let _ = QRUSH_BASIC_AUTH.set(auth);
122}
123
124pub fn get_basic_auth() -> Option<&'static QrushBasicAuthConfig> {
125 QRUSH_BASIC_AUTH.get().and_then(|opt| opt.as_ref())
126}
127
128pub async fn get_redis_conn() -> redis::RedisResult<MultiplexedConnection> {
130 let redis_url = get_redis_url();
131 let client = Client::open(redis_url)?;
132 client.get_multiplexed_async_connection().await
133}