1use futures::FutureExt as _;
2use sqlx::PgPool;
3use tokio::select;
4use tokio::sync::Semaphore;
5use tracing::Instrument as _;
6
7use crate::heartbeat;
8use crate::prelude::*;
9#[cfg(feature = "wait-for-job")]
10use crate::queue::wait_for_job::get_waiting_guard;
11use crate::result::{self, AnyJobResult, JobResultInternal};
12use crate::sync::{self, BackoffStrategy, JobStrategyError};
13
14use std::panic::AssertUnwindSafe;
15use std::sync::Arc;
impl SimpleQueue {
    /// Atomically claim the next runnable job.
    ///
    /// Uses `FOR UPDATE SKIP LOCKED` inside the subquery so concurrent
    /// workers never claim the same row. The claimed row is flipped
    /// Pending -> Running and its `attempt` counter is incremented in the
    /// same statement. A row is eligible when its `run_at` has passed (or
    /// is NULL) and it still has attempts remaining. Returns `None` when
    /// nothing is currently runnable.
    #[tracing::instrument(skip(self))]
    async fn fetch_next_job(&self) -> Result<Option<Job>, sqlx::Error> {
        sqlx::query_as!(
            Job,
            r#"
            UPDATE job_queue
            SET
                status = $2,
                attempt = attempt + 1

            WHERE id = (
                SELECT id FROM job_queue
                WHERE status = $1
                AND (NOW() > run_at OR run_at IS NULL)
                AND attempt < max_attempts
                FOR UPDATE SKIP LOCKED
                LIMIT 1
            )
            RETURNING *
            "#,
            result::JobResultInternal::Pending.to_string(),
            result::JobResultInternal::Running.to_string(),
        )
        .fetch_optional(&self.pool)
        .await
    }

    /// Fetch (or lazily create) the concurrency semaphore for `queue`.
    ///
    /// Each queue gets its own semaphore with `queue_sem_count` permits,
    /// stored in a concurrent map (entry API — presumably dashmap-style,
    /// TODO confirm), so per-queue parallelism is capped independently of
    /// the global limit.
    fn get_queue_semaphore(&self, queue: String) -> Arc<Semaphore> {
        let entry = self
            .queue_semaphores
            .entry(queue)
            .or_insert_with(|| Arc::new(Semaphore::new(self.queue_sem_count)));
        let semaphore = entry.value();
        semaphore.clone()
    }

