oxanus/
launcher.rs

1use std::sync::Arc;
2use tokio::sync::Mutex;
3use tokio::task::JoinSet;
4use tokio_util::sync::CancellationToken;
5
6use crate::config::Config;
7use crate::context::ContextValue;
8use crate::coordinator;
9use crate::error::OxanusError;
10use crate::result_collector::Stats;
11use crate::storage_internal::StorageInternal;
12use crate::worker_registry::CronJob;
13
14/// Runs the Oxanus worker system with the given configuration and context.
15///
16/// This is the main entry point for running Oxanus workers. It sets up all necessary
17/// background tasks and starts processing jobs from the configured queues.
18///
19/// # Arguments
20///
21/// * `config` - The worker configuration, including queue and worker registrations
22/// * `ctx` - The context value that will be shared across all worker instances
23///
24/// # Returns
25///
26/// Returns statistics about the worker run, or an [`OxanusError`] if the operation fails.
27///
28/// # Examples
29///
30/// ```rust
31/// use oxanus::{Config, Context, Storage, Queue, Worker};
32///
33/// async fn run_worker() -> Result<(), oxanus::OxanusError> {
34///     let ctx = Context::value(MyContext {});
35///     let storage = Storage::builder().from_env()?.build()?;
36///
37///     let config = Config::new(&storage)
38///         .register_queue::<MyQueue>()
39///         .register_worker::<MyWorker>()
40///         .with_graceful_shutdown(tokio::signal::ctrl_c());
41///
42///     let stats = oxanus::run(config, ctx).await?;
43///     println!("Processed {} jobs", stats.processed);
44///
45///     Ok(())
46/// }
47/// ```
48pub async fn run<DT, ET>(
49    config: Config<DT, ET>,
50    ctx: ContextValue<DT>,
51) -> Result<Stats, OxanusError>
52where
53    DT: Send + Sync + Clone + 'static,
54    ET: std::error::Error + Send + Sync + 'static,
55{
56    tracing::info!(
57        "Starting worker (namespace: {})",
58        config.storage.namespace()
59    );
60
61    let mut config = config;
62    let shutdown_signal = config.consume_shutdown_signal();
63    let config: Arc<Config<DT, ET>> = Arc::new(config);
64    let mut joinset = JoinSet::new();
65    let mut coordinator_joinset = JoinSet::new();
66    let stats = Arc::new(Mutex::new(Stats::default()));
67
68    joinset.spawn(ping_loop(Arc::clone(&config)));
69    joinset.spawn(retry_loop(Arc::clone(&config)));
70    joinset.spawn(schedule_loop(Arc::clone(&config)));
71    joinset.spawn(resurrect_loop(Arc::clone(&config)));
72    joinset.spawn(cron_loop(Arc::clone(&config)));
73    joinset.spawn(cleanup_loop(Arc::clone(&config)));
74
75    for queue_config in &config.queues {
76        coordinator_joinset.spawn(coordinator::run(
77            Arc::clone(&config),
78            Arc::clone(&stats),
79            ctx.clone(),
80            queue_config.clone(),
81        ));
82    }
83
84    config.storage.internal.start().await?;
85
86    let mut result = Ok(());
87
88    tokio::select! {
89        Some(task_result) = joinset.join_next() => {
90            result = task_result?;
91
92            if result.is_ok() {
93                tracing::info!("Background task unexpectedly finished");
94            }
95
96            config.cancel_token.cancel();
97        }
98        Some(task_result) = coordinator_joinset.join_next() => {
99            result = task_result?;
100
101            if result.is_ok() {
102                tracing::info!("Background task unexpectedly finished");
103            }
104
105            config.cancel_token.cancel();
106        }
107        _ = config.cancel_token.cancelled() => {}
108        _ = shutdown_signal => {
109            tracing::info!("Received shutdown signal");
110            config.cancel_token.cancel();
111        }
112    }
113
114    tracing::info!("Shutting down");
115
116    coordinator_joinset.join_all().await;
117
118    config.storage.internal.self_cleanup().await?;
119
120    let stats = Arc::try_unwrap(stats)
121        .expect("Failed to unwrap Arc - there are still references to stats")
122        .into_inner();
123
124    match result {
125        Ok(()) => {
126            tracing::info!("Gracefully shut down");
127            Ok(stats)
128        }
129        Err(e) => {
130            tracing::error!("Gracefully shut down with errors");
131            Err(e)
132        }
133    }
134}
135
136async fn retry_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
137where
138    DT: Send + Sync + Clone + 'static,
139    ET: std::error::Error + Send + Sync + 'static,
140{
141    config
142        .storage
143        .internal
144        .retry_loop(config.cancel_token.clone())
145        .await?;
146
147    tracing::trace!("Retry loop finished");
148
149    Ok(())
150}
151
152async fn cleanup_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
153where
154    DT: Send + Sync + Clone + 'static,
155    ET: std::error::Error + Send + Sync + 'static,
156{
157    config
158        .storage
159        .internal
160        .cleanup_loop(config.cancel_token.clone())
161        .await?;
162
163    tracing::trace!("Cleanup loop finished");
164
165    Ok(())
166}
167
168async fn schedule_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
169where
170    DT: Send + Sync + Clone + 'static,
171    ET: std::error::Error + Send + Sync + 'static,
172{
173    config
174        .storage
175        .internal
176        .schedule_loop(config.cancel_token.clone())
177        .await?;
178
179    tracing::trace!("Schedule loop finished");
180
181    Ok(())
182}
183
184async fn ping_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
185where
186    DT: Send + Sync + Clone + 'static,
187    ET: std::error::Error + Send + Sync + 'static,
188{
189    config
190        .storage
191        .internal
192        .ping_loop(config.cancel_token.clone())
193        .await?;
194
195    tracing::trace!("Ping loop finished");
196
197    Ok(())
198}
199
200async fn resurrect_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
201where
202    DT: Send + Sync + Clone + 'static,
203    ET: std::error::Error + Send + Sync + 'static,
204{
205    config
206        .storage
207        .internal
208        .resurrect_loop(config.cancel_token.clone())
209        .await?;
210
211    tracing::trace!("Resurrect loop finished");
212
213    Ok(())
214}
215
216async fn cron_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
217where
218    DT: Send + Sync + Clone + 'static,
219    ET: std::error::Error + Send + Sync + 'static,
220{
221    let mut set = JoinSet::new();
222
223    for (name, cron_job) in &config.registry.schedules {
224        set.spawn(cron_job_loop(
225            config.storage.internal.clone(),
226            config.cancel_token.clone(),
227            name.clone(),
228            cron_job.clone(),
229        ));
230    }
231
232    if set.is_empty() {
233        config.cancel_token.cancelled().await;
234    } else {
235        set.join_all().await;
236    }
237    Ok(())
238}
239
240async fn cron_job_loop(
241    storage: StorageInternal,
242    cancel_token: CancellationToken,
243    job_name: String,
244    cron_job: CronJob,
245) -> Result<(), OxanusError> {
246    storage
247        .cron_job_loop(cancel_token, job_name.clone(), cron_job)
248        .await?;
249
250    tracing::trace!("Cron job loop finished for {}", job_name);
251
252    Ok(())
253}