// roadster/worker/backend/sidekiq/processor/mod.rs
use crate::app::context::AppContext;
2use crate::config::service::worker::StaleCleanUpBehavior;
3use crate::error::RoadsterResult;
4use crate::util::redis::RedisCommands;
5use crate::worker::backend::sidekiq::processor::builder::SidekiqProcessorBuilder;
6use crate::worker::{PeriodicArgsJson, WorkerWrapper};
7use axum_core::extract::FromRef;
8use itertools::Itertools;
9use sidekiq::periodic;
10use std::collections::{BTreeSet, HashMap, HashSet};
11use std::pin::Pin;
12use std::sync::Arc;
13use thiserror::Error;
14use tokio::sync::Mutex;
15use tokio::task::JoinSet;
16use tokio_util::sync::CancellationToken;
17use tracing::{error, info, warn};
18
19pub mod builder;
20
/// Redis key whose members are the serialized periodic jobs; it is read with `zrange` and
/// pruned with `zrem` when stale periodic jobs are cleaned up.
const PERIODIC_KEY: &str = "periodic";
22
23#[derive(Debug, Error)]
24#[non_exhaustive]
25pub enum SidekiqProcessorError {
26 #[error("The provided `Worker` name was already registered: `{0}`")]
29 AlreadyRegistered(String),
30
31 #[error("The provided `Worker` name was already registered for a different type: `{0}`")]
34 AlreadyRegisteredWithDifferentType(String),
35
36 #[error(
39 "The provided periodic worker job was already registered. Worker: `{0}`, schedule: `{1}`, args: `{2}`"
40 )]
41 AlreadyRegisteredPeriodic(String, String, serde_json::Value),
42
43 #[error("No queue configured for worker `{0}`.")]
44 NoQueue(String),
45
46 #[error(transparent)]
47 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
48}
49
50#[derive(Clone)]
51#[non_exhaustive]
52pub struct SidekiqProcessor<S>
53where
54 S: Clone + Send + Sync + 'static,
55 AppContext: FromRef<S>,
56{
57 pub(crate) inner: Arc<SidekiqProcessorInner<S>>,
58}
59
60pub(crate) struct WorkerData<S>
61where
62 S: Clone + Send + Sync + 'static,
63 AppContext: FromRef<S>,
64{
65 pub(crate) worker_wrapper: WorkerWrapper<S>,
66 pub(crate) register_sidekiq_fn: RegisterSidekiqFn<S>,
67 pub(crate) register_sidekiq_periodic_fn: RegisterSidekiqPeriodicFn<S>,
68}
69
70#[non_exhaustive]
71pub(crate) struct SidekiqProcessorInner<S>
72where
73 S: Clone + Send + Sync + 'static,
74 AppContext: FromRef<S>,
75{
76 pub(crate) state: S,
77 pub(crate) processor: Mutex<Option<::sidekiq::Processor>>,
78 pub(crate) queues: BTreeSet<String>,
79 pub(crate) periodic_workers: HashMap<PeriodicArgsJson, Arc<WorkerData<S>>>,
80}
81
82impl<S> SidekiqProcessor<S>
83where
84 S: Clone + Send + Sync + 'static,
85 AppContext: FromRef<S>,
86{
87 pub(crate) fn new(inner: SidekiqProcessorInner<S>) -> Self {
88 Self {
89 inner: Arc::new(inner),
90 }
91 }
92
93 pub fn builder(state: &S) -> SidekiqProcessorBuilder<S> {
94 SidekiqProcessorBuilder::new(state)
95 }
96
97 pub async fn before_run(&self, state: &S) -> RoadsterResult<()> {
98 self.initialize_periodic(state).await?;
99
100 Ok(())
101 }
102
103 async fn initialize_periodic(&self, state: &S) -> RoadsterResult<()> {
105 let context = AppContext::from_ref(state);
106
107 let periodic_config = &context
108 .config()
109 .service
110 .worker
111 .sidekiq
112 .custom
113 .custom
114 .periodic;
115
116 match periodic_config.stale_cleanup {
117 StaleCleanUpBehavior::Manual => {}
118 StaleCleanUpBehavior::AutoCleanAll => {
119 periodic::destroy_all(context.redis_enqueue().inner.clone()).await?;
120 info!("Deleted all previously registered periodic jobs");
121 }
122 StaleCleanUpBehavior::AutoCleanStale => {
123 }
125 };
126
127 let mut processor = self.inner.processor.lock().await;
128
129 let processor = if let Some(processor) = processor.as_mut() {
130 processor
131 } else {
132 warn!("No ::sidekiq::Processor available.");
133 return Ok(());
134 };
135
136 let mut registered_periodic_jobs_json: HashSet<String> = Default::default();
137 for (periodic_args, worker_data) in self.inner.periodic_workers.iter() {
138 let hash = (worker_data.register_sidekiq_periodic_fn)(
139 &self.inner.state,
140 processor,
141 worker_data.worker_wrapper.clone(),
142 periodic_args.clone(),
143 )
144 .await?;
145 registered_periodic_jobs_json.insert(hash);
146 }
147
148 if periodic_config.stale_cleanup == StaleCleanUpBehavior::AutoCleanStale {
149 let mut conn = context.redis_enqueue().get().await?;
150 remove_stale_periodic_jobs(&mut conn, &context, ®istered_periodic_jobs_json).await?;
151 }
152
153 Ok(())
154 }
155
156 pub async fn run(self, _state: &S, cancellation_token: CancellationToken) {
157 let processor = { self.inner.processor.lock().await.clone() };
158
159 let processor = match processor {
160 Some(processor) => processor,
161 None => {
162 warn!("No ::sidekiq::Processor available. Idling until cancelled.");
170 cancellation_token.cancelled().await;
171 return;
172 }
173 };
174 let sidekiq_cancel_token = processor.get_cancellation_token();
175
176 let mut join_set = JoinSet::new();
177 let token = cancellation_token.clone();
178 join_set.spawn(Box::pin(async move {
179 token.cancelled().await;
180 }));
181 let token = sidekiq_cancel_token.clone();
182 join_set.spawn(Box::pin(async move {
183 token.cancelled().await;
184 }));
185 join_set.spawn(processor.run());
186
187 while let Some(result) = join_set.join_next().await {
188 cancellation_token.cancel();
191 sidekiq_cancel_token.cancel();
192 if let Err(join_err) = result {
193 error!(
194 "An error occurred when trying to join on one of the app's tasks. Error: {join_err}"
195 );
196 }
197 }
198 }
199}
200
201async fn remove_stale_periodic_jobs<C: RedisCommands>(
210 conn: &mut C,
211 context: &AppContext,
212 registered_periodic_workers: &HashSet<String>,
213) -> RoadsterResult<()> {
214 let stale_jobs = conn
215 .zrange(PERIODIC_KEY.to_string(), 0, -1)
216 .await?
217 .into_iter()
218 .filter(|job| !registered_periodic_workers.contains(job))
219 .collect_vec();
220
221 if stale_jobs.is_empty() {
222 info!("No stale periodic jobs found");
223 return Ok(());
224 }
225
226 if context
227 .config()
228 .service
229 .worker
230 .sidekiq
231 .custom
232 .custom
233 .periodic
234 .stale_cleanup
235 == StaleCleanUpBehavior::AutoCleanStale
236 {
237 info!(count = stale_jobs.len(), "Removing stale periodic jobs",);
238 conn.zrem(PERIODIC_KEY.to_string(), stale_jobs.clone())
239 .await?;
240 } else {
241 warn!(count = stale_jobs.len(), "Found stale periodic jobs");
242 }
243
244 Ok(())
245}
246
247type RegisterSidekiqFn<S> =
248 Box<dyn Send + Sync + for<'a> Fn(&'a S, &'a mut ::sidekiq::Processor, WorkerWrapper<S>)>;
249type RegisterSidekiqPeriodicFn<S> = Box<
251 dyn Send
252 + Sync
253 + for<'a> Fn(
254 &'a S,
255 &'a mut ::sidekiq::Processor,
256 WorkerWrapper<S>,
257 PeriodicArgsJson,
258 ) -> Pin<Box<dyn 'a + Send + Future<Output = RoadsterResult<String>>>>,
259>;
260type RegisterSidekiqMiddlewareFn = Box<
261 dyn Send
262 + Sync
263 + for<'a> FnOnce(
264 &'a mut ::sidekiq::Processor,
265 ) -> Pin<Box<dyn 'a + Send + Future<Output = ()>>>,
266>;