Skip to main content

simple_queue/queue/
logic.rs

1use futures::FutureExt as _;
2use sqlx::PgPool;
3use tokio::select;
4use tokio::sync::Semaphore;
5use tracing::Instrument as _;
6
7use crate::heartbeat;
8use crate::prelude::*;
9#[cfg(feature = "wait-for-job")]
10use crate::queue::wait_for_job::get_waiting_guard;
11use crate::result::{self, AnyJobResult, JobResultInternal};
12use crate::sync::{self, BackoffStrategy, JobStrategyError};
13
14use std::panic::AssertUnwindSafe;
15use std::sync::Arc;
16// Logic implementation
17impl SimpleQueue {
18    #[tracing::instrument(skip(self))]
19    async fn fetch_next_job(&self) -> Result<Option<Job>, sqlx::Error> {
20        sqlx::query_as!(
21            Job,
22            r#"
23        UPDATE job_queue
24        SET
25            status = $2,
26            attempt = attempt + 1
27
28        WHERE id = (
29            SELECT id FROM job_queue
30            WHERE status = $1
31            AND (NOW() > run_at OR run_at IS NULL)
32            AND attempt < max_attempts
33            FOR UPDATE SKIP LOCKED
34            LIMIT 1
35        )
36        RETURNING *
37        "#,
38            result::JobResultInternal::Pending.to_string(),
39            result::JobResultInternal::Running.to_string(),
40        )
41        .fetch_optional(&self.pool)
42        .await
43    }
44    fn get_queue_semaphore(&self, queue: String) -> Arc<Semaphore> {
45        let entry = self
46            .queue_semaphores
47            .entry(queue)
48            .or_insert_with(|| Arc::new(Semaphore::new(self.queue_sem_count)));
49        let semaphore = entry.value();
50        semaphore.clone()
51    }
52    /// Poll the queue for the next job to run (in loop). If no jobs are found, sleep for
53    /// `empty_poll_sleep` before retrying.
54    ///
55    /// Passed `start_permit` is used to guarantee that worker started.
56    /// Mostly helpful with tests, as checks might fail if queue was still warming up.
57    pub async fn run(
58        self: Arc<Self>,
59        start_permit: Option<tokio::sync::OwnedSemaphorePermit>,
60    ) -> Result<(), BoxDynError> {
61        drop(start_permit);
62        tracing::info!("starting job loop");
63        loop {
64            let _span = tracing::info_span!("poll");
65
66            let _global_permit = self.global_semaphore.clone().acquire_owned().await?;
67            let job = self
68                .fetch_next_job()
69                .await
70                .inspect_err(|e| tracing::error!("Polling error: {}", e))?;
71            tracing::info!("got job");
72            if let Some(job) = job {
73                let _job_span = tracing::info_span!("job", id = %job.id, queue = %job.queue);
74                #[cfg(feature = "wait-for-job")]
75                let _wait_guard = get_waiting_guard(job.id);
76                let _heartbeat = heartbeat::Heartbeat::start(
77                    self.pool.clone(),
78                    &job.id,
79                    self.heartbeat_interval,
80                );
81
82                let result = select! {
83                    sem_result = self.get_queue_semaphore(job.queue.clone())
84                        .acquire_owned() => sem_result.map_err(|_| tokio::sync::TryAcquireError::Closed),
85                    _ = tokio::time::sleep(self.hold_queue_semaphore) => Err(tokio::sync::TryAcquireError::NoPermits),
86                };
87                let Ok(_queue_permit) = result else {
88                    tracing::warn!(
89                        "Job queue semaphore acquire failed for queue: {}",
90                        job.queue
91                    );
92                    match self.release_job(&job.id).await {
93                        Ok(_) => {}
94                        Err(e) => {
95                            tracing::error!("Failed to release {:?}: {}", job, e);
96                            return Err(e.into());
97                        }
98                    }
99                    continue;
100                };
101                let q = Arc::clone(&self);
102                let job = Arc::new(job);
103                tokio::spawn(async move {
104                    if job.reprocess_count >= q.max_reprocess_count as i32 {
105                        handle_result(
106                            AnyJobResult::Internal(JobResultInternal::BadJob),
107                            &job,
108                            &q.pool,
109                            &q.get_backoff_strategy(&job),
110                        ).await;
111                        return;
112                    }
113                    let permit_result = q.get_job_strategy(&job).acquire(&job).await;
114                    let backoff_strategy = q.get_backoff_strategy(&job);
115
116                    if let Err(permit_err) = permit_result {
117                        handle_strategy_error(permit_err, &job, &q.pool, &backoff_strategy).await;
118                        return;
119                    };
120                    let _permit = permit_result.unwrap();
121                    let q_name: String = job.queue.clone();
122                    let result = if let Some(handler) = q.job_registry.get(q_name.as_str()) {
123
124                        let wrapped_result = AssertUnwindSafe(
125                            handler.process_dyn(&q, &job)
126                                .instrument(tracing::info_span!("process_job", job_id = %&job.id, queue = %&job.queue, attempt = %&job.attempt, max_attempts = %&job.max_attempts, run_at = ?&job.run_at)))
127                        .catch_unwind().await;
128
129                        match wrapped_result {
130                            Ok(Ok(process)) => process,
131                            Ok(Err(e)) => {
132                                tracing::error!("Handler returned error: {:?}", e);
133                                JobResult::InternalError
134                            }
135                            Err(_) => {
136                                tracing::error!("Handler panicked or returned error: {}", &job.id);
137                                JobResult::InternalError
138                            }
139                        }
140                    } else {
141                        tracing::warn!("Missing handler for: {:?}", (job.queue).clone().as_str());
142                        JobResult::HandlerMissing
143                    };
144
145                    handle_result(AnyJobResult::Public(result), &job, &q.pool, &backoff_strategy).await;
146                    drop(_permit);
147                    drop(_queue_permit);
148                    drop(_global_permit);
149                    drop(_heartbeat);
150                    #[cfg(feature = "wait-for-job")]
151                    drop(_wait_guard);
152                }.instrument(_job_span));
153            } else {
154                tokio::time::sleep(self.empty_poll_sleep).await;
155            }
156        }
157    }
158
159    #[tracing::instrument(skip(self))]
160    async fn release_job(&self, id: &uuid::Uuid) -> Result<(), sqlx::Error> {
161        sqlx::query!(
162            "UPDATE job_queue SET status = $1 WHERE id = $2",
163            result::JobResultInternal::Pending.to_string(),
164            id
165        )
166        .execute(&self.pool)
167        .await?;
168        Ok(())
169    }
170
171    fn get_backoff_strategy(&self, job: &Job) -> BackoffStrategy {
172        self.queue_backoff_strategies
173            .get(job.queue.as_str())
174            .map(|r| r.value().clone())
175            .unwrap_or(self.default_backoff_strategy.clone())
176    }
177    fn get_job_strategy(&self, job: &Job) -> Arc<dyn sync::JobStrategy> {
178        self.queue_strategies
179            .get(job.queue.as_str())
180            .map(|r| r.value().clone())
181            .unwrap_or(self.default_queue_strategy.clone())
182    }
183}
184
185async fn handle_strategy_error(
186    err: JobStrategyError,
187    job: &Job,
188    pool: &PgPool,
189    backoff_strategy: &BackoffStrategy,
190) {
191    match err {
192        JobStrategyError::CancelJob => {
193            handle_result(JobResult::Cancel.into(), job, pool, backoff_strategy).await
194        }
195        JobStrategyError::TryAfter(time_delta) => {
196            handle_result(
197                JobResult::RetryAt(chrono::Utc::now() + time_delta).into(),
198                job,
199                pool,
200                backoff_strategy,
201            )
202            .await
203        }
204        JobStrategyError::Retry => {
205            handle_result(JobResult::Failed.into(), job, pool, backoff_strategy).await
206        }
207        JobStrategyError::MarkCompleted => {
208            handle_result(JobResult::Success.into(), job, pool, backoff_strategy).await
209        }
210    }
211}
212
213async fn handle_result(
214    result: AnyJobResult,
215    job: &Job,
216    pool: &PgPool,
217    backoff_strategy: &BackoffStrategy,
218) -> () {
219    match result {
220        AnyJobResult::Internal(result) => {
221            handle_result_internal(result, job, pool, backoff_strategy).await
222        }
223        AnyJobResult::Public(result) => {
224            handle_result_public(result, job, pool, backoff_strategy).await
225        }
226    }
227}
228async fn handle_result_internal(
229    result: JobResultInternal,
230    job: &Job,
231    pool: &PgPool,
232    _backoff_strategy: &BackoffStrategy,
233) -> () {
234    match result {
235        JobResultInternal::BadJob => {
236            // Rest of the variants aren't supported right now, as they should be processed by public result handling
237            let _ = sqlx::query!(
238                "UPDATE job_queue SET status = $1 WHERE id = $2",
239                result.to_string(),
240                &job.id,
241            )
242            .execute(pool)
243            .await;
244        }
245        _ => {
246            tracing::error!("Unexpected internal status in job processing: {:?}", result)
247        }
248    }
249}
250/// Handles JobResult results
251async fn handle_result_public(
252    result: JobResult,
253    job: &Job,
254    pool: &PgPool,
255    backoff_strategy: &BackoffStrategy,
256) -> () {
257    use result::{JobResult, JobResultInternal};
258    let next_status_str = result.handle().to_string();
259    match result {
260        JobResult::InternalError => {
261            Box::pin(async move {
262                handle_result_public(JobResult::Failed, job, pool, backoff_strategy).await;
263            })
264            .await;
265        }
266        JobResult::Success => {
267            let result = sqlx::query!(
268                "UPDATE job_queue SET status = $1, completed_at = NOW() WHERE id = $2",
269                next_status_str,
270                job.id.clone(),
271            )
272            .execute(pool)
273            .await;
274            if let Err(err) = result {
275                tracing::error!("[{}] Job {} insertion failed: {}", job.queue, job.id, err);
276            } else {
277                tracing::info!("[{}] Job {} succeeded", job.queue, job.id);
278            }
279        }
280        JobResult::Failed => {
281            // TODO: Add RetryAt
282            let _ = sqlx::query!(
283                "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
284                next_status_str,
285                backoff_strategy.next_attempt(job),
286                job.id.clone(),
287            )
288            .execute(pool)
289            .await;
290            tracing::info!("Job {} failed", job.id);
291        }
292        JobResult::RetryAt(run_at) => {
293            let _ = sqlx::query!(
294                "UPDATE job_queue SET status = $1, run_at = $2 WHERE id = $3",
295                next_status_str,
296                run_at,
297                job.id.clone()
298            )
299            .execute(pool)
300            .await;
301        }
302        JobResult::RescheduleAt(run_at) => {
303            // run_at cannot be closer than backoff
304            let backoff = backoff_strategy.next_attempt(job);
305            let scheduled = if run_at < backoff { backoff } else { run_at };
306            let _ = sqlx::query!(
307                "UPDATE job_queue SET status = $1, run_at = $2, attempt = attempt - 1, reprocess_count = reprocess_count + 1 WHERE id = $3",
308                next_status_str, scheduled, job.id.clone()
309            ).execute(pool).await;
310        }
311        JobResult::Critical => {
312            let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
313        }
314        JobResult::HandlerMissing => {
315            let _ = update_job(pool, &job.id, JobResultInternal::Critical).await;
316            tracing::info!("Handler missing for job {}", job.id);
317        }
318        JobResult::Cancel => {
319            let _ = update_job(pool, &job.id, JobResultInternal::Cancelled).await;
320            tracing::info!("Job {} cancelled", job.id);
321        }
322        JobResult::Unprocessable => {
323            let _ = update_job(pool, &job.id, JobResultInternal::Unprocessable).await;
324            tracing::info!("Job {} unprocessable", job.id);
325        }
326    }
327}
328/// Helper: updates a job's status in the database - sets completed_At
329async fn update_job(
330    pool: &PgPool,
331    id: &uuid::Uuid,
332    result: result::JobResultInternal,
333) -> Result<(), sqlx::Error> {
334    sqlx::query!(
335        "UPDATE job_queue SET status = $1, completed_at = NOW() WHERE id = $2",
336        result.to_string(),
337        id
338    )
339    .execute(pool)
340    .await?;
341    Ok(())
342}