Skip to main content

roadster/worker/backend/sidekiq/processor/
mod.rs

1use crate::app::context::AppContext;
2use crate::config::service::worker::StaleCleanUpBehavior;
3use crate::error::RoadsterResult;
4use crate::util::redis::RedisCommands;
5use crate::worker::backend::sidekiq::processor::builder::SidekiqProcessorBuilder;
6use crate::worker::{PeriodicArgsJson, WorkerWrapper};
7use axum_core::extract::FromRef;
8use itertools::Itertools;
9use sidekiq::periodic;
10use std::collections::{BTreeSet, HashMap, HashSet};
11use std::pin::Pin;
12use std::sync::Arc;
13use thiserror::Error;
14use tokio::sync::Mutex;
15use tokio::task::JoinSet;
16use tokio_util::sync::CancellationToken;
17use tracing::{error, info, warn};
18
19pub mod builder;
20
/// Redis key of the sorted set that holds the periodic jobs; it is read with `zrange` and
/// pruned with `zrem` when stale periodic jobs are cleaned up.
const PERIODIC_KEY: &str = "periodic";
22
23#[derive(Debug, Error)]
24#[non_exhaustive]
25pub enum SidekiqProcessorError {
26    /// The provided [`crate::worker::Worker`] was already registered. Contains the
27    /// [`crate::worker::Worker::name`] of the provided worker.
28    #[error("The provided `Worker` name was already registered: `{0}`")]
29    AlreadyRegistered(String),
30
31    /// A [`crate::worker::Worker`] was previously registered that has the same name but is a
32    /// different type.
33    #[error("The provided `Worker` name was already registered for a different type: `{0}`")]
34    AlreadyRegisteredWithDifferentType(String),
35
36    /// The provided [`crate::worker::Worker`] was already registered. Contains the
37    /// [`crate::worker::Worker::name`] of the provided worker.
38    #[error(
39        "The provided periodic worker job was already registered. Worker: `{0}`, schedule: `{1}`, args: `{2}`"
40    )]
41    AlreadyRegisteredPeriodic(String, String, serde_json::Value),
42
43    #[error("No queue configured for worker `{0}`.")]
44    NoQueue(String),
45
46    #[error(transparent)]
47    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
48}
49
50#[derive(Clone)]
51#[non_exhaustive]
52pub struct SidekiqProcessor<S>
53where
54    S: Clone + Send + Sync + 'static,
55    AppContext: FromRef<S>,
56{
57    pub(crate) inner: Arc<SidekiqProcessorInner<S>>,
58}
59
60pub(crate) struct WorkerData<S>
61where
62    S: Clone + Send + Sync + 'static,
63    AppContext: FromRef<S>,
64{
65    pub(crate) worker_wrapper: WorkerWrapper<S>,
66    pub(crate) register_sidekiq_fn: RegisterSidekiqFn<S>,
67    pub(crate) register_sidekiq_periodic_fn: RegisterSidekiqPeriodicFn<S>,
68}
69
70#[non_exhaustive]
71pub(crate) struct SidekiqProcessorInner<S>
72where
73    S: Clone + Send + Sync + 'static,
74    AppContext: FromRef<S>,
75{
76    pub(crate) state: S,
77    pub(crate) processor: Mutex<Option<::sidekiq::Processor>>,
78    pub(crate) queues: BTreeSet<String>,
79    pub(crate) periodic_workers: HashMap<PeriodicArgsJson, Arc<WorkerData<S>>>,
80}
81
82impl<S> SidekiqProcessor<S>
83where
84    S: Clone + Send + Sync + 'static,
85    AppContext: FromRef<S>,
86{
87    pub(crate) fn new(inner: SidekiqProcessorInner<S>) -> Self {
88        Self {
89            inner: Arc::new(inner),
90        }
91    }
92
93    pub fn builder(state: &S) -> SidekiqProcessorBuilder<S> {
94        SidekiqProcessorBuilder::new(state)
95    }
96
97    pub async fn before_run(&self, state: &S) -> RoadsterResult<()> {
98        self.initialize_periodic(state).await?;
99
100        Ok(())
101    }
102
103    /// Initialize the periodic queue tables and enqueue the periodic jobs in the queue.
104    async fn initialize_periodic(&self, state: &S) -> RoadsterResult<()> {
105        let context = AppContext::from_ref(state);
106
107        let periodic_config = &context
108            .config()
109            .service
110            .worker
111            .sidekiq
112            .custom
113            .custom
114            .periodic;
115
116        match periodic_config.stale_cleanup {
117            StaleCleanUpBehavior::Manual => {}
118            StaleCleanUpBehavior::AutoCleanAll => {
119                periodic::destroy_all(context.redis_enqueue().inner.clone()).await?;
120                info!("Deleted all previously registered periodic jobs");
121            }
122            StaleCleanUpBehavior::AutoCleanStale => {
123                // This is handled after the jobs are registered
124            }
125        };
126
127        let mut processor = self.inner.processor.lock().await;
128
129        let processor = if let Some(processor) = processor.as_mut() {
130            processor
131        } else {
132            warn!("No ::sidekiq::Processor available.");
133            return Ok(());
134        };
135
136        let mut registered_periodic_jobs_json: HashSet<String> = Default::default();
137        for (periodic_args, worker_data) in self.inner.periodic_workers.iter() {
138            let hash = (worker_data.register_sidekiq_periodic_fn)(
139                &self.inner.state,
140                processor,
141                worker_data.worker_wrapper.clone(),
142                periodic_args.clone(),
143            )
144            .await?;
145            registered_periodic_jobs_json.insert(hash);
146        }
147
148        if periodic_config.stale_cleanup == StaleCleanUpBehavior::AutoCleanStale {
149            let mut conn = context.redis_enqueue().get().await?;
150            remove_stale_periodic_jobs(&mut conn, &context, &registered_periodic_jobs_json).await?;
151        }
152
153        Ok(())
154    }
155
156    pub async fn run(self, _state: &S, cancellation_token: CancellationToken) {
157        let processor = { self.inner.processor.lock().await.clone() };
158
159        let processor = match processor {
160            Some(processor) => processor,
161            None => {
162                /*
163                This should never happen because the processor should only be missing if there
164                was no `redis_fetch` connection available, in which case the SidekiqWorkerService is
165                not enabled. However, if we do get to here, we simply idle until the app goes into
166                shutdown -- if we return from here before then, it will trigger the app to exit
167                pre-maturely.
168                 */
169                warn!("No ::sidekiq::Processor available. Idling until cancelled.");
170                cancellation_token.cancelled().await;
171                return;
172            }
173        };
174        let sidekiq_cancel_token = processor.get_cancellation_token();
175
176        let mut join_set = JoinSet::new();
177        let token = cancellation_token.clone();
178        join_set.spawn(Box::pin(async move {
179            token.cancelled().await;
180        }));
181        let token = sidekiq_cancel_token.clone();
182        join_set.spawn(Box::pin(async move {
183            token.cancelled().await;
184        }));
185        join_set.spawn(processor.run());
186
187        while let Some(result) = join_set.join_next().await {
188            // Once any of the tasks finish, cancel all the cancellation tokens to ensure
189            // the processor and the app shut down gracefully.
190            cancellation_token.cancel();
191            sidekiq_cancel_token.cancel();
192            if let Err(join_err) = result {
193                error!(
194                    "An error occurred when trying to join on one of the app's tasks. Error: {join_err}"
195                );
196            }
197        }
198    }
199}
200
201/// Compares the list of periodic jobs that were registered by the app during app startup with
202/// the list of periodic jobs in Redis, and removes any that exist in Redis but weren't
203/// registered during start up.
204///
205/// The jobs are only removed if the [worker.sidekiq.periodic.stale-cleanup][crate::config::worker::Periodic]
206/// config is set to [auto-clean-stale][StaleCleanUpBehavior::AutoCleanStale].
207///
208/// This is run after all the app's periodic jobs have been registered.
209async fn remove_stale_periodic_jobs<C: RedisCommands>(
210    conn: &mut C,
211    context: &AppContext,
212    registered_periodic_workers: &HashSet<String>,
213) -> RoadsterResult<()> {
214    let stale_jobs = conn
215        .zrange(PERIODIC_KEY.to_string(), 0, -1)
216        .await?
217        .into_iter()
218        .filter(|job| !registered_periodic_workers.contains(job))
219        .collect_vec();
220
221    if stale_jobs.is_empty() {
222        info!("No stale periodic jobs found");
223        return Ok(());
224    }
225
226    if context
227        .config()
228        .service
229        .worker
230        .sidekiq
231        .custom
232        .custom
233        .periodic
234        .stale_cleanup
235        == StaleCleanUpBehavior::AutoCleanStale
236    {
237        info!(count = stale_jobs.len(), "Removing stale periodic jobs",);
238        conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone())
239            .await?;
240    } else {
241        warn!(count = stale_jobs.len(), "Found stale periodic jobs");
242    }
243
244    Ok(())
245}
246
247type RegisterSidekiqFn<S> =
248    Box<dyn Send + Sync + for<'a> Fn(&'a S, &'a mut ::sidekiq::Processor, WorkerWrapper<S>)>;
249// Returns the sidekiq json for the periodic job
250type RegisterSidekiqPeriodicFn<S> = Box<
251    dyn Send
252        + Sync
253        + for<'a> Fn(
254            &'a S,
255            &'a mut ::sidekiq::Processor,
256            WorkerWrapper<S>,
257            PeriodicArgsJson,
258        ) -> Pin<Box<dyn 'a + Send + Future<Output = RoadsterResult<String>>>>,
259>;
260type RegisterSidekiqMiddlewareFn = Box<
261    dyn Send
262        + Sync
263        + for<'a> FnOnce(
264            &'a mut ::sidekiq::Processor,
265        ) -> Pin<Box<dyn 'a + Send + Future<Output = ()>>>,
266>;