1use std::sync::{Arc, OnceLock};
3use crate::services::runner_service::{start_worker_pool, start_delayed_worker_pool};
4use anyhow::{anyhow, Result};
5use tokio::sync::Notify;
6use tracing::info;
7use redis::{aio::MultiplexedConnection, Client}; use redis::AsyncCommands;
9
10use crate::cron::cron_scheduler::CronScheduler;
12
/// Configuration for a single named queue.
///
/// In addition to `Clone` and `Debug`, the type derives `PartialEq`, `Eq` and
/// `Hash` so that configurations can be compared, de-duplicated and asserted
/// on in tests.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct QueueConfig {
    /// Queue name; also used as the suffix of the `snm:queue:config:{name}`
    /// Redis key under which this queue's metadata is stored.
    pub name: String,
    /// Concurrency value handed to `start_worker_pool` for this queue.
    pub concurrency: usize,
    /// Priority value persisted with the queue metadata.
    // NOTE(review): whether a higher or lower number wins is not visible in
    // this file — confirm against the consumer of this field.
    pub priority: usize,
}
19
20pub static QUEUE_INITIALIZED: OnceLock<Arc<Notify>> = OnceLock::new();
21pub static REDIS_URL: OnceLock<String> = OnceLock::new();
22pub static GLOBAL_QUEUES: OnceLock<Vec<QueueConfig>> = OnceLock::new();
23pub static QRUSH_SHUTDOWN: OnceLock<Arc<Notify>> = OnceLock::new();
24
25pub fn get_shutdown_notify() -> Arc<Notify> {
26 QRUSH_SHUTDOWN.get_or_init(|| Arc::new(Notify::new())).clone()
27}
28
29pub fn trigger_shutdown() {
33 get_shutdown_notify().notify_waiters();
34}
35
36
37async fn store_queue_metadata(queue: &QueueConfig) -> anyhow::Result<()> {
38 let mut conn = get_redis_conn().await?;
39 let redis_key = format!("snm:queue:config:{}", queue.name);
40 conn.hset_multiple::<_, _, _, ()>(&redis_key, &[
41 ("concurrency", queue.concurrency.to_string()),
42 ("priority", queue.priority.to_string()),
43 ]).await?;
44 Ok(())
45}
46
47impl QueueConfig {
48 pub fn new(name: impl Into<String>, concurrency: usize, priority: usize) -> Self {
49 Self {
50 name: name.into(),
51 concurrency,
52 priority,
53 }
54 }
55
56 pub fn from_configs(configs: Vec<(&str, usize, usize)>) -> Vec<Self> {
57 configs
58 .into_iter()
59 .map(|(name, concurrency, priority)| Self::new(name, concurrency, priority))
60 .collect()
61 }
62
63 pub async fn initialize(redis_url: String, queues: Vec<Self>) -> Result<()> {
64 set_redis_url(redis_url)?;
65 set_global_queues(queues.clone())?;
66
67 info!("Worker Pool Started");
68 for queue in &queues {
69 store_queue_metadata(queue).await?;
70 let config_key = format!("snm:queue:config:{}", queue.name);
71 let mut conn = get_redis_conn().await?;
72 let _: () = redis::pipe()
73 .hset(&config_key, "concurrency", queue.concurrency)
74 .hset(&config_key, "priority", queue.priority)
75 .query_async(&mut conn)
76 .await?;
77 start_worker_pool(&queue.name, queue.concurrency).await;
78 }
79
80 info!("Delayed Worker Pool Started");
81 start_delayed_worker_pool().await;
82
83 info!("Cron Scheduler Started");
85 CronScheduler::start().await;
86
87 Ok(())
88 }
89}
90
91pub fn get_global_queues() -> &'static [QueueConfig] {
92 GLOBAL_QUEUES.get().expect("Queues not initialized")
93}
94
95pub fn set_global_queues(configs: Vec<QueueConfig>) -> Result<()> {
96 GLOBAL_QUEUES
97 .set(configs)
98 .map_err(|_| anyhow!("Queues already initialized"))
99}
100
101pub fn get_redis_url() -> &'static str {
102 REDIS_URL.get().expect("Redis URL is not set")
103}
104
105pub fn set_redis_url(url: String) -> Result<()> {
106 REDIS_URL
107 .set(url)
108 .map_err(|_| anyhow!("Redis URL already set"))
109}
110
/// Username/password pair for HTTP basic authentication.
///
/// `Debug` is implemented by hand rather than derived so that the password is
/// never written to logs or panic messages in plaintext.
#[derive(Clone)]
pub struct QrushBasicAuthConfig {
    /// Basic-auth user name.
    pub username: String,
    /// Basic-auth password; redacted in `Debug` output.
    pub password: String,
}

impl std::fmt::Debug for QrushBasicAuthConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QrushBasicAuthConfig")
            .field("username", &self.username)
            // Deliberately not the real value: secrets must not reach logs.
            .field("password", &"<redacted>")
            .finish()
    }
}
116
117pub static QRUSH_BASIC_AUTH: OnceLock<Option<QrushBasicAuthConfig>> = OnceLock::new();
118
119pub fn set_basic_auth(auth: Option<QrushBasicAuthConfig>) {
120 let _ = QRUSH_BASIC_AUTH.set(auth);
121}
122
123pub fn get_basic_auth() -> Option<&'static QrushBasicAuthConfig> {
124 QRUSH_BASIC_AUTH.get().and_then(|opt| opt.as_ref())
125}
126
127pub async fn get_redis_conn() -> redis::RedisResult<MultiplexedConnection> {
129 let redis_url = get_redis_url();
130 let client = Client::open(redis_url)?;
131 client.get_multiplexed_async_connection().await
132}