1use std::sync::Arc;
2use tokio::sync::Mutex;
3use tokio::task::JoinSet;
4use tokio_util::sync::CancellationToken;
5
6use crate::config::Config;
7use crate::context::ContextValue;
8use crate::coordinator;
9use crate::error::OxanusError;
10use crate::result_collector::Stats;
11use crate::storage_internal::StorageInternal;
12use crate::worker_registry::CronJob;
13
14pub async fn run<DT, ET>(
49 config: Config<DT, ET>,
50 ctx: ContextValue<DT>,
51) -> Result<Stats, OxanusError>
52where
53 DT: Send + Sync + Clone + 'static,
54 ET: std::error::Error + Send + Sync + 'static,
55{
56 tracing::info!(
57 "Starting worker (namespace: {})",
58 config.storage.namespace()
59 );
60
61 let mut config = config;
62 let shutdown_signal = config.consume_shutdown_signal();
63 let config: Arc<Config<DT, ET>> = Arc::new(config);
64 let mut joinset = JoinSet::new();
65 let mut coordinator_joinset = JoinSet::new();
66 let stats = Arc::new(Mutex::new(Stats::default()));
67
68 joinset.spawn(ping_loop(Arc::clone(&config)));
69 joinset.spawn(retry_loop(Arc::clone(&config)));
70 joinset.spawn(schedule_loop(Arc::clone(&config)));
71 joinset.spawn(resurrect_loop(Arc::clone(&config)));
72 joinset.spawn(cron_loop(Arc::clone(&config)));
73 joinset.spawn(cleanup_loop(Arc::clone(&config)));
74
75 for queue_config in &config.queues {
76 coordinator_joinset.spawn(coordinator::run(
77 Arc::clone(&config),
78 Arc::clone(&stats),
79 ctx.clone(),
80 queue_config.clone(),
81 ));
82 }
83
84 config.storage.internal.start().await?;
85
86 let mut result = Ok(());
87
88 tokio::select! {
89 Some(task_result) = joinset.join_next() => {
90 result = task_result?;
91
92 if result.is_ok() {
93 tracing::info!("Background task unexpectedly finished");
94 }
95
96 config.cancel_token.cancel();
97 }
98 Some(task_result) = coordinator_joinset.join_next() => {
99 result = task_result?;
100
101 if result.is_ok() {
102 tracing::info!("Background task unexpectedly finished");
103 }
104
105 config.cancel_token.cancel();
106 }
107 _ = config.cancel_token.cancelled() => {}
108 _ = shutdown_signal => {
109 tracing::info!("Received shutdown signal");
110 config.cancel_token.cancel();
111 }
112 }
113
114 tracing::info!("Shutting down");
115
116 coordinator_joinset.join_all().await;
117
118 config.storage.internal.self_cleanup().await?;
119
120 let stats = Arc::try_unwrap(stats)
121 .expect("Failed to unwrap Arc - there are still references to stats")
122 .into_inner();
123
124 match result {
125 Ok(()) => {
126 tracing::info!("Gracefully shut down");
127 Ok(stats)
128 }
129 Err(e) => {
130 tracing::error!("Gracefully shut down with errors");
131 Err(e)
132 }
133 }
134}
135
136async fn retry_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
137where
138 DT: Send + Sync + Clone + 'static,
139 ET: std::error::Error + Send + Sync + 'static,
140{
141 config
142 .storage
143 .internal
144 .retry_loop(config.cancel_token.clone())
145 .await?;
146
147 tracing::trace!("Retry loop finished");
148
149 Ok(())
150}
151
152async fn cleanup_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
153where
154 DT: Send + Sync + Clone + 'static,
155 ET: std::error::Error + Send + Sync + 'static,
156{
157 config
158 .storage
159 .internal
160 .cleanup_loop(config.cancel_token.clone())
161 .await?;
162
163 tracing::trace!("Cleanup loop finished");
164
165 Ok(())
166}
167
168async fn schedule_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
169where
170 DT: Send + Sync + Clone + 'static,
171 ET: std::error::Error + Send + Sync + 'static,
172{
173 config
174 .storage
175 .internal
176 .schedule_loop(config.cancel_token.clone())
177 .await?;
178
179 tracing::trace!("Schedule loop finished");
180
181 Ok(())
182}
183
184async fn ping_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
185where
186 DT: Send + Sync + Clone + 'static,
187 ET: std::error::Error + Send + Sync + 'static,
188{
189 config
190 .storage
191 .internal
192 .ping_loop(config.cancel_token.clone())
193 .await?;
194
195 tracing::trace!("Ping loop finished");
196
197 Ok(())
198}
199
200async fn resurrect_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
201where
202 DT: Send + Sync + Clone + 'static,
203 ET: std::error::Error + Send + Sync + 'static,
204{
205 config
206 .storage
207 .internal
208 .resurrect_loop(config.cancel_token.clone())
209 .await?;
210
211 tracing::trace!("Resurrect loop finished");
212
213 Ok(())
214}
215
216async fn cron_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
217where
218 DT: Send + Sync + Clone + 'static,
219 ET: std::error::Error + Send + Sync + 'static,
220{
221 let mut set = JoinSet::new();
222
223 for (name, cron_job) in &config.registry.schedules {
224 set.spawn(cron_job_loop(
225 config.storage.internal.clone(),
226 config.cancel_token.clone(),
227 name.clone(),
228 cron_job.clone(),
229 ));
230 }
231
232 if set.is_empty() {
233 config.cancel_token.cancelled().await;
234 } else {
235 set.join_all().await;
236 }
237 Ok(())
238}
239
240async fn cron_job_loop(
241 storage: StorageInternal,
242 cancel_token: CancellationToken,
243 job_name: String,
244 cron_job: CronJob,
245) -> Result<(), OxanusError> {
246 storage
247 .cron_job_loop(cancel_token, job_name.clone(), cron_job)
248 .await?;
249
250 tracing::trace!("Cron job loop finished for {}", job_name);
251
252 Ok(())
253}