§pg_task
FSM-based Resumable Postgres tasks
- FSM-based - each task is a granular state machine
- Resumable - on error, after you fix the step logic or the external world, the task is able to pick up where it stopped
- Postgres - a single table handles task scheduling, state transitions, and error processing
§Tutorial
The full runnable code is in examples/tutorial.rs.
§Defining Tasks
We create a greeter task consisting of two steps:
#[derive(Debug, Deserialize, Serialize)]
pub struct ReadName {
    filename: String,
}
#[async_trait]
impl Step<Greeter> for ReadName {
    const RETRY_LIMIT: i32 = 5;
    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        let name = std::fs::read_to_string(&self.filename)?;
        NextStep::now(SayHello { name })
    }
}
The first step tries to read a name from a file:
- filename - the only state we need in this step
- impl Step<Greeter> for ReadName - our step is a part of a Greeter task
- RETRY_LIMIT - the step is fallible, let’s retry it a few times
- NextStep::now(SayHello { name }) - move our task to the SayHello step right now
#[derive(Debug, Deserialize, Serialize)]
pub struct SayHello {
    name: String,
}
#[async_trait]
impl Step<Greeter> for SayHello {
    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        println!("Hello, {}", self.name);
        NextStep::none()
    }
}
The second step prints the greeting and finishes the task by returning
NextStep::none().
The full code includes the remaining setup. Run it with:
cargo run --example tutorial
§Investigating Errors
The example logs 6 attempts: the first try plus RETRY_LIMIT retries. Inspect
the row to see what happened:
~$ psql pg_task -c 'table pg_task'
-[ RECORD 1 ]------------------------------------------------
id | cddf7de1-1194-4bee-90c6-af73d9206ce2
step | {"Greeter":{"ReadName":{"filename":"name.txt"}}}
wakeup_at | 2024-06-30 09:32:27.703599+06
tried | 6
locked_by |
lock_expires_at |
error | No such file or directory (os error 2)
created_at | 2024-06-30 09:32:22.628563+06
updated_at | 2024-06-30 09:32:27.703599+06
- a non-null error field indicates that the task has errored and contains the error message
- the step field provides you with the information about a particular step and its state when the error occurred
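The attempt count in the tried column can be reproduced with a small std-only sketch. It is not pg_task's implementation: run_with_retries is a hypothetical helper, and it retries immediately instead of persisting a wakeup time. It only shows why a RETRY_LIMIT of 5 leads to 6 attempts.

```rust
/// Calls `op` until it succeeds or the retries are used up.
/// Returns how many attempts were made along with the final result.
/// (Hypothetical helper, not part of pg_task.)
fn run_with_retries<T, E>(
    retry_limit: i32,
    mut op: impl FnMut() -> Result<T, E>,
) -> (i32, Result<T, E>) {
    let mut tried = 0;
    loop {
        tried += 1;
        match op() {
            Ok(value) => return (tried, Ok(value)),
            // The first try is not a retry, so give up only after
            // `retry_limit` additional attempts have failed.
            Err(error) if tried > retry_limit => return (tried, Err(error)),
            Err(_) => continue,
        }
    }
}
```

Calling run_with_retries(5, ...) with an operation that always fails returns an attempt count of 6, matching the row above. An operation that succeeds on its third call returns 3.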
§Listening for Task Errors
Use listen_for_task_errors to receive live task error events:
let mut errors = pg_task::listen_for_task_errors(&pool).await?;
loop {
    let task = errors.recv().await?;
    eprintln!("task {} failed: {}", task.id, task.error);
}
§Fixing the World
The task failed because the file is missing. Create it:
echo 'Fixed World' >name.txt
Clear error to rerun the task:
psql pg_task -c 'update pg_task set error = null'
The worker reruns the task and prints the greeting from the final step.
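The fail, fix, resume cycle can be modeled without a database. The sketch below is only an illustration under stated assumptions: World (a map standing in for the file system), Row (an in-memory stand-in for a task row), and run are hypothetical names, not pg_task items. It shows the idea that a failed step stays recorded, so rerunning after the world is fixed continues from that step.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the outside world: file name -> contents.
type World = HashMap<String, String>;

/// Each state of the task is one variant holding only the data that step needs.
#[derive(Debug, Clone, PartialEq)]
enum Greeter {
    ReadName { filename: String },
    SayHello { name: String },
}

impl Greeter {
    /// Runs one step and returns the next state, or `None` when the task is done.
    fn step(self, world: &World) -> Result<Option<Greeter>, String> {
        match self {
            Greeter::ReadName { filename } => {
                let name = world
                    .get(&filename)
                    .ok_or_else(|| format!("{filename}: no such file"))?;
                Ok(Some(Greeter::SayHello { name: name.trim().to_string() }))
            }
            Greeter::SayHello { name } => {
                println!("Hello, {name}");
                Ok(None)
            }
        }
    }
}

/// Hypothetical in-memory analogue of a task row: current step plus last error.
#[derive(Debug)]
struct Row {
    step: Option<Greeter>,
    error: Option<String>,
}

/// Clears the error and advances the row until the task finishes or a step fails.
/// A failed step stays in `row.step`, so a later call resumes from it.
fn run(row: &mut Row, world: &World) {
    row.error = None;
    while let Some(current) = row.step.clone() {
        match current.step(world) {
            Ok(next) => row.step = next,
            Err(error) => {
                row.error = Some(error);
                return;
            }
        }
    }
}
```

Running a Row that starts at ReadName against an empty World leaves the step in place with an error set. After inserting "name.txt" into the World, a second run call prints the greeting and leaves row.step as None.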
§Scheduling Tasks
Scheduling a task means inserting a row into the pg_task table. You can do it
from psql or from code in any language.
The crate also provides helpers for first-step serialization and scheduling:
- enqueue - to run the task immediately
- delay - to run it with a delay
- schedule - to schedule it to a particular time
§Running Workers
After defining the steps of each task, we need to wrap them
into enums representing whole tasks via task!:
pg_task::task!(Task1 { StepA, StepB });
pg_task::task!(Task2 { StepC });
One more enum is needed to combine all the possible tasks:
pg_task::scheduler!(Tasks { Task1, Task2 });
Now we can run the worker:
pg_task::Worker::<Tasks>::new(db).run().await?;
Workers coordinate through Postgres, so you can run one or many of them, either
in separate processes or with tokio::spawn.
§Stopping Workers
Gracefully stop workers by sending a notification through the database:
SELECT pg_notify('pg_task_changed', 'stop_worker');
Workers finish their current steps before exiting. To wait for them, check for live leases:
SELECT EXISTS(
    SELECT 1
    FROM pg_task
    WHERE lock_expires_at > now()
);
§Delaying Steps
Sometimes you need to delay the next step. Using tokio::time::sleep before
returning the next step creates a couple of issues:
- if the process crashes while sleeping, the current step won’t be considered done and will rerun on restart
- you’d have to wait for the sleeping task to finish on graceful shutdown
Use NextStep::delay instead - it schedules the next step with the delay and
finishes the current one right away.
A runnable example is in examples/delay.rs.
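The difference from sleeping in-process can be sketched with std types only. This is a model of the idea, not pg_task's code: Scheduled and delay_step are hypothetical names, and wakeup_at mirrors the column of the same name in the table. The point is that the step returns at once and the wait lives in stored data rather than in a running future.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory analogue of a scheduled row.
struct Scheduled<S> {
    step: S,
    wakeup_at: Instant,
}

/// Records when the next step becomes runnable and returns immediately,
/// instead of blocking the current step for the whole delay.
fn delay_step<S>(step: S, now: Instant, after: Duration) -> Scheduled<S> {
    Scheduled { step, wakeup_at: now + after }
}

impl<S> Scheduled<S> {
    /// A worker would pick the step up only once its wakeup time has passed.
    fn is_due(&self, now: Instant) -> bool {
        now >= self.wakeup_at
    }
}
```

Because nothing is left sleeping, a crash or a graceful shutdown between the two steps loses no work in this model: the stored wakeup time is all that is needed to continue later.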
§Retrying Steps
Use Step::RETRY_LIMIT and Step::RETRY_DELAY when you need to retry a
step on errors:
#[async_trait]
impl Step<MyTask> for ApiRequest {
    const RETRY_LIMIT: i32 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);
    async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
        let result = api_request().await?;
        NextStep::now(ProcessResult { result })
    }
}
Macros§
- scheduler
- The macro implements the outer enum wrapper containing all the tasks
- task
- Implements an enum wrapper for a single task containing all its steps
Structs§
- TaskError
- A task that entered the terminal error state.
- TaskErrorListener
- A live subscription to task error events.
- Worker
- A worker for processing tasks
Enums§
Traits§
- Scheduler
- A trait to implement on the outer enum wrapper containing all the tasks
- Step
- A trait to implement on each task step
Functions§
- delay
- Schedules a task to be run after a specified delay
- enqueue
- Enqueues the task to be run immediately
- listen_for_task_errors
- Listens for task errors that happen after this function returns.
- schedule
- Schedules a task to run at a specified time in the future
Type Aliases§
- Result
- The crate result
- StepError
- Error of a task step
- StepResult
- Result returning from task steps