Skip to main content

simple_queue/queue/
logic.rs

1use futures::FutureExt as _;
2use sqlx::PgPool;
3use tokio::select;
4use tokio::sync::Semaphore;
5use tracing::Instrument as _;
6
7use crate::heartbeat;
8use crate::prelude::*;
9#[cfg(feature = "wait-for-job")]
10use crate::queue::wait_for_job::get_waiting_guard;
11use crate::result::{self, AnyJobResult, JobResultInternal};
12use crate::sync::{self, BackoffStrategy, JobStrategyError};
13
14use std::panic::AssertUnwindSafe;
15use std::sync::Arc;
16// Logic implementation
17impl SimpleQueue {
18    #[tracing::instrument(skip(self))]
19    async fn fetch_next_job(&self) -> Result<Option<Job>, sqlx::Error> {
20        sqlx::query_as!(
21            Job,
22            r#"
23        UPDATE job_queue
24        SET
25            status = $2,
26            attempt = attempt + 1
27
28        WHERE id = (
29            SELECT id FROM job_queue
30            WHERE status = $1
31            AND (CURRENT_TIMESTAMP > run_at OR run_at IS NULL)
32            AND attempt < max_attempts
33            FOR UPDATE SKIP LOCKED
34            LIMIT 1
35        )
36        RETURNING *
37        "#,
38            result::JobResultInternal::Pending.to_string(),
39            result::JobResultInternal::Running.to_string(),
40        )
41        .fetch_optional(&self.pool)
42        .await
43    }
44    fn get_queue_semaphore(&self, queue: String) -> Arc<Semaphore> {
45        let entry = self
46            .queue_semaphores
47            .entry(queue)
48            .or_insert_with(|| Arc::new(Semaphore::new(self.queue_sem_count)));
49        let semaphore = entry.value();
50        semaphore.clone()
51    }
52    /// Poll the queue for the next job to run (in loop). If no jobs are found, sleep for
53    /// `empty_poll_sleep` before retrying.
54    ///
55    /// Passed `start_permit` is used to guarantee that worker started.
56    /// Mostly helpful with tests, as checks might fail if queue was still warming up.
57    pub async fn run(
58        self: Arc<Self>,
59        start_permit: Option<tokio::sync::OwnedSemaphorePermit>,
60    ) -> Result<(), BoxDynError> {
61        drop(start_permit);
62        loop {
63            let _span = tracing::info_span!("poll");
64
65            let _global_permit = self.global_semaphore.clone().acquire_owned().await?;
66            let job = self.fetch_next_job().await?;
67            if let Some(job) = job {
68                let _job_span = tracing::info_span!("job", id = %job.id, queue = %job.queue);
69                #[cfg(feature = "wait-for-job")]
70                let _wait_guard = get_waiting_guard(job.id);
71                let _heartbeat = heartbeat::Heartbeat::start(
72                    self.pool.clone(),
73                    &job.id,
74                    self.heartbeat_interval,
75                );
76
77                let result = select! {
78                    sem_result = self.get_queue_semaphore(job.queue.clone())
79                        .acquire_owned() => sem_result.map_err(|_| tokio::sync::TryAcquireError::Closed),
80                    _ = tokio::time::sleep(self.hold_queue_semaphore) => Err(tokio::sync::TryAcquireError::NoPermits),
81                };
82                let Ok(_queue_permit) = result else {
83                    tracing::warn!(
84                        "Job queue semaphore acquire failed for queue: {}",
85                        job.queue
86                    );
87                    match self.release_job(&job.id).await {
88                        Ok(_) => {}
89                        Err(e) => {
90                            tracing::error!("Failed to release {:?}: {}", job, e);
91                            return Err(e.into());
92                        }
93                    }
94                    continue;
95                };
96                let q = Arc::clone(&self);
97                let job = Arc::new(job);
98                tokio::spawn(async move {
99                    if job.reprocess_count >= q.max_reprocess_count as i32 {
100                        handle_result(
101                            AnyJobResult::Internal(JobResultInternal::BadJob),
102                            &job,
103                            &q.pool,
104                            &q.get_backoff_strategy(&job),
105                        ).await;
106                        return;
107                    }
108                    let permit_result = q.get_job_strategy(&job).acquire(&job).await;
109                    let backoff_strategy = q.get_backoff_strategy(&job);
110
111                    if let Err(permit_err) = permit_result {
112                        handle_strategy_error(permit_err, &job, &q.pool, &backoff_strategy).await;
113                        return;
114                    };
115                    let _permit = permit_result.unwrap();
116                    let q_name: String = job.queue.clone();
117                    let result = if let Some(handler) = q.job_registry.get(q_name.as_str()) {
118
119                        let wrapped_result = AssertUnwindSafe(
120                            handler.process_dyn(&q, &job)
121                                .instrument(tracing::info_span!("process_job", job_id = %&job.id, queue = %&job.queue, attempt = %&job.attempt, max_attempts = %&job.max_attempts, run_at = ?&job.run_at)))
122                        .catch_unwind().await;
123
124                        match wrapped_result {
125                            Ok(Ok(process)) => process,
126                            Ok(Err(e)) => {
127                                tracing::error!("Handler returned error: {:?}", e);
128                                JobResult::InternalError
129                            }
130                            Err(_) => {
131                                tracing::error!("Handler panicked or returned error: {}", &job.id);
132                                JobResult::InternalError
133                            }
134                        }
135                    } else {
136                        tracing::warn!("Missing handler for: {:?}", (job.queue).clone().as_str());
137                        JobResult::HandlerMissing
138                    };
139
140                    handle_result(AnyJobResult::Public(result), &job, &q.pool, &backoff_strategy).await;
141                    drop(_permit);
142                    drop(_queue_permit);
143                    drop(_global_permit);
144                    drop(_heartbeat);
145                    #[cfg(feature = "wait-for-job")]
146                    drop(_wait_guard);
147                }.instrument(_job_span));
148            } else {
149                tokio::time::sleep(self.empty_poll_sleep).await;
150            }
151        }
152    }
153
154    #[tracing::instrument(skip(self))]
155    async fn release_job(&self, id: &uuid::Uuid) -> Result<(), sqlx::Error> {
156        sqlx::query!(
157            "UPDATE job_queue SET status = $1 WHERE id = $2",
158            result::JobResultInternal::Pending.to_string(),
159            id
160        )
161        .execute(&self.pool)
162        .await?;
163        Ok(())
164    }
165
166    fn get_backoff_strategy(&self, job: &Job) -> BackoffStrategy {
167        self.queue_backoff_strategies
168            .get(job.queue.as_str())
169            .map(|r| r.value().clone())
170            .unwrap_or(self.default_backoff_strategy.clone())
171    }
172    fn get_job_strategy(&self, job: &Job) -> Arc<dyn sync::JobStrategy> {
173        self.queue_strategies
174            .get(job.queue.as_str())
175            .map(|r| r.value().clone())
176            .unwrap_or(self.default_queue_strategy.clone())
177    }
178}
179
180async fn handle_strategy_error(
181    err: JobStrategyError,
182    job: &Job,
183    pool: &PgPool,
184    backoff_strategy: &BackoffStrategy,
185) {
186    match err {
187        JobStrategyError::CancelJob => {
188            handle_result(JobResult::Cancel.into(), job, pool, backoff_strategy).await
189        }
190        JobStrategyError::TryAfter(time_delta) => {
191            handle_result(
192                JobResult::RetryAt(chrono::Utc::now() + time_delta).into(),
193                job,
194                pool,
195                backoff_strategy,
196            )
197            .await
198        }
199        JobStrategyError::Retry => {
200            handle_result(JobResult::Failed.into(), job, pool, backoff_strategy).await
201        }
202        JobStrategyError::MarkCompleted => {
203            handle_result(JobResult::Success.into(), job, pool, backoff_strategy).await
204        }
205    }
206}
207
208async fn handle_result(
209    result: AnyJobResult,
210    job: &Job,
211    pool: &PgPool,
212    backoff_strategy: &BackoffStrategy,
213) -> () {
214    match result {
215        AnyJobResult::Internal(result) => {
216            handle_result_internal(result, job, pool, backoff_strategy).await
217        }
218        AnyJobResult::Public(result) => {
219            handle_result_public(result, job, pool, backoff_strategy).await
220        }
221    }
222}
223async fn handle_result_internal(
224    result: JobResultInternal,
225    job: &Job,
226    pool: &PgPool,
227    _backoff_strategy: &BackoffStrategy,
228) -> () {
229    match result {
230        JobResultInternal::BadJob => {
231            // Rest of the variants aren't supported right now, as they should be processed by public result handling
232            let _ = sqlx::query!(
233                "UPDATE job_queue SET status = $1 WHERE id = $2",
234                result.to_string(),
235                &job.id,
236            )
237            .execute(pool)
238            .await;
239        }
240        _ => {
241            tracing::error!("Unexpected internal status in job processing: {:?}", result)
242        }
243    }
244}
245/// Handles JobResult results
246async fn handle_result_public(
247    result: JobResult,
248    job: &Job,
249    pool: &PgPool,
250    backoff_strategy: &BackoffStrategy,
251) -> () {
252    use result::{JobResult, JobResultInternal};
253    let next_status_str = result.handle().to_string();
254    match result {
255        JobResult::InternalError => {
256            Box::pin(async move {
257                handle_result_public(JobResult::Failed, job, pool, backoff_strategy).await;
258            })
259            .await;
260        }
261        JobResult::Success => {
262            let _ = sqlx::query!(
263                "UPDATE job_queue SET status = $1 WHERE id = $2",
264                next_status_str,
265                job.id.clone(),
266            )
267            .execute(pool)
268            .await;
269            tracing::info!("[{}] Job {} succeeded", job.queue, job.id);
270        }
271        JobResult::Failed => {
272            // TODO: Add RetryAt
273            let _ = sqlx::query!(
274                "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
275                next_status_str,
276                backoff_strategy.next_attempt(job),
277                job.id.clone(),
278            )
279            .execute(pool)
280            .await;
281            tracing::info!("Job {} failed", job.id);
282        }
283        JobResult::RetryAt(run_at) => {
284            let _ = sqlx::query!(
285                "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
286                next_status_str,
287                run_at,
288                job.id.clone()
289            )
290            .execute(pool)
291            .await;
292        }
293        JobResult::RescheduleAt(run_at) => {
294            // run_at cannot be closer than backoff
295            let backoff = backoff_strategy.next_attempt(job);
296            let scheduled = if run_at < backoff { backoff } else { run_at };
297            let _ = sqlx::query!(
298                "UPDATE job_queue SET status = $1, run_at = $2, attempt = attempt - 1, reprocess_count = reprocess_count + 1 WHERE id = $3",
299                next_status_str, scheduled, job.id.clone()
300            ).execute(pool).await;
301        }
302        JobResult::Critical => {
303            let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
304        }
305        JobResult::HandlerMissing => {
306            let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
307            tracing::info!("Handler missing for job {}", job.id);
308        }
309        JobResult::Cancel => {
310            let _ = update_job(pool, &job.id, JobResultInternal::Cancelled).await;
311            tracing::info!("Job {} cancelled", job.id);
312        }
313        JobResult::Unprocessable => {
314            let _ = update_job(pool, &job.id, JobResultInternal::Unprocessable).await;
315            tracing::info!("Job {} unprocessable", job.id);
316        }
317    }
318}
319/// Helper: updates a job's status in the database.
320async fn update_job(
321    pool: &PgPool,
322    id: &uuid::Uuid,
323    result: result::JobResultInternal,
324) -> Result<(), sqlx::Error> {
325    sqlx::query!(
326        "UPDATE job_queue SET status = $1 WHERE id = $2",
327        result.to_string(),
328        id
329    )
330    .execute(pool)
331    .await?;
332    Ok(())
333}