1use std::convert::TryFrom;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use tokio_postgres::GenericClient;
8
9use crate::error::ForceSyncError;
10
11use super::PgStore;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct LeasedTask {
16 pub task_id: i64,
18 pub lease_owner: String,
20 pub lease_until: DateTime<Utc>,
22}
23
24struct LeaseDeadline {
25 lease_until: DateTime<Utc>,
26}
27
28fn compute_lease_deadline(lease_for: Duration) -> Result<LeaseDeadline, ForceSyncError> {
29 let secs =
30 i64::try_from(lease_for.as_secs()).map_err(|_| ForceSyncError::InvalidLeaseDuration)?;
31 let nanos = i64::from(lease_for.subsec_nanos());
32
33 let duration_secs =
34 chrono::Duration::try_seconds(secs).ok_or(ForceSyncError::InvalidLeaseDuration)?;
35 let duration_nanos = chrono::Duration::nanoseconds(nanos);
36
37 let duration = duration_secs
38 .checked_add(&duration_nanos)
39 .ok_or(ForceSyncError::InvalidLeaseDuration)?;
40
41 let lease_until = Utc::now()
42 .checked_add_signed(duration)
43 .ok_or(ForceSyncError::InvalidLeaseDuration)?;
44
45 Ok(LeaseDeadline { lease_until })
46}
47
48async fn enqueue_apply_task_query<C>(
49 client: &C,
50 journal_id: i64,
51 priority: i32,
52) -> Result<i64, ForceSyncError>
53where
54 C: GenericClient + Sync + ?Sized,
55{
56 let journal_key = journal_id.to_string();
57 let row = client
58 .query_one(
59 "insert into sync_task (status, task_kind, target_key, priority, payload)
60 values ('ready', 'apply', $1::text, $2, jsonb_build_object('journal_id', $1))
61 returning task_id",
62 &[&journal_key, &priority],
63 )
64 .await?;
65
66 Ok(row.get(0))
67}
68
69async fn lease_ready_tasks_query<C>(
70 client: &C,
71 worker_id: &str,
72 limit: i64,
73 lease_until: &LeaseDeadline,
74) -> Result<Vec<LeasedTask>, ForceSyncError>
75where
76 C: GenericClient + Sync + ?Sized,
77{
78 let rows = client
79 .query(
80 "with claimed as (
81 select task_id
82 from sync_task
83 where status = 'ready'
84 and next_attempt_at <= now()
85 and (lease_until is null or lease_until <= now())
86 order by priority desc, next_attempt_at asc, task_id asc
87 for update skip locked
88 limit $2
89 )
90 update sync_task
91 set status = 'leased',
92 lease_owner = $1,
93 lease_until = $3::timestamptz,
94 attempt_count = attempt_count + 1,
95 updated_at = now()
96 from claimed
97 where sync_task.task_id = claimed.task_id
98 returning sync_task.task_id",
99 &[&worker_id, &limit, &lease_until.lease_until],
100 )
101 .await?;
102
103 Ok(rows
104 .into_iter()
105 .map(|row| LeasedTask {
106 task_id: row.get(0),
107 lease_owner: worker_id.to_owned(),
108 lease_until: lease_until.lease_until,
109 })
110 .collect())
111}
112
113async fn update_task_status_unguarded<C>(
114 client: &C,
115 task_id: i64,
116 status: &str,
117 last_error: Option<&str>,
118 next_attempt_at: Option<DateTime<Utc>>,
119) -> Result<u64, ForceSyncError>
120where
121 C: GenericClient + Sync + ?Sized,
122{
123 Ok(client
124 .execute(
125 "update sync_task
126 set status = $2,
127 last_error = $3,
128 next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
129 lease_owner = null,
130 lease_until = null,
131 updated_at = now()
132 where task_id = $1",
133 &[&task_id, &status, &last_error, &next_attempt_at],
134 )
135 .await?)
136}
137
138async fn update_task_status_guarded<C>(
139 client: &C,
140 task_id: i64,
141 status: &str,
142 last_error: Option<&str>,
143 next_attempt_at: Option<DateTime<Utc>>,
144 worker_id: &str,
145) -> Result<u64, ForceSyncError>
146where
147 C: GenericClient + Sync + ?Sized,
148{
149 Ok(client
150 .execute(
151 "update sync_task
152 set status = $2,
153 last_error = $3,
154 next_attempt_at = coalesce($4::timestamptz, next_attempt_at),
155 lease_owner = null,
156 lease_until = null,
157 updated_at = now()
158 where task_id = $1
159 and status = 'leased'
160 and lease_owner = $5",
161 &[&task_id, &status, &last_error, &next_attempt_at, &worker_id],
162 )
163 .await?)
164}
165
166async fn update_task_status<C>(
167 client: &C,
168 task_id: i64,
169 status: &str,
170 last_error: Option<&str>,
171 next_attempt_at: Option<DateTime<Utc>>,
172 expected_lease_owner: Option<&str>,
173) -> Result<u64, ForceSyncError>
174where
175 C: GenericClient + Sync + ?Sized,
176{
177 match expected_lease_owner {
178 Some(worker_id) => {
179 update_task_status_guarded(
180 client,
181 task_id,
182 status,
183 last_error,
184 next_attempt_at,
185 worker_id,
186 )
187 .await
188 }
189 None => {
190 update_task_status_unguarded(client, task_id, status, last_error, next_attempt_at).await
191 }
192 }
193}
194
195impl PgStore {
196 pub async fn enqueue_apply_task(
202 &self,
203 journal_id: i64,
204 priority: i32,
205 ) -> Result<i64, ForceSyncError> {
206 let client = self.pool().get().await?;
207 enqueue_apply_task_query(&**client, journal_id, priority).await
208 }
209
210 pub async fn lease_ready_tasks(
216 &self,
217 worker_id: &str,
218 limit: i64,
219 lease_for: Duration,
220 ) -> Result<Vec<LeasedTask>, ForceSyncError> {
221 let lease_deadline = compute_lease_deadline(lease_for)?;
222 let client = self.pool().get().await?;
223 lease_ready_tasks_query(&**client, worker_id, limit, &lease_deadline).await
224 }
225
226 pub async fn ack_task(&self, task_id: i64) -> Result<u64, ForceSyncError> {
232 let client = self.pool().get().await?;
233 update_task_status(&**client, task_id, "done", None, None, None).await
234 }
235
236 pub async fn ack_task_for_worker(
244 &self,
245 worker_id: &str,
246 task_id: i64,
247 ) -> Result<u64, ForceSyncError> {
248 let client = self.pool().get().await?;
249 update_task_status(&**client, task_id, "done", None, None, Some(worker_id)).await
250 }
251
252 pub async fn retry_task(
258 &self,
259 task_id: i64,
260 next_attempt_at: DateTime<Utc>,
261 error: impl AsRef<str>,
262 ) -> Result<u64, ForceSyncError> {
263 let error = error.as_ref().to_owned();
264 let client = self.pool().get().await?;
265 update_task_status(
266 &**client,
267 task_id,
268 "ready",
269 Some(&error),
270 Some(next_attempt_at),
271 None,
272 )
273 .await
274 }
275
276 pub async fn retry_task_for_worker(
284 &self,
285 worker_id: &str,
286 task_id: i64,
287 next_attempt_at: DateTime<Utc>,
288 error: impl AsRef<str>,
289 ) -> Result<u64, ForceSyncError> {
290 let error = error.as_ref().to_owned();
291 let client = self.pool().get().await?;
292 update_task_status(
293 &**client,
294 task_id,
295 "ready",
296 Some(&error),
297 Some(next_attempt_at),
298 Some(worker_id),
299 )
300 .await
301 }
302
303 pub async fn fail_task(
309 &self,
310 task_id: i64,
311 error: impl AsRef<str>,
312 ) -> Result<u64, ForceSyncError> {
313 let error = error.as_ref().to_owned();
314 let client = self.pool().get().await?;
315 update_task_status(&**client, task_id, "failed", Some(&error), None, None).await
316 }
317
318 pub async fn fail_task_for_worker(
326 &self,
327 worker_id: &str,
328 task_id: i64,
329 error: impl AsRef<str>,
330 ) -> Result<u64, ForceSyncError> {
331 let error = error.as_ref().to_owned();
332 let client = self.pool().get().await?;
333 update_task_status(
334 &**client,
335 task_id,
336 "failed",
337 Some(&error),
338 None,
339 Some(worker_id),
340 )
341 .await
342 }
343
344 pub async fn enqueue_apply_task_in_tx<C>(
350 client: &C,
351 journal_id: i64,
352 priority: i32,
353 ) -> Result<i64, ForceSyncError>
354 where
355 C: GenericClient + Sync + ?Sized,
356 {
357 enqueue_apply_task_query(client, journal_id, priority).await
358 }
359
360 pub async fn lease_ready_tasks_in_tx<C>(
366 client: &C,
367 worker_id: &str,
368 limit: i64,
369 lease_for: Duration,
370 ) -> Result<Vec<LeasedTask>, ForceSyncError>
371 where
372 C: GenericClient + Sync + ?Sized,
373 {
374 let lease_deadline = compute_lease_deadline(lease_for)?;
375 lease_ready_tasks_query(client, worker_id, limit, &lease_deadline).await
376 }
377
378 pub async fn ack_task_in_tx<C>(client: &C, task_id: i64) -> Result<u64, ForceSyncError>
384 where
385 C: GenericClient + Sync + ?Sized,
386 {
387 update_task_status(client, task_id, "done", None, None, None).await
388 }
389
390 pub async fn retry_task_in_tx<C>(
396 client: &C,
397 task_id: i64,
398 next_attempt_at: DateTime<Utc>,
399 error: impl AsRef<str>,
400 ) -> Result<u64, ForceSyncError>
401 where
402 C: GenericClient + Sync + ?Sized,
403 {
404 let error = error.as_ref().to_owned();
405 update_task_status(
406 client,
407 task_id,
408 "ready",
409 Some(&error),
410 Some(next_attempt_at),
411 None,
412 )
413 .await
414 }
415
416 pub async fn fail_task_in_tx<C>(
422 client: &C,
423 task_id: i64,
424 error: impl AsRef<str>,
425 ) -> Result<u64, ForceSyncError>
426 where
427 C: GenericClient + Sync + ?Sized,
428 {
429 let error = error.as_ref().to_owned();
430 update_task_status(client, task_id, "failed", Some(&error), None, None).await
431 }
432}
433
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::Utc;

    use crate::error::ForceSyncError;

    use super::compute_lease_deadline;

    /// The deadline must be a `DateTime<Utc>` that lies exactly `lease_for` after some
    /// instant between the two clock samples taken around the call.
    #[test]
    fn lease_deadline_keeps_datetime_type() {
        let lease = chrono::Duration::seconds(5);
        let before = Utc::now();
        let deadline = compute_lease_deadline(Duration::from_secs(5))
            .unwrap_or_else(|error| panic!("unexpected lease deadline error: {error}"));
        let after = Utc::now();

        let _: chrono::DateTime<chrono::Utc> = deadline.lease_until;
        // Both bounds include the lease length so a deadline that ignores `lease_for`
        // is rejected, not just one that overshoots.
        assert!(deadline.lease_until >= before + lease);
        assert!(deadline.lease_until <= after + lease);
    }

    /// Sub-second parts of the lease duration must be carried into the deadline.
    #[test]
    fn lease_deadline_includes_subsecond_part() {
        let lease = chrono::Duration::milliseconds(1_500);
        let before = Utc::now();
        let deadline = compute_lease_deadline(Duration::from_millis(1_500))
            .unwrap_or_else(|error| panic!("unexpected lease deadline error: {error}"));
        let after = Utc::now();

        assert!(deadline.lease_until >= before + lease);
        assert!(deadline.lease_until <= after + lease);
    }

    /// Durations whose seconds do not fit in an `i64` are rejected instead of wrapping.
    #[test]
    fn lease_deadline_rejects_seconds_beyond_i64() {
        assert!(matches!(
            compute_lease_deadline(Duration::MAX),
            Err(ForceSyncError::InvalidLeaseDuration)
        ));
    }

    /// Durations that fit in an `i64` but exceed chrono's range are rejected as well.
    #[test]
    fn lease_deadline_rejects_seconds_beyond_chrono_range() {
        assert!(matches!(
            compute_lease_deadline(Duration::from_secs(i64::MAX.unsigned_abs())),
            Err(ForceSyncError::InvalidLeaseDuration)
        ));
    }
}