Crate pg_task

§pg_task

FSM-based Resumable Postgres tasks

  • FSM-based - each task is a granular state machine
  • Resumable - on error, after you fix the step logic or the external world, the task is able to pick up where it stopped
  • Postgres - a single table is enough to handle task scheduling, state transitions, and error processing

§Tutorial

The full runnable code is in examples/tutorial.rs.

§Defining Tasks

We create a greeter task consisting of two steps:

#[derive(Debug, Deserialize, Serialize)]
pub struct ReadName {
    filename: String,
}

#[async_trait]
impl Step<Greeter> for ReadName {
    const RETRY_LIMIT: i32 = 5;

    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        let name = std::fs::read_to_string(&self.filename)?;
        NextStep::now(SayHello { name })
    }
}

The first step tries to read a name from a file:

  • filename - the only state we need in this step
  • impl Step<Greeter> for ReadName - our step is a part of a Greeter task
  • RETRY_LIMIT - the step is fallible, let’s retry it a few times
  • NextStep::now(SayHello { name }) - move our task to the SayHello step right now

#[derive(Debug, Deserialize, Serialize)]
pub struct SayHello {
    name: String,
}

#[async_trait]
impl Step<Greeter> for SayHello {
    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        println!("Hello, {}", self.name);
        NextStep::none()
    }
}

The second step prints the greeting and finishes the task returning NextStep::none().
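
To make the state machine idea concrete, here is a minimal std-only sketch of the same two steps. It is a simplified model rather than pg_task's implementation: the run_step and run_task functions and the out buffer are hypothetical names introduced for illustration, and the name is trimmed to keep the output tidy.

```rust
use std::fs;
use std::io;

/// All states of the greeter task; each variant carries that step's data.
#[derive(Debug)]
enum Greeter {
    ReadName { filename: String },
    SayHello { name: String },
}

/// Runs one step and returns the next state, or `None` when the task is done.
/// (Hypothetical helper; greetings are collected in `out` instead of printed.)
fn run_step(state: &Greeter, out: &mut Vec<String>) -> io::Result<Option<Greeter>> {
    match state {
        Greeter::ReadName { filename } => {
            let name = fs::read_to_string(filename)?;
            Ok(Some(Greeter::SayHello { name: name.trim().to_string() }))
        }
        Greeter::SayHello { name } => {
            out.push(format!("Hello, {}", name));
            Ok(None)
        }
    }
}

/// Drives the task until it finishes or a step fails.
/// On failure the state that failed is handed back, so it can be resumed later.
fn run_task(mut state: Greeter, out: &mut Vec<String>) -> Result<(), (Greeter, io::Error)> {
    loop {
        match run_step(&state, out) {
            Ok(Some(next)) => state = next,
            Ok(None) => return Ok(()),
            Err(e) => return Err((state, e)),
        }
    }
}
```

Because the failed state is returned intact, calling run_task on it again after the file appears picks up at ReadName instead of starting a new task.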

That’s essentially all, except for some boilerplate you can find in the full code. Let’s run it:

cargo run --example tutorial

§Investigating Errors

You’ll see log messages about the 6 (first try + RETRY_LIMIT) attempts and the final error message. Let’s look into the DB to find out what happened:

~$ psql pg_task -c 'table pg_task'
-[ RECORD 1 ]------------------------------------------------
id         | cddf7de1-1194-4bee-90c6-af73d9206ce2
step       | {"Greeter":{"ReadName":{"filename":"name.txt"}}}
wakeup_at  | 2024-06-30 09:32:27.703599+06
tried      | 6
is_running | f
error      | No such file or directory (os error 2)
created_at | 2024-06-30 09:32:22.628563+06
updated_at | 2024-06-30 09:32:27.703599+06

  • a non-null error field indicates that the task has errored and contains the error message
  • the step field tells you which step failed and what its state was when the error occurred

§Fixing the World

In this case, the error is due to the external world state. Let’s fix it by creating the file:

echo 'Fixed World' > name.txt

To rerun the task, we just need to clear its error:

psql pg_task -c 'update pg_task set error = null'

You’ll see the log messages about rerunning the task and the greeting message of the final step. That’s all 🎉.

§Scheduling Tasks

Essentially, scheduling a task means inserting a corresponding row into the pg_task table. You can do it by hand from psql or from code in any language.

There are also a few helpers that take care of first-step serialization and time scheduling:

  • enqueue - to run the task immediately
  • delay - to run it with a delay
  • schedule - to schedule it to a particular time

§Running Workers

After defining the steps of each task, we need to wrap them into enums representing whole tasks via task!:

pg_task::task!(Task1 { StepA, StepB });
pg_task::task!(Task2 { StepC });

One more enum is needed to combine all the possible tasks:

pg_task::scheduler!(Tasks { Task1, Task2 });

Now we can run the worker:

pg_task::Worker::<Tasks>::new(db).run().await?;

All the communication is synchronized by the DB, so it doesn’t matter how you run the workers or how many of them there are. A worker can be a separate process or an in-process tokio::spawn task.
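
The sketch below illustrates why the number of workers does not matter when all coordination goes through one shared table. It is only an analogy: a Mutex-guarded Vec stands in for the pg_task table, and Row, claim, finish, and run_workers are hypothetical names, not the queries pg_task actually runs.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// A stand-in for one row of the task table.
struct Row {
    id: usize,
    is_running: bool,
    done: bool,
}

/// Atomically claims one idle, unfinished row; the mutex plays the role of the DB lock.
fn claim(table: &Mutex<Vec<Row>>) -> Option<usize> {
    let mut rows = table.lock().unwrap();
    let row = rows.iter_mut().find(|r| !r.is_running && !r.done)?;
    row.is_running = true;
    Some(row.id)
}

/// Marks a claimed row as finished.
fn finish(table: &Mutex<Vec<Row>>, id: usize) {
    let mut rows = table.lock().unwrap();
    if let Some(row) = rows.iter_mut().find(|r| r.id == id) {
        row.is_running = false;
        row.done = true;
    }
}

/// Runs `workers` threads over `tasks` rows and returns the ids each worker processed.
fn run_workers(workers: usize, tasks: usize) -> Vec<Vec<usize>> {
    let rows: Vec<Row> = (0..tasks)
        .map(|id| Row { id, is_running: false, done: false })
        .collect();
    let table = Arc::new(Mutex::new(rows));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let table = Arc::clone(&table);
            thread::spawn(move || {
                let mut processed = Vec::new();
                while let Some(id) = claim(&table) {
                    processed.push(id); // the step's work would happen here
                    finish(&table, id);
                }
                processed
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

However the rows end up distributed between threads, each id is claimed by exactly one worker, because claiming and flagging happen under the same lock.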

§Stopping Workers

You can gracefully stop task runners by sending a notification using the DB:

SELECT pg_notify('pg_task_changed', 'stop_worker');

The workers wait until the current step of every running task has finished and then exit. You can wait for this by checking whether any tasks are still running:

SELECT EXISTS(SELECT 1 FROM pg_task WHERE is_running = true);
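
A graceful stop of this kind can be modelled with a flag that is checked only between steps. The following std-only sketch assumes that model; run_until_stopped and the in-memory queue are hypothetical and do not reflect how pg_task listens for the notification.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Processes queued steps one at a time and returns how many were completed.
/// The stop flag is checked only between steps, so a step that has started
/// always runs to completion before the worker exits.
fn run_until_stopped(
    queue: &mut Vec<u32>,
    stop: &AtomicBool,
    mut step: impl FnMut(u32),
) -> usize {
    let mut completed = 0;
    while !stop.load(Ordering::SeqCst) {
        match queue.pop() {
            Some(next) => {
                step(next);
                completed += 1;
            }
            None => break, // nothing left to do
        }
    }
    completed
}
```

If the flag is set while a step is in progress, that step still finishes and is counted; the remaining items stay in the queue for the next worker.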

§Delaying Steps

Sometimes you need to delay the next step. Using tokio::time::sleep before returning the next step creates a couple of issues:

  • if the process crashes while sleeping, the step won’t be considered done and will rerun on restart
  • you’d have to wait for the sleeping task to finish on graceful shutdown

Use NextStep::delay instead - it schedules the next step with the delay and finishes the current one right away.

You can find a runnable example in examples/delay.rs.
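
The difference from sleeping can be sketched as follows: the finished step only records when the next one may start, and nothing blocks in the meantime. This is a simplified model under stated assumptions; Next, Row, and persist are hypothetical names, and time is a plain Duration since an arbitrary epoch to keep the sketch deterministic.

```rust
use std::time::Duration;

/// What a finished step asks the runner to do next (simplified model).
#[derive(Debug, PartialEq)]
enum Next {
    Now(&'static str),
    Delay(&'static str, Duration),
    Done,
}

/// The part of a task row this sketch cares about.
#[derive(Debug, PartialEq)]
struct Row {
    step: &'static str,
    wakeup_at: Duration, // time since an arbitrary epoch
}

/// Persists the outcome of a step: no sleeping, only a later wake-up time.
/// Returns `None` when the task has finished and no row needs to be kept.
fn persist(now: Duration, next: Next) -> Option<Row> {
    match next {
        Next::Now(step) => Some(Row { step, wakeup_at: now }),
        Next::Delay(step, delay) => Some(Row { step, wakeup_at: now + delay }),
        Next::Done => None,
    }
}
```

Since the delay lives in the stored row rather than in a sleeping future, a crash or a shutdown between the two steps loses nothing.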

§Retrying Steps

Use Step::RETRY_LIMIT and Step::RETRY_DELAY when you need to retry a step on errors:

#[async_trait]
impl Step<MyTask> for ApiRequest {
    const RETRY_LIMIT: i32 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
        let result = api_request().await?;
        NextStep::now(ProcessResult { result })
    }
}
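
The attempt accounting from the tutorial (first try + RETRY_LIMIT) can be sketched without a database. The helper below is a hypothetical std-only model, not pg_task's retry loop, and it does not wait between attempts.

```rust
/// Outcome of running a fallible step with retries.
#[derive(Debug, PartialEq)]
struct Outcome<T> {
    tried: u32,
    result: Result<T, String>,
}

/// Calls `step` once, then up to `retry_limit` more times while it keeps failing.
/// The closure receives the 1-based attempt number.
fn run_with_retries<T>(
    retry_limit: u32,
    mut step: impl FnMut(u32) -> Result<T, String>,
) -> Outcome<T> {
    let mut tried = 0;
    loop {
        tried += 1;
        match step(tried) {
            Ok(value) => return Outcome { tried, result: Ok(value) },
            // The retry budget is used up: report the last error.
            Err(e) if tried > retry_limit => return Outcome { tried, result: Err(e) },
            // A real worker would wait for the retry delay before the next attempt.
            Err(_) => {}
        }
    }
}
```

With a limit of 5, a step that never succeeds is tried 6 times in total, which matches the tried column shown in the tutorial.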

§Contributing

  • Please run .pre-commit.sh before sending a PR; it will check everything.

§License

This project is licensed under the MIT license.

Macros§

  • scheduler - Implements the outer enum wrapper containing all the tasks
  • task - Implements the enum wrapper for a single task containing all its steps

Structs§

  • Worker - A worker for processing tasks

Enums§

Traits§

  • A trait to implement on the outer enum wrapper containing all the tasks
  • A trait to implement on each task step

Functions§

  • delay - Schedules a task to be run after a specified delay
  • enqueue - Enqueues the task to be run immediately
  • schedule - Schedules a task to run at a specified time in the future

Type Aliases§