Crate pg_task

§pg_task

FSM-based Resumable Postgres tasks

  • FSM-based - each task is a granular state machine
  • Resumable - on error, after you fix the step logic or the external world, the task is able to pick up where it stopped
  • Postgres - a single table handles task scheduling, state transitions, and error processing

§Tutorial

The full runnable code is in examples/tutorial.rs.

§Defining Tasks

We create a greeter task consisting of two steps:

#[derive(Debug, Deserialize, Serialize)]
pub struct ReadName {
    filename: String,
}

#[async_trait]
impl Step<Greeter> for ReadName {
    const RETRY_LIMIT: i32 = 5;

    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        let name = std::fs::read_to_string(&self.filename)?;
        NextStep::now(SayHello { name })
    }
}

The first step tries to read a name from a file:

  • filename - the only state we need in this step
  • impl Step<Greeter> for ReadName - our step is a part of a Greeter task
  • RETRY_LIMIT - the step is fallible, let’s retry it a few times
  • NextStep::now(SayHello { name }) - move our task to the SayHello step right now

#[derive(Debug, Deserialize, Serialize)]
pub struct SayHello {
    name: String,
}

#[async_trait]
impl Step<Greeter> for SayHello {
    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        println!("Hello, {}", self.name);
        NextStep::none()
    }
}

The second step prints the greeting and finishes the task returning NextStep::none().
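
To make the state-machine shape of these two steps concrete, here is a minimal std-only sketch. It is not the pg_task API: the Next enum, run_step, and run_task are hypothetical stand-ins for NextStep and the worker loop, and the read closure replaces the file system so the flow can be followed without a database.

```rust
// Hypothetical model of the Greeter task: each variant is one step plus its state.
#[derive(Debug, PartialEq)]
enum Greeter {
    ReadName { filename: String },
    SayHello { name: String },
}

/// What a step asks the runner to do next (a stand-in for `NextStep`).
#[derive(Debug, PartialEq)]
enum Next {
    Now(Greeter), // move to this step right away
    None,         // the task is finished
}

/// Runs a single step. `read` stands in for `std::fs::read_to_string`.
fn run_step(
    step: Greeter,
    read: &dyn Fn(&str) -> Result<String, String>,
    out: &mut Vec<String>,
) -> Result<Next, String> {
    match step {
        Greeter::ReadName { filename } => {
            let name = read(&filename)?;
            Ok(Next::Now(Greeter::SayHello { name }))
        }
        Greeter::SayHello { name } => {
            out.push(format!("Hello, {}", name));
            Ok(Next::None)
        }
    }
}

/// Drives the task until a step returns `Next::None` or fails.
fn run_task(
    first: Greeter,
    read: &dyn Fn(&str) -> Result<String, String>,
) -> Result<Vec<String>, String> {
    let mut out = Vec::new();
    let mut current = first;
    loop {
        match run_step(current, read, &mut out)? {
            Next::Now(step) => current = step,
            Next::None => return Ok(out),
        }
    }
}
```

Calling run_task with a ReadName step and a closure that knows name.txt yields the single greeting line; with an unknown file the error from the first step is returned and SayHello never runs.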

The full code includes the remaining setup. Run it with:

cargo run --example tutorial

§Investigating Errors

The example logs 6 attempts: the first try plus RETRY_LIMIT retries. Inspect the row to see what happened:

~$ psql pg_task -c 'table pg_task'
-[ RECORD 1 ]------------------------------------------------
id              | cddf7de1-1194-4bee-90c6-af73d9206ce2
step            | {"Greeter":{"ReadName":{"filename":"name.txt"}}}
wakeup_at       | 2024-06-30 09:32:27.703599+06
tried           | 6
locked_by       |
lock_expires_at |
error           | No such file or directory (os error 2)
created_at      | 2024-06-30 09:32:22.628563+06
updated_at      | 2024-06-30 09:32:27.703599+06

  • a non-null error field means the task has failed; it holds the error message
  • the step field shows which step failed and the state it held at that moment

§Listening for Task Errors

Use listen_for_task_errors to receive live task error events:

let mut errors = pg_task::listen_for_task_errors(&pool).await?;

loop {
    let task = errors.recv().await?;
    eprintln!("task {} failed: {}", task.id, task.error);
}

§Fixing the World

The task failed because the file is missing. Create it:

echo 'Fixed World' >name.txt

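Clear the error field to rerun the task. The statement below has no WHERE clause, so it clears the error on every row; add one (for example on id) to target a single task: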

psql pg_task -c 'update pg_task set error = null'

The worker reruns the task and prints the greeting from the final step.
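
The resume behaviour can be pictured with a small in-memory model. Everything here is an assumption made for illustration: Row, Step, and worker_pass are hypothetical, the Done variant only marks completion in the sketch, and a HashMap plays the role of the file system. The point it shows is that a failed row keeps its step, is skipped while error is set, and continues from that same step once error is cleared.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum Step {
    ReadName { filename: String },
    SayHello { name: String },
    Done, // sketch-only marker for a finished task
}

/// Hypothetical stand-in for one task row: the parked step and an optional error.
#[derive(Debug)]
struct Row {
    step: Step,
    error: Option<String>,
}

/// One worker pass over a row. Returns the greeting if the final step ran.
fn worker_pass(row: &mut Row, world: &HashMap<String, String>) -> Option<String> {
    if row.error.is_some() {
        return None; // errored rows wait until someone clears the field
    }
    match row.step.clone() {
        Step::ReadName { filename } => {
            match world.get(&filename) {
                // Success: advance to the next step, carrying the new state.
                Some(name) => row.step = Step::SayHello { name: name.clone() },
                // Failure: record the error and leave `step` untouched.
                None => row.error = Some("No such file or directory".to_string()),
            }
            None
        }
        Step::SayHello { name } => {
            row.step = Step::Done;
            Some(format!("Hello, {}", name))
        }
        Step::Done => None,
    }
}
```

Because the failed row still holds ReadName with its filename, fixing the world and clearing error is enough; nothing has to be rescheduled by hand.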

§Scheduling Tasks

Scheduling a task means inserting a row into the pg_task table. You can do it from psql or from code in any language.

The crate also provides helpers for first-step serialization and scheduling:

  • enqueue - to run the task immediately
  • delay - to run it with a delay
  • schedule - to run it at a particular time
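
When a row is inserted by hand or from another language, the step value has to match the nesting seen in the psql output earlier: task name, then step name, then the step's fields. The sketch below builds that value with the standard library only. The helper names json_escape and step_json are hypothetical, only string fields are handled, and the other columns and their defaults are not covered here.

```rust
/// Escapes a string for use inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \u00XX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Builds the nested `step` value: {"<task>":{"<step>":{<fields>}}}.
/// Assumption: every field is a string, as in the tutorial's `filename`.
fn step_json(task: &str, step: &str, fields: &[(&str, &str)]) -> String {
    let body: Vec<String> = fields
        .iter()
        .map(|(k, v)| format!("\"{}\":\"{}\"", json_escape(k), json_escape(v)))
        .collect();
    format!(
        "{{\"{}\":{{\"{}\":{{{}}}}}}}",
        json_escape(task),
        json_escape(step),
        body.join(",")
    )
}
```

For the tutorial's first step, step_json("Greeter", "ReadName", &[("filename", "name.txt")]) produces the same text as the step column shown above. From Rust, the crate's helpers do this serialization for you.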

§Running Workers

After defining the steps of each task, we need to wrap them into enums representing whole tasks via task!:

pg_task::task!(Task1 { StepA, StepB });
pg_task::task!(Task2 { StepC });

One more enum is needed to combine all the possible tasks:

pg_task::scheduler!(Tasks { Task1, Task2 });

Now we can run the worker:

pg_task::Worker::<Tasks>::new(db).run().await?;

Workers coordinate through Postgres, so you can run one or many of them, either in separate processes or with tokio::spawn.
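
For orientation, the two levels of wrapping can be written out by hand. This is only a sketch of the shape described above, not the macros' actual expansion: the unit structs stand in for real step types, and describe is a hypothetical helper added to show how a value is matched.

```rust
// Placeholder step types; real steps carry state and implement `Step`.
#[derive(Debug, PartialEq)]
struct StepA;
#[derive(Debug, PartialEq)]
struct StepB;
#[derive(Debug, PartialEq)]
struct StepC;

// One enum per task, with one variant per step.
#[derive(Debug, PartialEq)]
enum Task1 {
    StepA(StepA),
    StepB(StepB),
}

#[derive(Debug, PartialEq)]
enum Task2 {
    StepC(StepC),
}

// One outer enum combining all the tasks.
#[derive(Debug, PartialEq)]
enum Tasks {
    Task1(Task1),
    Task2(Task2),
}

/// Names the task and step a value represents, mirroring the nested `step` JSON.
fn describe(task: &Tasks) -> &'static str {
    match task {
        Tasks::Task1(Task1::StepA(_)) => "Task1/StepA",
        Tasks::Task1(Task1::StepB(_)) => "Task1/StepB",
        Tasks::Task2(Task2::StepC(_)) => "Task2/StepC",
    }
}
```

The nesting is the same one visible in a stored row: the outer key names the task and the inner key names the step.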

§Stopping Workers

Gracefully stop workers by sending a notification through the database:

SELECT pg_notify('pg_task_changed', 'stop_worker');

Workers finish their current steps before exiting. To wait for them, check for live leases:

SELECT EXISTS(
    SELECT 1
    FROM pg_task
    WHERE lock_expires_at > now()
);

§Delaying Steps

Sometimes you need to delay the next step. Using tokio::time::sleep before returning the next step creates a couple of issues:

  • if the process crashes while sleeping, the current step won’t be considered done and will rerun on restart
  • you’d have to wait for the sleeping task to finish on graceful shutdown

Use NextStep::delay instead - it schedules the next step with the delay and finishes the current one right away.

You can find a runnable example in examples/delay.rs.
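
The difference from sleeping in-process is that the wake-up time is stored rather than waited for. A std-only sketch of that idea follows; Next, persist, and is_due are hypothetical names, and the tuple returned by persist stands in for the step and wakeup_at columns.

```rust
use std::time::{Duration, SystemTime};

/// Stand-in for a step's decision (not the crate's `NextStep`).
enum Next {
    Now(&'static str),
    Delay(&'static str, Duration),
    None,
}

/// What would be stored when a step finishes: the next step and its wake-up time.
/// The current step is complete as soon as this is recorded; nothing sleeps.
fn persist(next: Next, now: SystemTime) -> Option<(&'static str, SystemTime)> {
    match next {
        Next::Now(step) => Some((step, now)),
        Next::Delay(step, delay) => Some((step, now + delay)),
        Next::None => None, // task finished, nothing left to wake up
    }
}

/// A stored step is due once its wake-up time has passed.
fn is_due(wakeup_at: SystemTime, now: SystemTime) -> bool {
    wakeup_at <= now
}
```

Since the wake-up time lives in storage, a crash or a graceful shutdown during the delay loses nothing: whichever worker is running when the time arrives can pick the step up.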

§Retrying Steps

Use Step::RETRY_LIMIT and Step::RETRY_DELAY when you need to retry a step on errors:

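#[async_trait]
impl Step<MyTask> for ApiRequest {
    // Retry up to 5 times after the first failed attempt
    const RETRY_LIMIT: i32 = 5;
    // Wait 5 seconds between attempts
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
        // Any error returned here counts as a failed attempt
        let result = api_request().await?;
        NextStep::now(ProcessResult { result })
    }
}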
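
The retry accounting can be checked with a small simulation. It assumes a fixed delay between attempts and counts attempts the way the tutorial output does (the first try plus RETRY_LIMIT retries). Outcome and run_with_retries are hypothetical names, and the delay is accumulated as a number instead of actually sleeping.

```rust
use std::time::Duration;

/// Result of running a fallible step under a retry policy.
#[derive(Debug, PartialEq)]
struct Outcome {
    tried: i32,            // total attempts, including the first one
    waited: Duration,      // total delay that would have been scheduled
    error: Option<String>, // last error if every attempt failed
}

/// Runs `step` until it succeeds or the retries are used up.
/// `step` receives the 1-based attempt number.
fn run_with_retries<F>(retry_limit: i32, retry_delay: Duration, mut step: F) -> Outcome
where
    F: FnMut(i32) -> Result<(), String>,
{
    let mut tried = 0;
    let mut waited = Duration::ZERO;
    loop {
        tried += 1;
        match step(tried) {
            Ok(()) => return Outcome { tried, waited, error: None },
            // No retries left: the task enters the error state.
            Err(e) if tried > retry_limit => {
                return Outcome { tried, waited, error: Some(e) }
            }
            // Retries remain: schedule the next attempt after the delay.
            Err(_) => waited += retry_delay,
        }
    }
}
```

With a limit of 5 and a 5-second delay, a step that always fails is tried 6 times with 25 seconds of scheduled delay in total, while a step that succeeds on its third attempt stops there.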

Macros§

scheduler
Implements the outer enum wrapper containing all the tasks
task
Implements an enum wrapper for a single task containing all its steps

Structs§

TaskError
A task that entered the terminal error state.
TaskErrorListener
A live subscription to task error events.
Worker
A worker for processing tasks

Enums§

Error
The crate error
NextStep
Represents the next step of the task

Traits§

Scheduler
A trait to implement on the outer enum wrapper containing all the tasks
Step
A trait to implement on each task step

Functions§

delay
Schedules a task to be run after a specified delay
enqueue
Enqueues the task to be run immediately
listen_for_task_errors
Listens for task errors that happen after this function returns.
schedule
Schedules a task to run at a specified time in the future

Type Aliases§

Result
The crate result
StepError
Error of a task step
StepResult
Result returned from task steps