oxanus/
launcher.rs

1use std::sync::Arc;
2use tokio::sync::Mutex;
3use tokio_util::sync::CancellationToken;
4
5use crate::config::Config;
6use crate::context::ContextValue;
7use crate::coordinator;
8use crate::error::OxanusError;
9use crate::result_collector::Stats;
10use crate::storage_internal::StorageInternal;
11use crate::worker_registry::CronJob;
12
13/// Runs the Oxanus worker system with the given configuration and context.
14///
15/// This is the main entry point for running Oxanus workers. It sets up all necessary
16/// background tasks and starts processing jobs from the configured queues.
17///
18/// # Arguments
19///
20/// * `config` - The worker configuration, including queue and worker registrations
21/// * `ctx` - The context value that will be shared across all worker instances
22///
23/// # Returns
24///
25/// Returns statistics about the worker run, or an [`OxanusError`] if the operation fails.
26///
27/// # Examples
28///
29/// ```rust
30/// use oxanus::{Config, Context, Storage, Queue, Worker};
31///
32/// async fn run_worker() -> Result<(), oxanus::OxanusError> {
33///     let ctx = Context::value(MyContext {});
34///     let storage = Storage::builder().from_env()?.build()?;
35///
36///     let config = Config::new(&storage)
37///         .register_queue::<MyQueue>()
38///         .register_worker::<MyWorker>()
39///         .with_graceful_shutdown(tokio::signal::ctrl_c());
40///
41///     let stats = oxanus::run(config, ctx).await?;
42///     println!("Processed {} jobs", stats.processed);
43///
44///     Ok(())
45/// }
46/// ```
47pub async fn run<DT, ET>(
48    config: Config<DT, ET>,
49    ctx: ContextValue<DT>,
50) -> Result<Stats, OxanusError>
51where
52    DT: Send + Sync + Clone + 'static,
53    ET: std::error::Error + Send + Sync + 'static,
54{
55    tracing::info!(
56        "Starting worker (namespace: {})",
57        config.storage.namespace()
58    );
59
60    let mut config = config;
61    let shutdown_signal = config.consume_shutdown_signal();
62    let config = Arc::new(config);
63    let mut joinset = tokio::task::JoinSet::new();
64    let stats = Arc::new(Mutex::new(Stats::default()));
65
66    tokio::spawn(retry_loop(Arc::clone(&config)));
67    tokio::spawn(schedule_loop(Arc::clone(&config)));
68    tokio::spawn(ping_loop(Arc::clone(&config)));
69    tokio::spawn(resurrect_loop(Arc::clone(&config)));
70    tokio::spawn(cron_loop(Arc::clone(&config)));
71
72    for queue_config in &config.queues {
73        joinset.spawn(coordinator::run(
74            Arc::clone(&config),
75            Arc::clone(&stats),
76            ctx.clone(),
77            queue_config.clone(),
78        ));
79    }
80
81    tokio::select! {
82        _ = config.cancel_token.cancelled() => {}
83        _ = shutdown_signal => {
84            config.cancel_token.cancel();
85        }
86    }
87
88    tracing::info!("Shutting down");
89
90    joinset.join_all().await;
91
92    let stats = Arc::try_unwrap(stats)
93        .expect("Failed to unwrap Arc - there are still references to stats")
94        .into_inner();
95
96    tracing::info!("Gracefully shut down");
97
98    Ok(stats)
99}
100
101async fn retry_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
102where
103    DT: Send + Sync + Clone + 'static,
104    ET: std::error::Error + Send + Sync + 'static,
105{
106    config
107        .storage
108        .internal
109        .retry_loop(config.cancel_token.clone())
110        .await?;
111
112    tracing::trace!("Retry loop finished");
113
114    Ok(())
115}
116
117async fn schedule_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
118where
119    DT: Send + Sync + Clone + 'static,
120    ET: std::error::Error + Send + Sync + 'static,
121{
122    config
123        .storage
124        .internal
125        .schedule_loop(config.cancel_token.clone())
126        .await?;
127
128    tracing::trace!("Schedule loop finished");
129
130    Ok(())
131}
132
133async fn ping_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
134where
135    DT: Send + Sync + Clone + 'static,
136    ET: std::error::Error + Send + Sync + 'static,
137{
138    config
139        .storage
140        .internal
141        .ping_loop(config.cancel_token.clone())
142        .await?;
143
144    tracing::trace!("Ping loop finished");
145
146    Ok(())
147}
148
149async fn resurrect_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
150where
151    DT: Send + Sync + Clone + 'static,
152    ET: std::error::Error + Send + Sync + 'static,
153{
154    config
155        .storage
156        .internal
157        .resurrect_loop(config.cancel_token.clone())
158        .await?;
159
160    tracing::trace!("Resurrect loop finished");
161
162    Ok(())
163}
164
165async fn cron_loop<DT, ET>(config: Arc<Config<DT, ET>>) -> Result<(), OxanusError>
166where
167    DT: Send + Sync + Clone + 'static,
168    ET: std::error::Error + Send + Sync + 'static,
169{
170    for (name, cron_job) in &config.registry.schedules {
171        tokio::spawn(cron_job_loop(
172            config.storage.internal.clone(),
173            config.cancel_token.clone(),
174            name.clone(),
175            cron_job.clone(),
176        ));
177    }
178
179    Ok(())
180}
181
182async fn cron_job_loop(
183    storage: StorageInternal,
184    cancel_token: CancellationToken,
185    job_name: String,
186    cron_job: CronJob,
187) -> Result<(), OxanusError> {
188    storage
189        .cron_job_loop(cancel_token, job_name.clone(), cron_job)
190        .await?;
191
192    tracing::trace!("Cron job loop finished for {}", job_name);
193
194    Ok(())
195}