1use std::convert::TryFrom;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use tokio_postgres::GenericClient;
8
9use crate::error::ForceSyncError;
10
11use super::PgStore;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct LeasedTask {
16 pub task_id: i64,
18 pub lease_owner: String,
20 pub lease_until: DateTime<Utc>,
22}
23
24struct LeaseDeadline {
25 lease_until: DateTime<Utc>,
26}
27
28fn compute_lease_deadline(lease_for: Duration) -> Result<LeaseDeadline, ForceSyncError> {
29 let secs =
30 i64::try_from(lease_for.as_secs()).map_err(|_| ForceSyncError::InvalidLeaseDuration)?;
31 let nanos = i64::from(lease_for.subsec_nanos());
32 let duration = chrono::Duration::seconds(secs) + chrono::Duration::nanoseconds(nanos);
33 let lease_until = Utc::now() + duration;
34
35 Ok(LeaseDeadline { lease_until })
36}
37
38async fn enqueue_apply_task_query<C>(
39 client: &C,
40 journal_id: i64,
41 priority: i32,
42) -> Result<i64, ForceSyncError>
43where
44 C: GenericClient + Sync + ?Sized,
45{
46 let journal_key = journal_id.to_string();
47 let row = client
48 .query_one(
49 "insert into sync_task (status, task_kind, target_key, priority, payload)
50 values ('ready', 'apply', $1::text, $2, jsonb_build_object('journal_id', $1))
51 returning task_id",
52 &[&journal_key, &priority],
53 )
54 .await?;
55
56 Ok(row.get(0))
57}
58
59async fn lease_ready_tasks_query<C>(
60 client: &C,
61 worker_id: &str,
62 limit: i64,
63 lease_until: &LeaseDeadline,
64) -> Result<Vec<LeasedTask>, ForceSyncError>
65where
66 C: GenericClient + Sync + ?Sized,
67{
68 let rows = client
69 .query(
70 "with claimed as (
71 select task_id
72 from sync_task
73 where status = 'ready'
74 and next_attempt_at <= now()
75 and (lease_until is null or lease_until <= now())
76 order by priority desc, next_attempt_at asc, task_id asc
77 for update skip locked
78 limit $2
79 )
80 update sync_task
81 set status = 'leased',
82 lease_owner = $1,
83 lease_until = $3::timestamptz,
84 attempt_count = attempt_count + 1,
85 updated_at = now()
86 from claimed
87 where sync_task.task_id = claimed.task_id
88 returning sync_task.task_id",
89 &[&worker_id, &limit, &lease_until.lease_until],
90 )
91 .await?;
92
93 Ok(rows
94 .into_iter()
95 .map(|row| LeasedTask {
96 task_id: row.get(0),
97 lease_owner: worker_id.to_owned(),
98 lease_until: lease_until.lease_until,
99 })
100 .collect())
101}
102
103async fn update_task_status_unguarded<C>(
104 client: &C,
105 task_id: i64,
106 status: &str,
107 last_error: Option<&str>,
108 next_attempt_at: Option<DateTime<Utc>>,
109) -> Result<u64, ForceSyncError>
110where
111 C: GenericClient + Sync + ?Sized,
112{
113 Ok(client
114 .execute(
115 "update sync_task
116 set status = $2,
117 last_error = $3,
118 next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
119 lease_owner = null,
120 lease_until = null,
121 updated_at = now()
122 where task_id = $1",
123 &[&task_id, &status, &last_error, &next_attempt_at],
124 )
125 .await?)
126}
127
128async fn update_task_status_guarded<C>(
129 client: &C,
130 task_id: i64,
131 status: &str,
132 last_error: Option<&str>,
133 next_attempt_at: Option<DateTime<Utc>>,
134 worker_id: &str,
135) -> Result<u64, ForceSyncError>
136where
137 C: GenericClient + Sync + ?Sized,
138{
139 Ok(client
140 .execute(
141 "update sync_task
142 set status = $2,
143 last_error = $3,
144 next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
145 lease_owner = null,
146 lease_until = null,
147 updated_at = now()
148 where task_id = $1
149 and status = 'leased'
150 and lease_owner = $5",
151 &[&task_id, &status, &last_error, &next_attempt_at, &worker_id],
152 )
153 .await?)
154}
155
156async fn update_task_status<C>(
157 client: &C,
158 task_id: i64,
159 status: &str,
160 last_error: Option<&str>,
161 next_attempt_at: Option<DateTime<Utc>>,
162 expected_lease_owner: Option<&str>,
163) -> Result<u64, ForceSyncError>
164where
165 C: GenericClient + Sync + ?Sized,
166{
167 match expected_lease_owner {
168 Some(worker_id) => {
169 update_task_status_guarded(
170 client,
171 task_id,
172 status,
173 last_error,
174 next_attempt_at,
175 worker_id,
176 )
177 .await
178 }
179 None => {
180 update_task_status_unguarded(client, task_id, status, last_error, next_attempt_at).await
181 }
182 }
183}
184
185impl PgStore {
186 pub async fn enqueue_apply_task(
192 &self,
193 journal_id: i64,
194 priority: i32,
195 ) -> Result<i64, ForceSyncError> {
196 let client = self.pool().get().await?;
197 enqueue_apply_task_query(&**client, journal_id, priority).await
198 }
199
200 pub async fn lease_ready_tasks(
206 &self,
207 worker_id: &str,
208 limit: i64,
209 lease_for: Duration,
210 ) -> Result<Vec<LeasedTask>, ForceSyncError> {
211 let lease_deadline = compute_lease_deadline(lease_for)?;
212 let client = self.pool().get().await?;
213 lease_ready_tasks_query(&**client, worker_id, limit, &lease_deadline).await
214 }
215
216 pub async fn ack_task(&self, task_id: i64) -> Result<u64, ForceSyncError> {
222 let client = self.pool().get().await?;
223 update_task_status(&**client, task_id, "done", None, None, None).await
224 }
225
226 pub async fn ack_task_for_worker(
234 &self,
235 worker_id: &str,
236 task_id: i64,
237 ) -> Result<u64, ForceSyncError> {
238 let client = self.pool().get().await?;
239 update_task_status(&**client, task_id, "done", None, None, Some(worker_id)).await
240 }
241
242 pub async fn retry_task(
248 &self,
249 task_id: i64,
250 next_attempt_at: DateTime<Utc>,
251 error: impl AsRef<str>,
252 ) -> Result<u64, ForceSyncError> {
253 let error = error.as_ref().to_owned();
254 let client = self.pool().get().await?;
255 update_task_status(
256 &**client,
257 task_id,
258 "ready",
259 Some(&error),
260 Some(next_attempt_at),
261 None,
262 )
263 .await
264 }
265
266 pub async fn retry_task_for_worker(
274 &self,
275 worker_id: &str,
276 task_id: i64,
277 next_attempt_at: DateTime<Utc>,
278 error: impl AsRef<str>,
279 ) -> Result<u64, ForceSyncError> {
280 let error = error.as_ref().to_owned();
281 let client = self.pool().get().await?;
282 update_task_status(
283 &**client,
284 task_id,
285 "ready",
286 Some(&error),
287 Some(next_attempt_at),
288 Some(worker_id),
289 )
290 .await
291 }
292
293 pub async fn fail_task(
299 &self,
300 task_id: i64,
301 error: impl AsRef<str>,
302 ) -> Result<u64, ForceSyncError> {
303 let error = error.as_ref().to_owned();
304 let client = self.pool().get().await?;
305 update_task_status(&**client, task_id, "failed", Some(&error), None, None).await
306 }
307
308 pub async fn fail_task_for_worker(
316 &self,
317 worker_id: &str,
318 task_id: i64,
319 error: impl AsRef<str>,
320 ) -> Result<u64, ForceSyncError> {
321 let error = error.as_ref().to_owned();
322 let client = self.pool().get().await?;
323 update_task_status(
324 &**client,
325 task_id,
326 "failed",
327 Some(&error),
328 None,
329 Some(worker_id),
330 )
331 .await
332 }
333
334 pub async fn enqueue_apply_task_in_tx<C>(
340 client: &C,
341 journal_id: i64,
342 priority: i32,
343 ) -> Result<i64, ForceSyncError>
344 where
345 C: GenericClient + Sync + ?Sized,
346 {
347 enqueue_apply_task_query(client, journal_id, priority).await
348 }
349
350 pub async fn lease_ready_tasks_in_tx<C>(
356 client: &C,
357 worker_id: &str,
358 limit: i64,
359 lease_for: Duration,
360 ) -> Result<Vec<LeasedTask>, ForceSyncError>
361 where
362 C: GenericClient + Sync + ?Sized,
363 {
364 let lease_deadline = compute_lease_deadline(lease_for)?;
365 lease_ready_tasks_query(client, worker_id, limit, &lease_deadline).await
366 }
367
368 pub async fn ack_task_in_tx<C>(client: &C, task_id: i64) -> Result<u64, ForceSyncError>
374 where
375 C: GenericClient + Sync + ?Sized,
376 {
377 update_task_status(client, task_id, "done", None, None, None).await
378 }
379
380 pub async fn retry_task_in_tx<C>(
386 client: &C,
387 task_id: i64,
388 next_attempt_at: DateTime<Utc>,
389 error: impl AsRef<str>,
390 ) -> Result<u64, ForceSyncError>
391 where
392 C: GenericClient + Sync + ?Sized,
393 {
394 let error = error.as_ref().to_owned();
395 update_task_status(
396 client,
397 task_id,
398 "ready",
399 Some(&error),
400 Some(next_attempt_at),
401 None,
402 )
403 .await
404 }
405
406 pub async fn fail_task_in_tx<C>(
412 client: &C,
413 task_id: i64,
414 error: impl AsRef<str>,
415 ) -> Result<u64, ForceSyncError>
416 where
417 C: GenericClient + Sync + ?Sized,
418 {
419 let error = error.as_ref().to_owned();
420 update_task_status(client, task_id, "failed", Some(&error), None, None).await
421 }
422}
423
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::Utc;

    use super::compute_lease_deadline;

    /// The deadline must be a `DateTime<Utc>` lying exactly one lease length
    /// after some instant between the two clock samples taken around the call.
    #[test]
    fn lease_deadline_keeps_datetime_type() {
        let lease_span = chrono::Duration::seconds(5);

        let before = Utc::now();
        let deadline = compute_lease_deadline(Duration::from_secs(5))
            .unwrap_or_else(|error| panic!("unexpected lease deadline error: {error}"));
        let after = Utc::now();

        let _: chrono::DateTime<chrono::Utc> = deadline.lease_until;
        // Comparing against `before + lease_span` (not just `before`) makes
        // the test fail if the lease duration is ever dropped.
        assert!(deadline.lease_until >= before + lease_span);
        assert!(deadline.lease_until <= after + lease_span);
    }

    /// A duration whose seconds do not fit in `i64` is reported as an error.
    #[test]
    fn lease_deadline_rejects_unrepresentable_duration() {
        assert!(compute_lease_deadline(Duration::MAX).is_err());
    }
}
443}