// force_sync/store/pg/task_queue.rs
1//! Task queue helpers for the `PostgreSQL` sync store.
2
3use std::convert::TryFrom;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use tokio_postgres::GenericClient;
8
9use crate::error::ForceSyncError;
10
11use super::PgStore;
12
/// A leased task returned from the task queue.
///
/// Mirrors the columns written by the leasing query: the claimed row's ID,
/// the worker that claimed it, and the deadline after which the lease lapses.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeasedTask {
    /// The leased task ID.
    pub task_id: i64,
    /// The worker that owns the lease.
    pub lease_owner: String,
    /// When the lease expires.
    pub lease_until: DateTime<Utc>,
}
23
/// Internal carrier for a computed lease expiry instant.
struct LeaseDeadline {
    // Absolute UTC time at which a lease taken now would expire.
    lease_until: DateTime<Utc>,
}
27
28fn compute_lease_deadline(lease_for: Duration) -> Result<LeaseDeadline, ForceSyncError> {
29    let secs =
30        i64::try_from(lease_for.as_secs()).map_err(|_| ForceSyncError::InvalidLeaseDuration)?;
31    let nanos = i64::from(lease_for.subsec_nanos());
32
33    let duration_secs =
34        chrono::Duration::try_seconds(secs).ok_or(ForceSyncError::InvalidLeaseDuration)?;
35    let duration_nanos = chrono::Duration::nanoseconds(nanos);
36
37    let duration = duration_secs
38        .checked_add(&duration_nanos)
39        .ok_or(ForceSyncError::InvalidLeaseDuration)?;
40
41    let lease_until = Utc::now()
42        .checked_add_signed(duration)
43        .ok_or(ForceSyncError::InvalidLeaseDuration)?;
44
45    Ok(LeaseDeadline { lease_until })
46}
47
48async fn enqueue_apply_task_query<C>(
49    client: &C,
50    journal_id: i64,
51    priority: i32,
52) -> Result<i64, ForceSyncError>
53where
54    C: GenericClient + Sync + ?Sized,
55{
56    let journal_key = journal_id.to_string();
57    let row = client
58        .query_one(
59            "insert into sync_task (status, task_kind, target_key, priority, payload)
60             values ('ready', 'apply', $1::text, $2, jsonb_build_object('journal_id', $1))
61             returning task_id",
62            &[&journal_key, &priority],
63        )
64        .await?;
65
66    Ok(row.get(0))
67}
68
69async fn lease_ready_tasks_query<C>(
70    client: &C,
71    worker_id: &str,
72    limit: i64,
73    lease_until: &LeaseDeadline,
74) -> Result<Vec<LeasedTask>, ForceSyncError>
75where
76    C: GenericClient + Sync + ?Sized,
77{
78    let rows = client
79        .query(
80            "with claimed as (
81                select task_id
82                from sync_task
83                where status = 'ready'
84                  and next_attempt_at <= now()
85                  and (lease_until is null or lease_until <= now())
86                order by priority desc, next_attempt_at asc, task_id asc
87                for update skip locked
88                limit $2
89            )
90            update sync_task
91            set status = 'leased',
92                lease_owner = $1,
93                lease_until = $3::timestamptz,
94                attempt_count = attempt_count + 1,
95                updated_at = now()
96            from claimed
97            where sync_task.task_id = claimed.task_id
98            returning sync_task.task_id",
99            &[&worker_id, &limit, &lease_until.lease_until],
100        )
101        .await?;
102
103    Ok(rows
104        .into_iter()
105        .map(|row| LeasedTask {
106            task_id: row.get(0),
107            lease_owner: worker_id.to_owned(),
108            lease_until: lease_until.lease_until,
109        })
110        .collect())
111}
112
113async fn update_task_status_unguarded<C>(
114    client: &C,
115    task_id: i64,
116    status: &str,
117    last_error: Option<&str>,
118    next_attempt_at: Option<DateTime<Utc>>,
119) -> Result<u64, ForceSyncError>
120where
121    C: GenericClient + Sync + ?Sized,
122{
123    Ok(client
124        .execute(
125            "update sync_task
126                 set status = $2,
127                     last_error = $3,
128                     next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
129                     lease_owner = null,
130                     lease_until = null,
131                     updated_at = now()
132                 where task_id = $1",
133            &[&task_id, &status, &last_error, &next_attempt_at],
134        )
135        .await?)
136}
137
138async fn update_task_status_guarded<C>(
139    client: &C,
140    task_id: i64,
141    status: &str,
142    last_error: Option<&str>,
143    next_attempt_at: Option<DateTime<Utc>>,
144    worker_id: &str,
145) -> Result<u64, ForceSyncError>
146where
147    C: GenericClient + Sync + ?Sized,
148{
149    Ok(client
150        .execute(
151            "update sync_task
152                 set status = $2,
153                     last_error = $3,
154                     next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
155                     lease_owner = null,
156                     lease_until = null,
157                     updated_at = now()
158                 where task_id = $1
159                   and status = 'leased'
160                   and lease_owner = $5",
161            &[&task_id, &status, &last_error, &next_attempt_at, &worker_id],
162        )
163        .await?)
164}
165
166async fn update_task_status<C>(
167    client: &C,
168    task_id: i64,
169    status: &str,
170    last_error: Option<&str>,
171    next_attempt_at: Option<DateTime<Utc>>,
172    expected_lease_owner: Option<&str>,
173) -> Result<u64, ForceSyncError>
174where
175    C: GenericClient + Sync + ?Sized,
176{
177    match expected_lease_owner {
178        Some(worker_id) => {
179            update_task_status_guarded(
180                client,
181                task_id,
182                status,
183                last_error,
184                next_attempt_at,
185                worker_id,
186            )
187            .await
188        }
189        None => {
190            update_task_status_unguarded(client, task_id, status, last_error, next_attempt_at).await
191        }
192    }
193}
194
195impl PgStore {
196    /// Enqueues an apply task for a journal row.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the database write fails.
201    pub async fn enqueue_apply_task(
202        &self,
203        journal_id: i64,
204        priority: i32,
205    ) -> Result<i64, ForceSyncError> {
206        let client = self.pool().get().await?;
207        enqueue_apply_task_query(&**client, journal_id, priority).await
208    }
209
210    /// Leases ready tasks for a worker.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the lease duration is invalid or the database write fails.
215    pub async fn lease_ready_tasks(
216        &self,
217        worker_id: &str,
218        limit: i64,
219        lease_for: Duration,
220    ) -> Result<Vec<LeasedTask>, ForceSyncError> {
221        let lease_deadline = compute_lease_deadline(lease_for)?;
222        let client = self.pool().get().await?;
223        lease_ready_tasks_query(&**client, worker_id, limit, &lease_deadline).await
224    }
225
226    /// Acknowledges a completed task administratively, without lease-owner checks.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the database write fails.
231    pub async fn ack_task(&self, task_id: i64) -> Result<u64, ForceSyncError> {
232        let client = self.pool().get().await?;
233        update_task_status(&**client, task_id, "done", None, None, None).await
234    }
235
236    /// Acknowledges a completed task for the current lease owner.
237    ///
238    /// Returns `0` if the task is no longer leased by `worker_id`.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if the database write fails.
243    pub async fn ack_task_for_worker(
244        &self,
245        worker_id: &str,
246        task_id: i64,
247    ) -> Result<u64, ForceSyncError> {
248        let client = self.pool().get().await?;
249        update_task_status(&**client, task_id, "done", None, None, Some(worker_id)).await
250    }
251
252    /// Marks a task for retry at a future time administratively, without lease-owner checks.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the database write fails.
257    pub async fn retry_task(
258        &self,
259        task_id: i64,
260        next_attempt_at: DateTime<Utc>,
261        error: impl AsRef<str>,
262    ) -> Result<u64, ForceSyncError> {
263        let error = error.as_ref().to_owned();
264        let client = self.pool().get().await?;
265        update_task_status(
266            &**client,
267            task_id,
268            "ready",
269            Some(&error),
270            Some(next_attempt_at),
271            None,
272        )
273        .await
274    }
275
276    /// Marks a task for retry for the current lease owner.
277    ///
278    /// Returns `0` if the task is no longer leased by `worker_id`.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the database write fails.
283    pub async fn retry_task_for_worker(
284        &self,
285        worker_id: &str,
286        task_id: i64,
287        next_attempt_at: DateTime<Utc>,
288        error: impl AsRef<str>,
289    ) -> Result<u64, ForceSyncError> {
290        let error = error.as_ref().to_owned();
291        let client = self.pool().get().await?;
292        update_task_status(
293            &**client,
294            task_id,
295            "ready",
296            Some(&error),
297            Some(next_attempt_at),
298            Some(worker_id),
299        )
300        .await
301    }
302
303    /// Marks a task as failed administratively, without lease-owner checks.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if the database write fails.
308    pub async fn fail_task(
309        &self,
310        task_id: i64,
311        error: impl AsRef<str>,
312    ) -> Result<u64, ForceSyncError> {
313        let error = error.as_ref().to_owned();
314        let client = self.pool().get().await?;
315        update_task_status(&**client, task_id, "failed", Some(&error), None, None).await
316    }
317
318    /// Marks a task as failed for the current lease owner.
319    ///
320    /// Returns `0` if the task is no longer leased by `worker_id`.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the database write fails.
325    pub async fn fail_task_for_worker(
326        &self,
327        worker_id: &str,
328        task_id: i64,
329        error: impl AsRef<str>,
330    ) -> Result<u64, ForceSyncError> {
331        let error = error.as_ref().to_owned();
332        let client = self.pool().get().await?;
333        update_task_status(
334            &**client,
335            task_id,
336            "failed",
337            Some(&error),
338            None,
339            Some(worker_id),
340        )
341        .await
342    }
343
344    /// Enqueues an apply task inside an existing transaction.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the database write fails.
349    pub async fn enqueue_apply_task_in_tx<C>(
350        client: &C,
351        journal_id: i64,
352        priority: i32,
353    ) -> Result<i64, ForceSyncError>
354    where
355        C: GenericClient + Sync + ?Sized,
356    {
357        enqueue_apply_task_query(client, journal_id, priority).await
358    }
359
360    /// Leases ready tasks inside an existing transaction.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the lease duration is invalid or the database write fails.
365    pub async fn lease_ready_tasks_in_tx<C>(
366        client: &C,
367        worker_id: &str,
368        limit: i64,
369        lease_for: Duration,
370    ) -> Result<Vec<LeasedTask>, ForceSyncError>
371    where
372        C: GenericClient + Sync + ?Sized,
373    {
374        let lease_deadline = compute_lease_deadline(lease_for)?;
375        lease_ready_tasks_query(client, worker_id, limit, &lease_deadline).await
376    }
377
378    /// Acknowledges a completed task inside an existing transaction.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the database write fails.
383    pub async fn ack_task_in_tx<C>(client: &C, task_id: i64) -> Result<u64, ForceSyncError>
384    where
385        C: GenericClient + Sync + ?Sized,
386    {
387        update_task_status(client, task_id, "done", None, None, None).await
388    }
389
390    /// Marks a task for retry inside an existing transaction.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the database write fails.
395    pub async fn retry_task_in_tx<C>(
396        client: &C,
397        task_id: i64,
398        next_attempt_at: DateTime<Utc>,
399        error: impl AsRef<str>,
400    ) -> Result<u64, ForceSyncError>
401    where
402        C: GenericClient + Sync + ?Sized,
403    {
404        let error = error.as_ref().to_owned();
405        update_task_status(
406            client,
407            task_id,
408            "ready",
409            Some(&error),
410            Some(next_attempt_at),
411            None,
412        )
413        .await
414    }
415
416    /// Marks a task as failed inside an existing transaction.
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if the database write fails.
421    pub async fn fail_task_in_tx<C>(
422        client: &C,
423        task_id: i64,
424        error: impl AsRef<str>,
425    ) -> Result<u64, ForceSyncError>
426    where
427        C: GenericClient + Sync + ?Sized,
428    {
429        let error = error.as_ref().to_owned();
430        update_task_status(client, task_id, "failed", Some(&error), None, None).await
431    }
432}
433
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::Utc;

    use super::compute_lease_deadline;

    #[test]
    fn lease_deadline_keeps_datetime_type() {
        let start = Utc::now();
        let deadline = match compute_lease_deadline(Duration::from_secs(5)) {
            Ok(deadline) => deadline,
            Err(error) => panic!("unexpected lease deadline error: {error}"),
        };
        let end = Utc::now();

        // The deadline must stay a typed `DateTime<Utc>`, not decay to a raw timestamp.
        let _: chrono::DateTime<chrono::Utc> = deadline.lease_until;

        // A five-second lease taken between `start` and `end` lands in this window.
        assert!(deadline.lease_until >= start);
        assert!(deadline.lease_until <= end + chrono::Duration::seconds(5));
    }
}
453}