Fang
Background task processing library for Rust. It uses Postgres DB as a task queue.
Features
- Asynk feature uses tokio. Workers are started in tokio tasks.
- Blocking feature uses std::thread. Workers are started in separate threads.
Installation
- Add this to your Cargo.toml
Blocking feature
[dependencies]
fang = { version = "0.9", features = ["blocking"], default-features = false }
Asynk feature
[dependencies]
fang = { version = "0.9", features = ["asynk"], default-features = false }
Both features
fang = { version = "0.9" }
Supports rustc 1.62+
- Create the fang_tasks table in the Postgres database. The migration can be found in the migrations directory.
Usage
Defining a task
Blocking feature
Every task should implement the fang::Runnable trait, which fang uses to execute it.
use fang::Error;
use fang::Runnable;
use fang::typetag;
use fang::PgConnection;
use ;
As you can see from the example above, the trait implementation has the #[typetag::serde] attribute, which is used to deserialize the task.
The second parameter of the run function is a struct that implements fang::Queueable (fang::Queue, for example). You can reuse it to manipulate the task queue, for example to add a new job during the current job's execution. If you don't need it, just ignore it.
Asynk feature
Every task should implement the fang::AsyncRunnable trait, which fang uses to execute it.
Be careful not to give two implementations of AsyncRunnable the same name, because that causes typetag to fail.
use fang::AsyncRunnable;
use fang::asynk::async_queue::AsyncQueueable;
use ;
use fang::async_trait;
In both modules, tasks can be scheduled to execute once. Use the Scheduled::ScheduleOnce enum variant to schedule a task at a specific datetime.
Datetimes and cron patterns are interpreted in the UTC timezone, so you need to apply an offset to schedule a task at the desired local hour.
Example:
If your local time is UTC + 2 and you would like to run a task at 11:00 every day, schedule it for 09:00 UTC with this expression:
let expression = "0 0 9 * * * *";
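The offset arithmetic above can be sketched as a small helper. This is only an illustration: local_hour_to_utc and daily_cron_at are hypothetical names, not part of fang, and the sketch assumes a whole-hour offset and the seven-field expression layout used in the example (seconds, minutes, hours, then four wildcard fields).

```rust
/// Converts a local wall-clock hour to the matching UTC hour.
/// `utc_offset_hours` is +2 for "UTC + 2". Whole-hour offsets only.
/// Hypothetical helper, not part of fang.
fn local_hour_to_utc(local_hour: u32, utc_offset_hours: i32) -> u32 {
    // rem_euclid keeps the result in 0..24 even when the subtraction is negative.
    (local_hour as i32 - utc_offset_hours).rem_euclid(24) as u32
}

/// Builds a daily expression that fires at `local_hour`:00:00 local time,
/// expressed in UTC. Hypothetical helper, not part of fang.
fn daily_cron_at(local_hour: u32, utc_offset_hours: i32) -> String {
    format!("0 0 {} * * * *", local_hour_to_utc(local_hour, utc_offset_hours))
}
```

Calling daily_cron_at(11, 2) yields the expression shown above. When the conversion crosses midnight (for example 01:00 at UTC + 2 becomes 23:00 UTC), a daily pattern is unaffected, but a pattern restricted to specific days would need its day fields shifted as well.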
Enqueuing a task
Blocking feature
To enqueue a task, use Queue::insert_task.
For the Postgres backend:
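use fang::Queue;
// create an r2d2 pool
// create a fang queue
// `pool` stands for the r2d2 pool created above
let queue = Queue::builder().connection_pool(pool).build();
// `task` stands for any value that implements Runnable
let task_inserted = queue.insert_task(&task).unwrap();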
Queue::insert_task inserts a task with or without a uniqueness check, depending on the uniq method defined on the task.
If uniq returns true and the task is already in storage, insert_task returns the stored task.
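The uniqueness rule can be modelled with a small in-memory queue. This is a sketch of the behaviour described above, not fang's implementation: InMemoryQueue and StoredTask are hypothetical types, and using the raw payload string as the uniqueness key is an assumption made for illustration.

```rust
use std::collections::HashMap;

/// A task as it would sit in storage. Hypothetical type, not part of fang.
#[derive(Debug, Clone, PartialEq)]
struct StoredTask {
    id: u64,
    payload: String,
}

/// In-memory stand-in for the task table. Hypothetical type, not part of fang.
#[derive(Default)]
struct InMemoryQueue {
    next_id: u64,
    tasks: Vec<StoredTask>,
    // Maps the payload of each unique task to the id of its stored copy.
    uniq_index: HashMap<String, u64>,
}

impl InMemoryQueue {
    /// Inserts a task. When `uniq` is true and an identical unique task is
    /// already stored, the stored task is returned and nothing is inserted.
    fn insert_task(&mut self, payload: &str, uniq: bool) -> StoredTask {
        if uniq {
            if let Some(&id) = self.uniq_index.get(payload) {
                return self
                    .tasks
                    .iter()
                    .find(|task| task.id == id)
                    .cloned()
                    .expect("indexed task is always stored");
            }
        }
        self.next_id += 1;
        let task = StoredTask { id: self.next_id, payload: payload.to_string() };
        if uniq {
            self.uniq_index.insert(payload.to_string(), task.id);
        }
        self.tasks.push(task.clone());
        task
    }
}
```

Inserting the same unique payload twice returns the same StoredTask both times and leaves one row in storage, while a non-unique insert always adds a new row.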
Asynk feature
To enqueue a task, use AsyncQueueable::insert_task. Depending on the backend you prefer, you will need to call it on a specific queue.
For the Postgres backend:
use fang::asynk::async_queue::AsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;
// Create an AsyncQueue
let max_pool_size: u32 = 2;
let mut queue = AsyncQueue::builder()
    // Postgres database url (`database_url` stands for your connection string)
    .uri(database_url)
    // Max number of connections that are allowed
    .max_pool_size(max_pool_size)
    .build();
// Always connect first in order to perform any operation
queue.connect(NoTls).await.unwrap();
To keep the example simple, it uses the NoTls type. If you need to encrypt Postgres traffic, you can implement a Tls type. This is well documented for openssl and native-tls.
// AsyncTask from first example
let task = AsyncTask { /* fields from the first example */ };
let task_returned = queue
    .insert_task(&task as &dyn AsyncRunnable)
    .await
    .unwrap();
Starting workers
Blocking feature
Every worker runs in a separate thread. In case of panic, they are always restarted.
Use WorkerPool to start workers. Use WorkerPool::builder to create your worker pool and run tasks.
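use fang::WorkerPool;
use fang::Queue;
// create a Queue
let mut worker_pool = WorkerPool::<Queue>::builder()
    .queue(queue)
    .number_of_workers(3_u32) // example value
    // if you want to run tasks of the specific kind
    .task_type("my_task_type".to_string()) // example value
    .build();
worker_pool.start();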
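The restart-on-panic behaviour of the blocking workers can be illustrated with plain std::thread. This is a minimal sketch of one way to supervise a thread, not how fang implements it: run_with_restart is a hypothetical function, and unlike a real worker it returns once the work completes without panicking.

```rust
use std::sync::Arc;
use std::thread;

/// Runs `work` on its own thread and starts a fresh thread whenever it panics.
/// Returns the number of threads that were started once `work` finishes normally.
/// Hypothetical helper, not part of fang.
fn run_with_restart<F>(work: F) -> usize
where
    F: Fn() + Send + Sync + 'static,
{
    let work = Arc::new(work);
    let mut starts = 0;
    loop {
        starts += 1;
        let job = Arc::clone(&work);
        let handle = thread::spawn(move || (*job)());
        // join() returns Err when the spawned thread panicked.
        if handle.join().is_ok() {
            return starts;
        }
        // The thread panicked: loop around and start a replacement.
    }
}
```

A closure that panics on its first two runs and succeeds on the third makes run_with_restart return 3. The panic messages still appear on stderr, because the default panic hook runs before the thread unwinds.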
Asynk feature
Every worker runs in a separate tokio task. In case of panic, they are always restarted.
Use AsyncWorkerPool to start workers.
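use fang::asynk::async_worker_pool::AsyncWorkerPool;
// Need to create a queue
// Also insert some tasks
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
    .number_of_workers(2_u32) // example value
    .queue(queue)
    // if you want to run tasks of the specific kind
    .task_type("my_task_type".to_string()) // example value
    .build();
pool.start().await;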
Check out:
- Simple Worker Example - simple worker example
- Simple Cron Worker Example - simple cron worker example
- Simple Async Worker Example - simple async worker example
- Simple Cron Async Worker Example - simple cron async worker example
- El Monitorro - telegram feed reader. It uses the Fang blocking module to synchronize feeds and deliver updates to users.
- weather_bot_rust - A bot that provides weather info. It uses the Fang asynk module to process updates from Telegram users and schedule weather info.
Configuration
Blocking feature
Just use the TypeBuilder defined for WorkerPool.
Asynk feature
Just use the TypeBuilder defined for AsyncWorkerPool.
Configuring the type of workers
Configuring retention mode
By default, all successfully finished tasks are removed from the DB, failed tasks aren't.
There are three retention modes you can use:
Set the retention mode with the worker pool's TypeBuilder in both modules.
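The effect of a retention mode can be sketched as a decision made after each task run. This is a standalone model, not fang's code: the variant names KeepAll, RemoveAll and RemoveFinished, the Outcome enum and the should_remove function are assumptions chosen so that RemoveFinished matches the default described above. Check the crate documentation for the actual names.

```rust
/// Assumed retention modes. The variant names are illustrative, not confirmed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetentionMode {
    /// Keep every task in the DB.
    KeepAll,
    /// Remove every task after it has run.
    RemoveAll,
    /// Remove successfully finished tasks, keep failed ones (the described default).
    RemoveFinished,
}

/// Result of running a task. Hypothetical type, not part of fang.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Outcome {
    Finished,
    Failed,
}

/// Decides whether a task row should be deleted after it has run.
fn should_remove(mode: RetentionMode, outcome: Outcome) -> bool {
    match (mode, outcome) {
        (RetentionMode::KeepAll, _) => false,
        (RetentionMode::RemoveAll, _) => true,
        (RetentionMode::RemoveFinished, Outcome::Finished) => true,
        (RetentionMode::RemoveFinished, Outcome::Failed) => false,
    }
}
```

Under this model, the default behaviour corresponds to should_remove(RetentionMode::RemoveFinished, outcome), which is true for finished tasks and false for failed ones.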
Configuring sleep values
Blocking feature
You can use SleepParams to configure sleep values:
If there are no tasks in the DB, a worker sleeps for sleep_period and each time this value increases by sleep_step until it reaches max_sleep_period. min_sleep_period is the initial value for sleep_period. All values are in seconds.
Use set_sleep_params to set it:
let sleep_params = SleepParams { /* sleep_period, max_sleep_period, min_sleep_period, sleep_step */ };
Set the sleep params with the worker pool's TypeBuilder in both modules.
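How the four sleep values interact can be shown with a standalone model. BackoffModel is a hypothetical type, not fang's SleepParams: it only mirrors the field names described above, with all values in seconds. The reset method is an assumption about what happens once a task is found; the text above only describes the growth.

```rust
/// Standalone model of the sleep behaviour; all values are in seconds.
/// Hypothetical type, not part of fang.
#[derive(Debug, Clone, PartialEq)]
struct BackoffModel {
    sleep_period: u64,
    max_sleep_period: u64,
    min_sleep_period: u64,
    sleep_step: u64,
}

impl BackoffModel {
    /// Starts with `sleep_period` equal to `min_sleep_period`.
    fn new(min_sleep_period: u64, max_sleep_period: u64, sleep_step: u64) -> Self {
        BackoffModel {
            sleep_period: min_sleep_period,
            max_sleep_period,
            min_sleep_period,
            sleep_step,
        }
    }

    /// Called when no task was found: returns how long to sleep now, then
    /// grows `sleep_period` by `sleep_step`, capped at `max_sleep_period`.
    fn next_sleep(&mut self) -> u64 {
        let current = self.sleep_period;
        self.sleep_period = (current + self.sleep_step).min(self.max_sleep_period);
        current
    }

    /// Assumed behaviour: go back to the initial value once a task is found.
    fn reset(&mut self) {
        self.sleep_period = self.min_sleep_period;
    }
}
```

With BackoffModel::new(1, 4, 2), successive empty polls sleep for 1, 3, 4 and 4 seconds, and a reset brings the next sleep back to 1 second.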
Contributing
- Fork it!
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create new Pull Request
Running tests locally
- Install diesel_cli.
cargo install diesel_cli
- Install docker on your machine.
- Run a Postgres docker container (see the Makefile).
make db
- Run the migrations
make diesel
- Run tests
make tests
- Run dirty/long tests. The DB must be recreated afterwards.
make ignored
- Kill docker container
make stop
Authors
- Ayrat Badykov (@ayrat555)
- Pepe Márquez (@pxp9)