1use futures::FutureExt as _;
2use sqlx::PgPool;
3use tokio::select;
4use tokio::sync::Semaphore;
5use tracing::Instrument as _;
6
7use crate::heartbeat;
8use crate::prelude::*;
9#[cfg(feature = "wait-for-job")]
10use crate::queue::wait_for_job::get_waiting_guard;
11use crate::result::{self, AnyJobResult, JobResultInternal};
12use crate::sync::{self, BackoffStrategy, JobStrategyError};
13
14use std::panic::AssertUnwindSafe;
15use std::sync::Arc;
16impl SimpleQueue {
18 #[tracing::instrument(skip(self))]
19 async fn fetch_next_job(&self) -> Result<Option<Job>, sqlx::Error> {
20 sqlx::query_as!(
21 Job,
22 r#"
23 UPDATE job_queue
24 SET
25 status = $2,
26 attempt = attempt + 1
27
28 WHERE id = (
29 SELECT id FROM job_queue
30 WHERE status = $1
31 AND (CURRENT_TIMESTAMP > run_at OR run_at IS NULL)
32 AND attempt < max_attempts
33 FOR UPDATE SKIP LOCKED
34 LIMIT 1
35 )
36 RETURNING *
37 "#,
38 result::JobResultInternal::Pending.to_string(),
39 result::JobResultInternal::Running.to_string(),
40 )
41 .fetch_optional(&self.pool)
42 .await
43 }
44 fn get_queue_semaphore(&self, queue: String) -> Arc<Semaphore> {
45 let entry = self
46 .queue_semaphores
47 .entry(queue)
48 .or_insert_with(|| Arc::new(Semaphore::new(self.queue_sem_count)));
49 let semaphore = entry.value();
50 semaphore.clone()
51 }
52 pub async fn run(
58 self: Arc<Self>,
59 start_permit: Option<tokio::sync::OwnedSemaphorePermit>,
60 ) -> Result<(), BoxDynError> {
61 drop(start_permit);
62 loop {
63 let _span = tracing::info_span!("poll");
64
65 let _global_permit = self.global_semaphore.clone().acquire_owned().await?;
66 let job = self.fetch_next_job().await?;
67 if let Some(job) = job {
68 let _job_span = tracing::info_span!("job", id = %job.id, queue = %job.queue);
69 #[cfg(feature = "wait-for-job")]
70 let _wait_guard = get_waiting_guard(job.id);
71 let _heartbeat = heartbeat::Heartbeat::start(
72 self.pool.clone(),
73 &job.id,
74 self.heartbeat_interval,
75 );
76
77 let result = select! {
78 sem_result = self.get_queue_semaphore(job.queue.clone())
79 .acquire_owned() => sem_result.map_err(|_| tokio::sync::TryAcquireError::Closed),
80 _ = tokio::time::sleep(self.hold_queue_semaphore) => Err(tokio::sync::TryAcquireError::NoPermits),
81 };
82 let Ok(_queue_permit) = result else {
83 tracing::warn!(
84 "Job queue semaphore acquire failed for queue: {}",
85 job.queue
86 );
87 match self.release_job(&job.id).await {
88 Ok(_) => {}
89 Err(e) => {
90 tracing::error!("Failed to release {:?}: {}", job, e);
91 return Err(e.into());
92 }
93 }
94 continue;
95 };
96 let q = Arc::clone(&self);
97 let job = Arc::new(job);
98 tokio::spawn(async move {
99 if job.reprocess_count >= q.max_reprocess_count as i32 {
100 handle_result(
101 AnyJobResult::Internal(JobResultInternal::BadJob),
102 &job,
103 &q.pool,
104 &q.get_backoff_strategy(&job),
105 ).await;
106 return;
107 }
108 let permit_result = q.get_job_strategy(&job).acquire(&job).await;
109 let backoff_strategy = q.get_backoff_strategy(&job);
110
111 if let Err(permit_err) = permit_result {
112 handle_strategy_error(permit_err, &job, &q.pool, &backoff_strategy).await;
113 return;
114 };
115 let _permit = permit_result.unwrap();
116 let q_name: String = job.queue.clone();
117 let result = if let Some(handler) = q.job_registry.get(q_name.as_str()) {
118
119 let wrapped_result = AssertUnwindSafe(
120 handler.process_dyn(&q, &job)
121 .instrument(tracing::info_span!("process_job", job_id = %&job.id, queue = %&job.queue, attempt = %&job.attempt, max_attempts = %&job.max_attempts, run_at = ?&job.run_at)))
122 .catch_unwind().await;
123
124 match wrapped_result {
125 Ok(Ok(process)) => process,
126 Ok(Err(e)) => {
127 tracing::error!("Handler returned error: {:?}", e);
128 JobResult::InternalError
129 }
130 Err(_) => {
131 tracing::error!("Handler panicked or returned error: {}", &job.id);
132 JobResult::InternalError
133 }
134 }
135 } else {
136 tracing::warn!("Missing handler for: {:?}", (job.queue).clone().as_str());
137 JobResult::HandlerMissing
138 };
139
140 handle_result(AnyJobResult::Public(result), &job, &q.pool, &backoff_strategy).await;
141 drop(_permit);
142 drop(_queue_permit);
143 drop(_global_permit);
144 drop(_heartbeat);
145 #[cfg(feature = "wait-for-job")]
146 drop(_wait_guard);
147 }.instrument(_job_span));
148 } else {
149 tokio::time::sleep(self.empty_poll_sleep).await;
150 }
151 }
152 }
153
154 #[tracing::instrument(skip(self))]
155 async fn release_job(&self, id: &uuid::Uuid) -> Result<(), sqlx::Error> {
156 sqlx::query!(
157 "UPDATE job_queue SET status = $1 WHERE id = $2",
158 result::JobResultInternal::Pending.to_string(),
159 id
160 )
161 .execute(&self.pool)
162 .await?;
163 Ok(())
164 }
165
166 fn get_backoff_strategy(&self, job: &Job) -> BackoffStrategy {
167 self.queue_backoff_strategies
168 .get(job.queue.as_str())
169 .map(|r| r.value().clone())
170 .unwrap_or(self.default_backoff_strategy.clone())
171 }
172 fn get_job_strategy(&self, job: &Job) -> Arc<dyn sync::JobStrategy> {
173 self.queue_strategies
174 .get(job.queue.as_str())
175 .map(|r| r.value().clone())
176 .unwrap_or(self.default_queue_strategy.clone())
177 }
178}
179
180async fn handle_strategy_error(
181 err: JobStrategyError,
182 job: &Job,
183 pool: &PgPool,
184 backoff_strategy: &BackoffStrategy,
185) {
186 match err {
187 JobStrategyError::CancelJob => {
188 handle_result(JobResult::Cancel.into(), job, pool, backoff_strategy).await
189 }
190 JobStrategyError::TryAfter(time_delta) => {
191 handle_result(
192 JobResult::RetryAt(chrono::Utc::now() + time_delta).into(),
193 job,
194 pool,
195 backoff_strategy,
196 )
197 .await
198 }
199 JobStrategyError::Retry => {
200 handle_result(JobResult::Failed.into(), job, pool, backoff_strategy).await
201 }
202 JobStrategyError::MarkCompleted => {
203 handle_result(JobResult::Success.into(), job, pool, backoff_strategy).await
204 }
205 }
206}
207
208async fn handle_result(
209 result: AnyJobResult,
210 job: &Job,
211 pool: &PgPool,
212 backoff_strategy: &BackoffStrategy,
213) -> () {
214 match result {
215 AnyJobResult::Internal(result) => {
216 handle_result_internal(result, job, pool, backoff_strategy).await
217 }
218 AnyJobResult::Public(result) => {
219 handle_result_public(result, job, pool, backoff_strategy).await
220 }
221 }
222}
223async fn handle_result_internal(
224 result: JobResultInternal,
225 job: &Job,
226 pool: &PgPool,
227 _backoff_strategy: &BackoffStrategy,
228) -> () {
229 match result {
230 JobResultInternal::BadJob => {
231 let _ = sqlx::query!(
233 "UPDATE job_queue SET status = $1 WHERE id = $2",
234 result.to_string(),
235 &job.id,
236 )
237 .execute(pool)
238 .await;
239 }
240 _ => {
241 tracing::error!("Unexpected internal status in job processing: {:?}", result)
242 }
243 }
244}
245async fn handle_result_public(
247 result: JobResult,
248 job: &Job,
249 pool: &PgPool,
250 backoff_strategy: &BackoffStrategy,
251) -> () {
252 use result::{JobResult, JobResultInternal};
253 let next_status_str = result.handle().to_string();
254 match result {
255 JobResult::InternalError => {
256 Box::pin(async move {
257 handle_result_public(JobResult::Failed, job, pool, backoff_strategy).await;
258 })
259 .await;
260 }
261 JobResult::Success => {
262 let _ = sqlx::query!(
263 "UPDATE job_queue SET status = $1 WHERE id = $2",
264 next_status_str,
265 job.id.clone(),
266 )
267 .execute(pool)
268 .await;
269 tracing::info!("[{}] Job {} succeeded", job.queue, job.id);
270 }
271 JobResult::Failed => {
272 let _ = sqlx::query!(
274 "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
275 next_status_str,
276 backoff_strategy.next_attempt(job),
277 job.id.clone(),
278 )
279 .execute(pool)
280 .await;
281 tracing::info!("Job {} failed", job.id);
282 }
283 JobResult::RetryAt(run_at) => {
284 let _ = sqlx::query!(
285 "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
286 next_status_str,
287 run_at,
288 job.id.clone()
289 )
290 .execute(pool)
291 .await;
292 }
293 JobResult::RescheduleAt(run_at) => {
294 let backoff = backoff_strategy.next_attempt(job);
296 let scheduled = if run_at < backoff { backoff } else { run_at };
297 let _ = sqlx::query!(
298 "UPDATE job_queue SET status = $1, run_at = $2, attempt = attempt - 1, reprocess_count = reprocess_count + 1 WHERE id = $3",
299 next_status_str, scheduled, job.id.clone()
300 ).execute(pool).await;
301 }
302 JobResult::Critical => {
303 let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
304 }
305 JobResult::HandlerMissing => {
306 let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
307 tracing::info!("Handler missing for job {}", job.id);
308 }
309 JobResult::Cancel => {
310 let _ = update_job(pool, &job.id, JobResultInternal::Cancelled).await;
311 tracing::info!("Job {} cancelled", job.id);
312 }
313 JobResult::Unprocessable => {
314 let _ = update_job(pool, &job.id, JobResultInternal::Unprocessable).await;
315 tracing::info!("Job {} unprocessable", job.id);
316 }
317 }
318}
319async fn update_job(
321 pool: &PgPool,
322 id: &uuid::Uuid,
323 result: result::JobResultInternal,
324) -> Result<(), sqlx::Error> {
325 sqlx::query!(
326 "UPDATE job_queue SET status = $1 WHERE id = $2",
327 result.to_string(),
328 id
329 )
330 .execute(pool)
331 .await?;
332 Ok(())
333}