pg_task
FSM-based Resumable Postgres tasks
- FSM-based - each task is a granular state machine
- Resumable - on error, after you fix the step logic or the external world, the task is able to pick up where it stopped
- Postgres - a single table is enough to handle task scheduling, state transitions, and error processing
Tutorial
The full runnable code is in examples/tutorial.rs.
Defining Tasks
We create a greeter task consisting of two steps:
The first step tries to read a name from a file:
- filename - the only state we need in this step
- impl Step<Greeter> for ReadName - our step is a part of a Greeter task
- RETRY_LIMIT - the step is fallible, let's retry it a few times
- NextStep::now(SayHello { name }) - move our task to the SayHello step right now
The second step prints the greeting and finishes the task returning
NextStep::none().
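The two steps can be pictured as a tiny state machine. The sketch below is a std-only model and not pg_task's API: the `Greeter` enum, the `Next` type, and the `step` and `run_to_end` functions are hypothetical names used for illustration, and retries and persistence are deliberately left out.

```rust
use std::fs;
use std::io;

/// Hypothetical stand-in for a task: one variant per step,
/// each carrying only the state that step needs.
#[derive(Debug, PartialEq)]
enum Greeter {
    ReadName { filename: String },
    SayHello { name: String },
}

/// What a finished step asks the runner to do next (simplified model).
#[derive(Debug, PartialEq)]
enum Next {
    Now(Greeter),
    Done,
}

/// Runs exactly one step and reports the transition.
fn step(current: Greeter) -> io::Result<Next> {
    match current {
        Greeter::ReadName { filename } => {
            // Fallible: the file may not exist yet.
            let name = fs::read_to_string(&filename)?.trim().to_string();
            Ok(Next::Now(Greeter::SayHello { name }))
        }
        Greeter::SayHello { name } => {
            println!("Hello, {name}!");
            Ok(Next::Done)
        }
    }
}

/// Drives a task until it finishes or a step fails.
/// Returns the number of steps that were run.
fn run_to_end(mut current: Greeter) -> io::Result<u32> {
    let mut steps = 0;
    loop {
        steps += 1;
        match step(current)? {
            Next::Now(next) => current = next,
            Next::Done => return Ok(steps),
        }
    }
}
```

Because each variant owns its state, a failed step can in principle be stored and retried later without rerunning earlier steps.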
That's essentially all, except for some boilerplate you can find in the full code. Let's run it:
Investigating Errors
You'll see log messages about the 6 (first try + RETRY_LIMIT) attempts and
the final error message. Let's look into the DB to find out what happened:
- a non-null error field indicates that the task has errored and contains the error message
- the step field provides you with the information about a particular step and its state when the error occurred
Fixing the World
In this case, the error is due to the external world state. Let's fix it by creating the file:
To rerun the task, we just need to clear its error:
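As a rough mental model of why clearing the error is enough, assume that workers skip rows whose error is set and that a failed step leaves its stored state in place. The std-only sketch below models only that assumption; `TaskRow` and `try_run` are hypothetical names, and the step is reduced to a plain name rather than the real serialized value.

```rust
/// Hypothetical in-memory stand-in for one task row.
#[derive(Debug, PartialEq)]
struct TaskRow {
    step: String,          // current step, left untouched when it fails
    error: Option<String>, // a non-null error parks the task
}

/// Attempts the row's current step. Returns true if the step ran successfully.
fn try_run(row: &mut TaskRow, file_exists: bool) -> bool {
    if row.error.is_some() {
        return false; // assumption: errored rows are not picked up
    }
    if file_exists {
        row.step = "SayHello".to_string(); // transition to the next step
        true
    } else {
        row.error = Some("No such file or directory".to_string());
        false
    }
}
```

In this model, fixing the world alone changes nothing; the task resumes from the same step only once `error` is reset to `None`.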
You'll see the log messages about rerunning the task and the greeting message of the final step. That's all 🎉.
Scheduling Tasks
Essentially, scheduling a task means inserting a corresponding row into
the pg_task table. You can do it by hand from psql or from code in any
language.
There are also a few helpers to take care of the first step serialization and time scheduling:
- [enqueue] - to run the task immediately
- [delay] - to run it with a delay
- [schedule] - to schedule it to a particular time
Running Workers
After defining the steps of each task, we need to
wrap them into enums representing whole tasks via [task!]:
task!;
task!;
One more enum is needed to combine all the possible tasks:
scheduler!;
Now we can run the worker:
new.run.await?;
All communication is synchronized through the DB, so it doesn't matter how
you run the workers or how many of them there are. A worker can be a separate
process or an in-process [tokio::spawn] task.
Stopping Workers
You can gracefully stop task runners by sending a notification using the DB:
SELECT pg_notify('pg_task_changed', 'stop_worker');
The workers will wait until the current step of every running task has finished and then exit. You can wait for this by checking whether any tasks are still running:
SELECT EXISTS(SELECT 1 FROM pg_task WHERE is_running = true);
Delaying Steps
Sometimes you need to delay the next step. Using [tokio::time::sleep]
before returning the next step creates a couple of issues:
- if the process crashes while sleeping, the step won't be considered done and will rerun on restart
- you'd have to wait for the sleeping task to finish on graceful shutdown
Use [NextStep::delay] instead - it schedules the next step with the delay
and finishes the current one right away.
You can find a runnable example in examples/delay.rs.
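The difference from sleeping can be sketched with a std-only model. It assumes that finishing a step amounts to recording which step runs next and when; `Transition`, `Scheduled`, and `finish_step` are hypothetical names and not part of pg_task.

```rust
use std::time::{Duration, Instant};

/// What a step returns when it is done (simplified model).
#[derive(Debug, PartialEq)]
enum Transition {
    Now(&'static str),
    Delay(&'static str, Duration),
    Done,
}

/// The recorded outcome of a finished step: what runs next and when.
#[derive(Debug, PartialEq)]
struct Scheduled {
    step: &'static str,
    wakeup_at: Instant,
}

/// Turns a transition into a record without blocking:
/// a delay only shifts the wake-up time, nothing sleeps here.
fn finish_step(next: Transition, now: Instant) -> Option<Scheduled> {
    match next {
        Transition::Now(step) => Some(Scheduled { step, wakeup_at: now }),
        Transition::Delay(step, by) => Some(Scheduled { step, wakeup_at: now + by }),
        Transition::Done => None,
    }
}
```

Since the current step returns immediately, a crash or shutdown during the waiting period has no in-flight step to interrupt in this model.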
Retrying Steps
Use [Step::RETRY_LIMIT] and [Step::RETRY_DELAY] when you need to retry a
task on errors:
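The counting behind a retry limit can be illustrated with a std-only sketch. The `retry` function and its parameters are hypothetical and only mirror the idea of a limit and a delay. It sleeps in-process purely for brevity and makes no claim about how pg_task waits between retries.

```rust
use std::thread;
use std::time::Duration;

/// Calls `op` once, then up to `retry_limit` more times, pausing
/// `retry_delay` between attempts. Returns the final result together
/// with the number of attempts made.
fn retry<T, E>(
    retry_limit: u32,
    retry_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> (Result<T, E>, u32) {
    let mut attempts = 0;
    loop {
        attempts += 1;
        match op() {
            Ok(value) => return (Ok(value), attempts),
            // Out of retries: surface the last error.
            Err(err) if attempts > retry_limit => return (Err(err), attempts),
            // Retries left: wait, then try again.
            Err(_) => thread::sleep(retry_delay),
        }
    }
}
```

With a limit of 5, an operation that always fails is attempted 6 times in total: the first try plus 5 retries.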
Contributing
- please run pre-commit.sh before sending a PR, it will check everything
- don't change the README.md directly, instead change the src/lib.rs module level comment and run cargo-sync-readme
License
This project is licensed under the MIT license.