1use std::sync::Arc;
2use tokio::sync::Mutex;
3use tokio_util::sync::CancellationToken;
4
5use crate::config::Config;
6use crate::context::ContextValue;
7use crate::coordinator;
8use crate::error::OxanusError;
9use crate::result_collector::Stats;
10use crate::storage_internal::StorageInternal;
11use crate::worker_registry::CronJob;
12
13pub async fn run<DT, ET>(
48 config: Config<DT, ET>,
49 ctx: ContextValue<DT>,
50) -> Result<Stats, OxanusError>
51where
52 DT: Send + Sync + Clone + 'static,
53 ET: std::error::Error + Send + Sync + 'static,
54{
55 tracing::info!(
56 "Starting worker (namespace: {})",
57 config.storage.namespace()
58 );
59
60 let mut config = config;
61 let shutdown_signal = config.consume_shutdown_signal();
62 let config = Arc::new(config);
63 let mut joinset = tokio::task::JoinSet::new();
64 let stats = Arc::new(Mutex::new(Stats::default()));
65
66 tokio::spawn(retry_loop(Arc::clone(&config)));
67 tokio::spawn(schedule_loop(Arc::clone(&config)));
68 tokio::spawn(ping_loop(Arc::clone(&config)));
69 tokio::spawn(resurrect_loop(Arc::clone(&config)));
70 tokio::spawn(cron_loop(Arc::clone(&config)));
71
72 for queue_config in &config.queues {
73 joinset.spawn(coordinator::run(
74 Arc::clone(&config),
75 Arc::clone(&stats),
76 ctx.clone(),
77 queue_config.clone(),
78 ));
79 }
80
81 tokio::select! {
82 _ = config.cancel_token.cancelled() => {}
83 _ = shutdown_signal => {
84 config.cancel_token.cancel();
85 }
86 }
87
88 tracing::info!("Shutting down");
89
90 joinset.join_all().await;
91
92 let stats = Arc::try_unwrap(stats)
93 .expect("Failed to unwrap Arc - there are still references to stats")
94 .into_inner();
95
96 tracing::info!("Gracefully shut down");
97
98 Ok(stats)
99}
100
101async fn retry_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
102where
103 DT: Send + Sync + Clone + 'static,
104 ET: std::error::Error + Send + Sync + 'static,
105{
106 config
107 .storage
108 .internal
109 .retry_loop(config.cancel_token.clone())
110 .await?;
111
112 tracing::trace!("Retry loop finished");
113
114 Ok(())
115}
116
117async fn schedule_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
118where
119 DT: Send + Sync + Clone + 'static,
120 ET: std::error::Error + Send + Sync + 'static,
121{
122 config
123 .storage
124 .internal
125 .schedule_loop(config.cancel_token.clone())
126 .await?;
127
128 tracing::trace!("Schedule loop finished");
129
130 Ok(())
131}
132
133async fn ping_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
134where
135 DT: Send + Sync + Clone + 'static,
136 ET: std::error::Error + Send + Sync + 'static,
137{
138 config
139 .storage
140 .internal
141 .ping_loop(config.cancel_token.clone())
142 .await?;
143
144 tracing::trace!("Ping loop finished");
145
146 Ok(())
147}
148
149async fn resurrect_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
150where
151 DT: Send + Sync + Clone + 'static,
152 ET: std::error::Error + Send + Sync + 'static,
153{
154 config
155 .storage
156 .internal
157 .resurrect_loop(config.cancel_token.clone())
158 .await?;
159
160 tracing::trace!("Resurrect loop finished");
161
162 Ok(())
163}
164
165async fn cron_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
166where
167 DT: Send + Sync + Clone + 'static,
168 ET: std::error::Error + Send + Sync + 'static,
169{
170 for (name, cron_job) in &config.registry.schedules {
171 tokio::spawn(cron_job_loop(
172 config.storage.internal.clone(),
173 config.cancel_token.clone(),
174 name.clone(),
175 cron_job.clone(),
176 ));
177 }
178
179 Ok(())
180}
181
182async fn cron_job_loop(
183 storage: StorageInternal,
184 cancel_token: CancellationToken,
185 job_name: String,
186 cron_job: CronJob,
187) -> Result<(), OxanusError> {
188 storage
189 .cron_job_loop(cancel_token, job_name.clone(), cron_job)
190 .await?;
191
192 tracing::trace!("Cron job loop finished for {}", job_name);
193
194 Ok(())
195}