// force_sync/store/pg/task_queue.rs

//! Task queue helpers for the `PostgreSQL` sync store.

use std::convert::TryFrom;
use std::time::Duration;

use chrono::{DateTime, Utc};
use tokio_postgres::GenericClient;

use crate::error::ForceSyncError;

use super::PgStore;
13/// A leased task returned from the task queue.
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct LeasedTask {
16    /// The leased task ID.
17    pub task_id: i64,
18    /// The worker that owns the lease.
19    pub lease_owner: String,
20    /// When the lease expires.
21    pub lease_until: DateTime<Utc>,
22}
23
24struct LeaseDeadline {
25    lease_until: DateTime<Utc>,
26}
27
28fn compute_lease_deadline(lease_for: Duration) -> Result<LeaseDeadline, ForceSyncError> {
29    let secs =
30        i64::try_from(lease_for.as_secs()).map_err(|_| ForceSyncError::InvalidLeaseDuration)?;
31    let nanos = i64::from(lease_for.subsec_nanos());
32    let duration = chrono::Duration::seconds(secs) + chrono::Duration::nanoseconds(nanos);
33    let lease_until = Utc::now() + duration;
34
35    Ok(LeaseDeadline { lease_until })
36}
37
38async fn enqueue_apply_task_query<C>(
39    client: &C,
40    journal_id: i64,
41    priority: i32,
42) -> Result<i64, ForceSyncError>
43where
44    C: GenericClient + Sync + ?Sized,
45{
46    let journal_key = journal_id.to_string();
47    let row = client
48        .query_one(
49            "insert into sync_task (status, task_kind, target_key, priority, payload)
50             values ('ready', 'apply', $1::text, $2, jsonb_build_object('journal_id', $1))
51             returning task_id",
52            &[&journal_key, &priority],
53        )
54        .await?;
55
56    Ok(row.get(0))
57}
58
59async fn lease_ready_tasks_query<C>(
60    client: &C,
61    worker_id: &str,
62    limit: i64,
63    lease_until: &LeaseDeadline,
64) -> Result<Vec<LeasedTask>, ForceSyncError>
65where
66    C: GenericClient + Sync + ?Sized,
67{
68    let rows = client
69        .query(
70            "with claimed as (
71                select task_id
72                from sync_task
73                where status = 'ready'
74                  and next_attempt_at <= now()
75                  and (lease_until is null or lease_until <= now())
76                order by priority desc, next_attempt_at asc, task_id asc
77                for update skip locked
78                limit $2
79            )
80            update sync_task
81            set status = 'leased',
82                lease_owner = $1,
83                lease_until = $3::timestamptz,
84                attempt_count = attempt_count + 1,
85                updated_at = now()
86            from claimed
87            where sync_task.task_id = claimed.task_id
88            returning sync_task.task_id",
89            &[&worker_id, &limit, &lease_until.lease_until],
90        )
91        .await?;
92
93    Ok(rows
94        .into_iter()
95        .map(|row| LeasedTask {
96            task_id: row.get(0),
97            lease_owner: worker_id.to_owned(),
98            lease_until: lease_until.lease_until,
99        })
100        .collect())
101}
102
103async fn update_task_status_unguarded<C>(
104    client: &C,
105    task_id: i64,
106    status: &str,
107    last_error: Option<&str>,
108    next_attempt_at: Option<DateTime<Utc>>,
109) -> Result<u64, ForceSyncError>
110where
111    C: GenericClient + Sync + ?Sized,
112{
113    Ok(client
114        .execute(
115            "update sync_task
116                 set status = $2,
117                     last_error = $3,
118                     next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
119                     lease_owner = null,
120                     lease_until = null,
121                     updated_at = now()
122                 where task_id = $1",
123            &[&task_id, &status, &last_error, &next_attempt_at],
124        )
125        .await?)
126}
127
128async fn update_task_status_guarded<C>(
129    client: &C,
130    task_id: i64,
131    status: &str,
132    last_error: Option<&str>,
133    next_attempt_at: Option<DateTime<Utc>>,
134    worker_id: &str,
135) -> Result<u64, ForceSyncError>
136where
137    C: GenericClient + Sync + ?Sized,
138{
139    Ok(client
140        .execute(
141            "update sync_task
142                 set status = $2,
143                     last_error = $3,
144                     next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
145                     lease_owner = null,
146                     lease_until = null,
147                     updated_at = now()
148                 where task_id = $1
149                   and status = 'leased'
150                   and lease_owner = $5",
151            &[&task_id, &status, &last_error, &next_attempt_at, &worker_id],
152        )
153        .await?)
154}
155
156async fn update_task_status<C>(
157    client: &C,
158    task_id: i64,
159    status: &str,
160    last_error: Option<&str>,
161    next_attempt_at: Option<DateTime<Utc>>,
162    expected_lease_owner: Option<&str>,
163) -> Result<u64, ForceSyncError>
164where
165    C: GenericClient + Sync + ?Sized,
166{
167    match expected_lease_owner {
168        Some(worker_id) => {
169            update_task_status_guarded(
170                client,
171                task_id,
172                status,
173                last_error,
174                next_attempt_at,
175                worker_id,
176            )
177            .await
178        }
179        None => {
180            update_task_status_unguarded(client, task_id, status, last_error, next_attempt_at).await
181        }
182    }
183}
184
185impl PgStore {
186    /// Enqueues an apply task for a journal row.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if the database write fails.
191    pub async fn enqueue_apply_task(
192        &self,
193        journal_id: i64,
194        priority: i32,
195    ) -> Result<i64, ForceSyncError> {
196        let client = self.pool().get().await?;
197        enqueue_apply_task_query(&**client, journal_id, priority).await
198    }
199
200    /// Leases ready tasks for a worker.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if the lease duration is invalid or the database write fails.
205    pub async fn lease_ready_tasks(
206        &self,
207        worker_id: &str,
208        limit: i64,
209        lease_for: Duration,
210    ) -> Result<Vec<LeasedTask>, ForceSyncError> {
211        let lease_deadline = compute_lease_deadline(lease_for)?;
212        let client = self.pool().get().await?;
213        lease_ready_tasks_query(&**client, worker_id, limit, &lease_deadline).await
214    }
215
216    /// Acknowledges a completed task administratively, without lease-owner checks.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the database write fails.
221    pub async fn ack_task(&self, task_id: i64) -> Result<u64, ForceSyncError> {
222        let client = self.pool().get().await?;
223        update_task_status(&**client, task_id, "done", None, None, None).await
224    }
225
226    /// Acknowledges a completed task for the current lease owner.
227    ///
228    /// Returns `0` if the task is no longer leased by `worker_id`.
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if the database write fails.
233    pub async fn ack_task_for_worker(
234        &self,
235        worker_id: &str,
236        task_id: i64,
237    ) -> Result<u64, ForceSyncError> {
238        let client = self.pool().get().await?;
239        update_task_status(&**client, task_id, "done", None, None, Some(worker_id)).await
240    }
241
242    /// Marks a task for retry at a future time administratively, without lease-owner checks.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the database write fails.
247    pub async fn retry_task(
248        &self,
249        task_id: i64,
250        next_attempt_at: DateTime<Utc>,
251        error: impl AsRef<str>,
252    ) -> Result<u64, ForceSyncError> {
253        let error = error.as_ref().to_owned();
254        let client = self.pool().get().await?;
255        update_task_status(
256            &**client,
257            task_id,
258            "ready",
259            Some(&error),
260            Some(next_attempt_at),
261            None,
262        )
263        .await
264    }
265
266    /// Marks a task for retry for the current lease owner.
267    ///
268    /// Returns `0` if the task is no longer leased by `worker_id`.
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if the database write fails.
273    pub async fn retry_task_for_worker(
274        &self,
275        worker_id: &str,
276        task_id: i64,
277        next_attempt_at: DateTime<Utc>,
278        error: impl AsRef<str>,
279    ) -> Result<u64, ForceSyncError> {
280        let error = error.as_ref().to_owned();
281        let client = self.pool().get().await?;
282        update_task_status(
283            &**client,
284            task_id,
285            "ready",
286            Some(&error),
287            Some(next_attempt_at),
288            Some(worker_id),
289        )
290        .await
291    }
292
293    /// Marks a task as failed administratively, without lease-owner checks.
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if the database write fails.
298    pub async fn fail_task(
299        &self,
300        task_id: i64,
301        error: impl AsRef<str>,
302    ) -> Result<u64, ForceSyncError> {
303        let error = error.as_ref().to_owned();
304        let client = self.pool().get().await?;
305        update_task_status(&**client, task_id, "failed", Some(&error), None, None).await
306    }
307
308    /// Marks a task as failed for the current lease owner.
309    ///
310    /// Returns `0` if the task is no longer leased by `worker_id`.
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if the database write fails.
315    pub async fn fail_task_for_worker(
316        &self,
317        worker_id: &str,
318        task_id: i64,
319        error: impl AsRef<str>,
320    ) -> Result<u64, ForceSyncError> {
321        let error = error.as_ref().to_owned();
322        let client = self.pool().get().await?;
323        update_task_status(
324            &**client,
325            task_id,
326            "failed",
327            Some(&error),
328            None,
329            Some(worker_id),
330        )
331        .await
332    }
333
334    /// Enqueues an apply task inside an existing transaction.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the database write fails.
339    pub async fn enqueue_apply_task_in_tx<C>(
340        client: &C,
341        journal_id: i64,
342        priority: i32,
343    ) -> Result<i64, ForceSyncError>
344    where
345        C: GenericClient + Sync + ?Sized,
346    {
347        enqueue_apply_task_query(client, journal_id, priority).await
348    }
349
350    /// Leases ready tasks inside an existing transaction.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the lease duration is invalid or the database write fails.
355    pub async fn lease_ready_tasks_in_tx<C>(
356        client: &C,
357        worker_id: &str,
358        limit: i64,
359        lease_for: Duration,
360    ) -> Result<Vec<LeasedTask>, ForceSyncError>
361    where
362        C: GenericClient + Sync + ?Sized,
363    {
364        let lease_deadline = compute_lease_deadline(lease_for)?;
365        lease_ready_tasks_query(client, worker_id, limit, &lease_deadline).await
366    }
367
368    /// Acknowledges a completed task inside an existing transaction.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the database write fails.
373    pub async fn ack_task_in_tx<C>(client: &C, task_id: i64) -> Result<u64, ForceSyncError>
374    where
375        C: GenericClient + Sync + ?Sized,
376    {
377        update_task_status(client, task_id, "done", None, None, None).await
378    }
379
380    /// Marks a task for retry inside an existing transaction.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the database write fails.
385    pub async fn retry_task_in_tx<C>(
386        client: &C,
387        task_id: i64,
388        next_attempt_at: DateTime<Utc>,
389        error: impl AsRef<str>,
390    ) -> Result<u64, ForceSyncError>
391    where
392        C: GenericClient + Sync + ?Sized,
393    {
394        let error = error.as_ref().to_owned();
395        update_task_status(
396            client,
397            task_id,
398            "ready",
399            Some(&error),
400            Some(next_attempt_at),
401            None,
402        )
403        .await
404    }
405
406    /// Marks a task as failed inside an existing transaction.
407    ///
408    /// # Errors
409    ///
410    /// Returns an error if the database write fails.
411    pub async fn fail_task_in_tx<C>(
412        client: &C,
413        task_id: i64,
414        error: impl AsRef<str>,
415    ) -> Result<u64, ForceSyncError>
416    where
417        C: GenericClient + Sync + ?Sized,
418    {
419        let error = error.as_ref().to_owned();
420        update_task_status(client, task_id, "failed", Some(&error), None, None).await
421    }
422}
423
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::Utc;

    use super::compute_lease_deadline;

    /// The computed deadline stays a `DateTime<Utc>` and lands within the
    /// expected window around "now + 5s".
    #[test]
    fn lease_deadline_keeps_datetime_type() {
        let started = Utc::now();
        let deadline = match compute_lease_deadline(Duration::from_secs(5)) {
            Ok(deadline) => deadline,
            Err(error) => panic!("unexpected lease deadline error: {error}"),
        };
        let finished = Utc::now();

        let _: chrono::DateTime<chrono::Utc> = deadline.lease_until;
        assert!(deadline.lease_until >= started);
        assert!(deadline.lease_until <= finished + chrono::Duration::seconds(5));
    }
}
443}