    /// Main worker loop: repeatedly claim a job and spawn one task per job.
    ///
    /// Concurrency is bounded twice: a global semaphore caps total in-flight
    /// jobs, and a per-queue semaphore caps each queue. Both permits are
    /// moved into the spawned task so they are held for the job's full
    /// duration. The loop only returns on an error (semaphore closed,
    /// polling failure, or a failed job release), always as `Err`.
    pub async fn run(
        self: Arc<Self>,
        start_permit: Option<tokio::sync::OwnedSemaphorePermit>,
    ) -> Result<(), BoxDynError> {
        // Releasing the permit signals that this worker has started —
        // presumably a startup gate held by the caller; TODO confirm.
        drop(start_permit);
        tracing::info!("starting job loop");
        loop {
            // NOTE(review): this span is created but never entered, so it
            // does not actually scope the events below — confirm intent.
            let _span = tracing::info_span!("poll");

            // Global concurrency cap; the permit is moved into the spawned
            // job task below (see the explicit drops at its end).
            let _global_permit = self.global_semaphore.clone().acquire_owned().await?;
            let job = self
                .fetch_next_job()
                .await
                .inspect_err(|e| tracing::error!("Polling error: {}", e))?;
            // NOTE(review): logs on every poll, even when no job was found.
            tracing::info!("got job");
            if let Some(job) = job {
                let _job_span = tracing::info_span!("job", id = %job.id, queue = %job.queue);
                #[cfg(feature = "wait-for-job")]
                let _wait_guard = get_waiting_guard(job.id);
                // Background keep-alive for the claimed row; runs until the
                // guard is dropped at the end of the spawned task.
                let _heartbeat = heartbeat::Heartbeat::start(
                    self.pool.clone(),
                    &job.id,
                    self.heartbeat_interval,
                );

                // Race the per-queue permit against a timeout so one
                // saturated queue cannot stall the whole worker loop.
                let result = select! {
                    sem_result = self.get_queue_semaphore(job.queue.clone())
                        .acquire_owned() => sem_result.map_err(|_| tokio::sync::TryAcquireError::Closed),
                    _ = tokio::time::sleep(self.hold_queue_semaphore) => Err(tokio::sync::TryAcquireError::NoPermits),
                };
                let Ok(_queue_permit) = result else {
                    tracing::warn!(
                        "Job queue semaphore acquire failed for queue: {}",
                        job.queue
                    );
                    // Hand the job back (Running -> Pending) so another
                    // worker can pick it up; a failed release aborts the loop.
                    match self.release_job(&job.id).await {
                        Ok(_) => {}
                        Err(e) => {
                            tracing::error!("Failed to release {:?}: {}", job, e);
                            return Err(e.into());
                        }
                    }
                    continue;
                };
                let q = Arc::clone(&self);
                let job = Arc::new(job);
                tokio::spawn(async move {
                    // Jobs rescheduled too many times are flagged BadJob
                    // instead of being processed again.
                    if job.reprocess_count >= q.max_reprocess_count as i32 {
                        handle_result(
                            AnyJobResult::Internal(JobResultInternal::BadJob),
                            &job,
                            &q.pool,
                            &q.get_backoff_strategy(&job),
                        ).await;
                        return;
                    }
                    // Per-queue strategy may veto, defer, fail, or complete
                    // the job before the handler ever runs.
                    let permit_result = q.get_job_strategy(&job).acquire(&job).await;
                    let backoff_strategy = q.get_backoff_strategy(&job);

                    if let Err(permit_err) = permit_result {
                        handle_strategy_error(permit_err, &job, &q.pool, &backoff_strategy).await;
                        return;
                    };
                    let _permit = permit_result.unwrap();
                    let q_name: String = job.queue.clone();
                    let result = if let Some(handler) = q.job_registry.get(q_name.as_str()) {

                        // AssertUnwindSafe + catch_unwind converts a handler
                        // panic into an InternalError result so one bad
                        // handler cannot take down the worker.
                        let wrapped_result = AssertUnwindSafe(
                            handler.process_dyn(&q, &job)
                                .instrument(tracing::info_span!("process_job", job_id = %&job.id, queue = %&job.queue, attempt = %&job.attempt, max_attempts = %&job.max_attempts, run_at = ?&job.run_at)))
                            .catch_unwind().await;

                        match wrapped_result {
                            Ok(Ok(process)) => process,
                            Ok(Err(e)) => {
                                tracing::error!("Handler returned error: {:?}", e);
                                JobResult::InternalError
                            }
                            Err(_) => {
                                tracing::error!("Handler panicked or returned error: {}", &job.id);
                                JobResult::InternalError
                            }
                        }
                    } else {
                        // No registered handler for this queue name.
                        tracing::warn!("Missing handler for: {:?}", (job.queue).clone().as_str());
                        JobResult::HandlerMissing
                    };

                    handle_result(AnyJobResult::Public(result), &job, &q.pool, &backoff_strategy).await;
                    // These explicit drops force the guards to be captured by
                    // the task future (async move only captures referenced
                    // variables), keeping them alive until the job finishes.
                    drop(_permit);
                    drop(_queue_permit);
                    drop(_global_permit);
                    drop(_heartbeat);
                    #[cfg(feature = "wait-for-job")]
                    drop(_wait_guard);
                }.instrument(_job_span));
            } else {
                // Queue empty: back off before polling again.
                tokio::time::sleep(self.empty_poll_sleep).await;
            }
        }
    }

    /// Return a claimed job to Pending so another worker can take it.
    #[tracing::instrument(skip(self))]
    async fn release_job(&self, id: &uuid::Uuid) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "UPDATE job_queue SET status = $1 WHERE id = $2",
            result::JobResultInternal::Pending.to_string(),
            id
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Backoff strategy for the job's queue, falling back to the default.
    fn get_backoff_strategy(&self, job: &Job) -> BackoffStrategy {
        self.queue_backoff_strategies
            .get(job.queue.as_str())
            .map(|r| r.value().clone())
            .unwrap_or(self.default_backoff_strategy.clone())
    }

