Skip to main content

pg_task/
lib.rs

1/*!
2# pg_task
3
4FSM-based Resumable Postgres tasks
5
6- **FSM-based** - each task is a granular state machine
7- **Resumable** - on error, after you fix the step logic or the external world,
8  the task is able to pick up where it stopped
9- **Postgres** - a single table handles task scheduling, state
10  transitions, and error processing
11
12## Table of Contents
13
14- [Tutorial](#tutorial)
15    - [Defining Tasks](#defining-tasks)
16    - [Investigating Errors](#investigating-errors)
17    - [Listening for Task Errors](#listening-for-task-errors)
18    - [Fixing the World](#fixing-the-world)
19- [Scheduling Tasks](#scheduling-tasks)
20- [Running Workers](#running-workers)
21- [Stopping Workers](#stopping-workers)
22- [Delaying Steps](#delaying-steps)
23- [Retrying Steps](#retrying-steps)
24
25## Tutorial
26
27_The full runnable code is in [examples/tutorial.rs][tutorial-example]._
28
29### Defining Tasks
30
31We create a greeter task consisting of two steps:
32
33```rust
34# use async_trait::async_trait;
35# use pg_task::{NextStep, Step, StepResult};
36# use serde::{Deserialize, Serialize};
37# use sqlx::PgPool;
38#[derive(Debug, Deserialize, Serialize)]
39pub struct ReadName {
40    filename: String,
41}
42# #[derive(Debug, Deserialize, Serialize)]
43# pub struct SayHello {
44#     name: String,
45# }
46# pg_task::task!(Greeter { ReadName, SayHello });
47# #[async_trait]
48# impl Step<Greeter> for SayHello {
49#     async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
50#         NextStep::none()
51#     }
52# }
53
54#[async_trait]
55impl Step<Greeter> for ReadName {
56    const RETRY_LIMIT: i32 = 5;
57
58    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
59        let name = std::fs::read_to_string(&self.filename)?;
60        NextStep::now(SayHello { name })
61    }
62}
63```
64
65The first step tries to read a name from a file:
66
67- `filename` - the only state we need in this step
68- `impl Step<Greeter> for ReadName` - our step is a part of a `Greeter` task
- `RETRY_LIMIT` - the step is fallible (the file may be missing), so retry it a few times
70- `NextStep::now(SayHello { name })` - move our task to the `SayHello` step
71  right now
72
73```rust
74# use async_trait::async_trait;
75# use pg_task::{NextStep, Step, StepResult};
76# use serde::{Deserialize, Serialize};
77# use sqlx::PgPool;
78# #[derive(Debug, Deserialize, Serialize)]
79# pub struct ReadName {
80#     filename: String,
81# }
82#[derive(Debug, Deserialize, Serialize)]
83pub struct SayHello {
84    name: String,
85}
86# pg_task::task!(Greeter { ReadName, SayHello });
87# #[async_trait]
88# impl Step<Greeter> for ReadName {
89#     async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
90#         NextStep::none()
91#     }
92# }
93#[async_trait]
94impl Step<Greeter> for SayHello {
95    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
96        println!("Hello, {}", self.name);
97        NextStep::none()
98    }
99}
100```
101
102The second step prints the greeting and finishes the task returning
103`NextStep::none()`.
104
105The [full code][tutorial-example] includes the remaining setup. Run it with:
106
107```bash
108cargo run --example tutorial
109```
110
111### Investigating Errors
112
113The example logs 6 attempts: the first try plus `RETRY_LIMIT` retries. Inspect
114the row to see what happened:
115
116```bash
117~$ psql pg_task -c 'table pg_task'
118-[ RECORD 1 ]------------------------------------------------
119id              | cddf7de1-1194-4bee-90c6-af73d9206ce2
120step            | {"Greeter":{"ReadName":{"filename":"name.txt"}}}
121wakeup_at       | 2024-06-30 09:32:27.703599+06
122tried           | 6
123locked_by       |
124lock_expires_at |
125error           | No such file or directory (os error 2)
126created_at      | 2024-06-30 09:32:22.628563+06
127updated_at      | 2024-06-30 09:32:27.703599+06
128```
129
130- a non-null `error` field indicates that the task has errored and contains the
131  error message
132- the `step` field provides you with the information about a particular step and
133  its state when the error occurred
134
135### Listening for Task Errors
136
137Use [`listen_for_task_errors`] to receive live task error events:
138
139```rust,no_run
140# async fn demo(pool: sqlx::PgPool) -> pg_task::Result<()> {
141let mut errors = pg_task::listen_for_task_errors(&pool).await?;
142
143loop {
144    let task = errors.recv().await?;
145    eprintln!("task {} failed: {}", task.id, task.error);
146}
147# }
148```
149
150### Fixing the World
151
152The task failed because the file is missing. Create it:
153
154```bash
155echo 'Fixed World' >name.txt
156```
157
158Clear `error` to rerun the task:
159
160```bash
161psql pg_task -c 'update pg_task set error = null'
162```
163
164The worker reruns the task and prints the greeting from the final step.
165
166### Scheduling Tasks
167
168Scheduling a task means inserting a row into the `pg_task` table. You can do it
169from `psql` or from code in any language.
170
171The crate also provides helpers for first-step serialization and scheduling:
172
173- [`enqueue`] - to run the task immediately
174- [`delay`] - to run it with a delay
175- [`schedule`] - to schedule it to a particular time
176
177### Running Workers
178
179After [defining](#defining-tasks) the steps of each task, we need to wrap them
180into enums representing whole tasks via [`task!`]:
181
182```rust
183# use async_trait::async_trait;
184# use pg_task::{NextStep, Step};
185# use sqlx::PgPool;
186# #[derive(Debug, serde::Deserialize, serde::Serialize)]
187# struct StepA;
188# #[derive(Debug, serde::Deserialize, serde::Serialize)]
189# struct StepB;
190# #[derive(Debug, serde::Deserialize, serde::Serialize)]
191# struct StepC;
192pg_task::task!(Task1 { StepA, StepB });
193pg_task::task!(Task2 { StepC });
194# #[async_trait]
195# impl Step<Task1> for StepA {
196#     async fn step(self, _db: &PgPool) -> pg_task::StepResult<Task1> {
197#         NextStep::none()
198#     }
199# }
200# #[async_trait]
201# impl Step<Task1> for StepB {
202#     async fn step(self, _db: &PgPool) -> pg_task::StepResult<Task1> {
203#         NextStep::none()
204#     }
205# }
206# #[async_trait]
207# impl Step<Task2> for StepC {
208#     async fn step(self, _db: &PgPool) -> pg_task::StepResult<Task2> {
209#         NextStep::none()
210#     }
211# }
212```
213
214One more enum is needed to combine all the possible tasks:
215
216```rust
217# use async_trait::async_trait;
218# use pg_task::{NextStep, Step};
219# use sqlx::PgPool;
220# #[derive(Debug, serde::Deserialize, serde::Serialize)]
221# struct StepA;
222# #[derive(Debug, serde::Deserialize, serde::Serialize)]
223# struct StepB;
224# #[derive(Debug, serde::Deserialize, serde::Serialize)]
225# struct StepC;
226# pg_task::task!(Task1 { StepA, StepB });
227# pg_task::task!(Task2 { StepC });
228# #[async_trait]
229# impl Step<Task1> for StepA {
230#     async fn step(self, _db: &PgPool) -> pg_task::StepResult<Task1> {
231#         NextStep::none()
232#     }
233# }
234# #[async_trait]
235# impl Step<Task1> for StepB {
236#     async fn step(self, _db: &PgPool) -> pg_task::StepResult<Task1> {
237#         NextStep::none()
238#     }
239# }
240# #[async_trait]
241# impl Step<Task2> for StepC {
242#     async fn step(self, _db: &PgPool) -> pg_task::StepResult<Task2> {
243#         NextStep::none()
244#     }
245# }
246pg_task::scheduler!(Tasks { Task1, Task2 });
247```
248
249Now we can run the worker:
250
251```rust
252# async fn demo(db: sqlx::PgPool) -> pg_task::Result<()> {
253# use async_trait::async_trait;
254# use pg_task::{NextStep, Step};
255# #[derive(Debug, serde::Deserialize, serde::Serialize)]
256# struct StepA;
257# pg_task::task!(Task1 { StepA });
258# pg_task::scheduler!(Tasks { Task1 });
259# #[async_trait]
260# impl Step<Task1> for StepA {
261#     async fn step(self, _db: &sqlx::PgPool) -> pg_task::StepResult<Task1> {
262#         NextStep::none()
263#     }
264# }
265pg_task::Worker::<Tasks>::new(db).run().await?;
266# Ok(())
267# }
268```
269
270Workers coordinate through Postgres, so you can run one or many of them, either
271in separate processes or with [`tokio::spawn`].
272
273### Stopping Workers
274
275Gracefully stop workers by sending a notification through the database:
276
277```sql
278SELECT pg_notify('pg_task_changed', 'stop_worker');
279```
280
281Workers finish their current steps before exiting. To wait for them, check for
282live leases:
283
284```sql
285SELECT EXISTS(
286    SELECT 1
287    FROM pg_task
288    WHERE lock_expires_at > now()
289);
290```
291
292### Delaying Steps
293
294Sometimes you need to delay the next step. Using [`tokio::time::sleep`] before
295returning the next step creates a couple of issues:
296
297- if the process crashes while sleeping it won't be considered done and will
298  rerun on restart
299- you'd have to wait for the sleeping task to finish on
300  [graceful shutdown](#stopping-workers)
301
302Use [`NextStep::delay`] instead - it schedules the next step with the delay and
303finishes the current one right away.
304
305You can find a runnable example in the [examples/delay.rs][delay-example]
306
307### Retrying Steps
308
309Use [`Step::RETRY_LIMIT`] and [`Step::RETRY_DELAY`] when you need to retry a
310task on errors:
311
312```rust
313# use async_trait::async_trait;
314# use pg_task::{NextStep, Step, StepResult};
315# use serde::{Deserialize, Serialize};
316# use sqlx::PgPool;
317# use std::time::Duration;
318# #[derive(Debug, Deserialize, Serialize)]
319# struct ProcessResult {
320#     result: String,
321# }
322# #[derive(Debug, Deserialize, Serialize)]
323# struct ApiRequest;
324# pg_task::task!(MyTask { ApiRequest, ProcessResult });
325# async fn api_request() -> Result<String, std::io::Error> {
326#     Ok(String::from("ok"))
327# }
328# #[async_trait]
329impl Step<MyTask> for ApiRequest {
330    const RETRY_LIMIT: i32 = 5;
331    const RETRY_DELAY: Duration = Duration::from_secs(5);
332
333    async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
334        let result = api_request().await?;
335        NextStep::now(ProcessResult { result })
336    }
337}
338# #[async_trait]
339# impl Step<MyTask> for ProcessResult {
340#     async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
341#         NextStep::none()
342#     }
343# }
344```
345
346[delay-example]: https://github.com/imbolc/pg_task/blob/main/examples/delay.rs
347[tutorial-example]:
348    https://github.com/imbolc/pg_task/blob/main/examples/tutorial.rs
349*/
350#![forbid(unsafe_code)]
351#![warn(clippy::all, missing_docs, nonstandard_style, future_incompatible)]
352
353mod error;
354mod listener;
355mod macros;
356mod next_step;
357mod task;
358mod task_error_listener;
359mod traits;
360mod util;
361mod worker;
362
363pub use error::{Error, Result, StepError, StepResult};
364pub use next_step::NextStep;
365pub use task_error_listener::{listen_for_task_errors, TaskError, TaskErrorListener};
366pub use traits::{Scheduler, Step};
367pub use worker::Worker;
368
369use chrono::{DateTime, Utc};
370use sqlx::{types::Uuid, PgExecutor};
371use std::time::Duration;
372
/// Pause associated with losing the database connection.
///
/// NOTE(review): nothing in this file reads it; presumably the worker/listener
/// submodules sleep this long before reconnecting — confirm at the call sites.
const LOST_CONNECTION_SLEEP: Duration = Duration::from_secs(1);
374
375/// Enqueues the task to be run immediately
376pub async fn enqueue<'e>(db: impl PgExecutor<'e>, task: &impl Scheduler) -> Result<Uuid> {
377    task.enqueue(db).await
378}
379
380/// Schedules a task to be run after a specified delay
381pub async fn delay<'e>(
382    db: impl PgExecutor<'e>,
383    task: &impl Scheduler,
384    delay: Duration,
385) -> Result<Uuid> {
386    task.delay(db, delay).await
387}
388
389/// Schedules a task to run at a specified time in the future
390pub async fn schedule<'e>(
391    db: impl PgExecutor<'e>,
392    task: &impl Scheduler,
393    at: DateTime<Utc>,
394) -> Result<Uuid> {
395    task.schedule(db, at).await
396}