    /// Admission strategy for the job's queue, falling back to the default.
    fn get_job_strategy(&self, job: &Job) -> Arc<dyn sync::JobStrategy> {
        self.queue_strategies
            .get(job.queue.as_str())
            .map(|r| r.value().clone())
            .unwrap_or(self.default_queue_strategy.clone())
    }
}
184
185async fn handle_strategy_error(
186 err: JobStrategyError,
187 job: &Job,
188 pool: &PgPool,
189 backoff_strategy: &BackoffStrategy,
190) {
191 match err {
192 JobStrategyError::CancelJob => {
193 handle_result(JobResult::Cancel.into(), job, pool, backoff_strategy).await
194 }
195 JobStrategyError::TryAfter(time_delta) => {
196 handle_result(
197 JobResult::RetryAt(chrono::Utc::now() + time_delta).into(),
198 job,
199 pool,
200 backoff_strategy,
201 )
202 .await
203 }
204 JobStrategyError::Retry => {
205 handle_result(JobResult::Failed.into(), job, pool, backoff_strategy).await
206 }
207 JobStrategyError::MarkCompleted => {
208 handle_result(JobResult::Success.into(), job, pool, backoff_strategy).await
209 }
210 }
211}
212
213async fn handle_result(
214 result: AnyJobResult,
215 job: &Job,
216 pool: &PgPool,
217 backoff_strategy: &BackoffStrategy,
218) -> () {
219 match result {
220 AnyJobResult::Internal(result) => {
221 handle_result_internal(result, job, pool, backoff_strategy).await
222 }
223 AnyJobResult::Public(result) => {
224 handle_result_public(result, job, pool, backoff_strategy).await
225 }
226 }
227}
228async fn handle_result_internal(
229 result: JobResultInternal,
230 job: &Job,
231 pool: &PgPool,
232 _backoff_strategy: &BackoffStrategy,
233) -> () {
234 match result {
235 JobResultInternal::BadJob => {
236 let _ = sqlx::query!(
238 "UPDATE job_queue SET status = $1 WHERE id = $2",
239 result.to_string(),
240 &job.id,
241 )
242 .execute(pool)
243 .await;
244 }
245 _ => {
246 tracing::error!("Unexpected internal status in job processing: {:?}", result)
247 }
248 }
249}
250async fn handle_result_public(
252 result: JobResult,
253 job: &Job,
254 pool: &PgPool,
255 backoff_strategy: &BackoffStrategy,
256) -> () {
257 use result::{JobResult, JobResultInternal};
258 let next_status_str = result.handle().to_string();
259 match result {
260 JobResult::InternalError => {
261 Box::pin(async move {
262 handle_result_public(JobResult::Failed, job, pool, backoff_strategy).await;
263 })
264 .await;
265 }
266 JobResult::Success => {
267 let result = sqlx::query!(
268 "UPDATE job_queue SET status = $1, completed_at = NOW() WHERE id = $2",
269 next_status_str,
270 job.id.clone(),
271 )
272 .execute(pool)
273 .await;
274 if let Err(err) = result {
275 tracing::error!("[{}] Job {} insertion failed: {}", job.queue, job.id, err);
276 } else {
277 tracing::info!("[{}] Job {} succeeded", job.queue, job.id);
278 }
279 }
280 JobResult::Failed => {
281 let _ = sqlx::query!(
283 "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
284 next_status_str,
285 backoff_strategy.next_attempt(job),
286 job.id.clone(),
287 )
288 .execute(pool)
289 .await;
290 tracing::info!("Job {} failed", job.id);
291 }
292 JobResult::RetryAt(run_at) => {
293 let _ = sqlx::query!(
294 "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
295 next_status_str,
296 run_at,
297 job.id.clone()
298 )
299 .execute(pool)
300 .await;
301 }
302 JobResult::RescheduleAt(run_at) => {
303 let backoff = backoff_strategy.next_attempt(job);
305 let scheduled = if run_at < backoff { backoff } else { run_at };
306 let _ = sqlx::query!(
307 "UPDATE job_queue SET status = $1, run_at = $2, attempt = attempt - 1, reprocess_count = reprocess_count + 1 WHERE id = $3",
308 next_status_str, scheduled, job.id.clone()
309 ).execute(pool).await;
310 }
311 JobResult::Critical => {
312 let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
313 }
314 JobResult::HandlerMissing => {
315 let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
316 tracing::info!("Handler missing for job {}", job.id);
317 }
318 JobResult::Cancel => {
319 let _ = update_job(pool, &job.id, JobResultInternal::Cancelled).await;
320 tracing::info!("Job {} cancelled", job.id);
321 }
322 JobResult::Unprocessable => {
323 let _ = update_job(pool, &job.id, JobResultInternal::Unprocessable).await;
324 tracing::info!("Job {} unprocessable", job.id);
325 }
326 }
327}
328async fn update_job(
330 pool: &PgPool,
331 id: &uuid::Uuid,
332 result: result::JobResultInternal,
333) -> Result<(), sqlx::Error> {
334 sqlx::query!(
335 "UPDATE job_queue SET status = $1, completed_at = NOW() WHERE id = $2",
336 result.to_string(),
337 id
338 )
339 .execute(pool)
340 .await?;
341 Ok(())
342}