Fang
Background task processing library for Rust. It uses Postgres DB as a task queue.
Features
- Asynk feature uses
tokio. Workers are started in tokio tasks. - Blocking feature uses
std::thread. Workers are started in a separated threads.
Installation
- Add this to your Cargo.toml
Blocking feature
[]
= { = "0.8" , = ["blocking"], = false }
Asynk feature
[]
= { = "0.8" , = ["asynk"], = false }
Both features
= { = "0.8" }
Supports rustc 1.62+
- Create
fang_taskstable in the Postgres database. The migration can be found in the migrations directory.
Usage
Defining a task
Blocking feature
Every task should implement fang::Runnable trait which is used by fang to execute it.
use Error;
use Runnable;
use typetag;
use PgConnection;
use ;
As you can see from the example above, the trait implementation has #[typetag::serde] attribute which is used to deserialize the task.
The second parameter of the run function is diesel's PgConnection, You can re-use it to manipulate the task queue, for example, to add a new job during the current job's execution. Or you can just re-use it in your own queries if you're using diesel. If you don't need it, just ignore it.
Asynk feature
Every task should implement fang::AsyncRunnable trait which is used by fang to execute it.
Also be careful to not to call with the same name two impl of AsyncRunnable, because will cause a fail with typetag.
use AsyncRunnable;
use AsyncQueueable;
use ;
use async_trait;
Enqueuing a task
Blocking feature
To enqueue a task use Queue::enqueue_task
use Queue;
...
enqueue_task.unwrap;
The example above creates a new postgres connection on every call. If you want to reuse the same postgres connection to enqueue several tasks use Postgres struct instance:
let queue = new;
for id in &unsynced_feed_ids
Or you can use PgConnection struct:
push_task_query.unwrap;
Asynk feature
To enqueue a task use AsyncQueueable::insert_task,
depending of the backend that you prefer you will need to do it with a specific queue.
For Postgres backend.
use AsyncQueue;
use NoTls;
use AsyncRunnable;
// Create a AsyncQueue
let max_pool_size: u32 = 2;
let mut queue = builder
// Postgres database url
.uri
// Max number of connections that are allowed
.max_pool_size
// false if would like Uniqueness in tasks
.duplicated_tasks
.build;
// Always connect first in order to perform any operation
queue.connect.await.unwrap;
For easy example we are using NoTls type, if for some reason you would like to encrypt postgres traffic.
You can implement a Tls type.
It is well documented for openssl and native-tls
// AsyncTask from first example
let task = AsyncTask ;
let task_returned = queue
.insert_task
.await
.unwrap;
Starting workers
Blocking feature
Every worker runs in a separate thread. In case of panic, they are always restarted.
Use WorkerPool to start workers. WorkerPool::new accepts one parameter - the number of workers.
use WorkerPool;
new.start;
Use shutdown to stop worker threads, they will try to finish in-progress tasks.
use WorkerPool;
worker_pool = new.start.unwrap;
worker_pool.shutdown
Using a library like signal-hook, it's possible to gracefully shutdown a worker. See the Simple Worker for an example implementation.
Asynk feature
Every worker runs in a separate tokio task. In case of panic, they are always restarted.
Use AsyncWorkerPool to start workers.
use AsyncWorkerPool;
// Need to create a queue
// Also insert some tasks
let mut pool: = builder
.number_of_workers
.queue
.build;
pool.start.await;
Check out:
- Simple Worker Example - simple worker example
- Simple Async Worker Example - simple async worker example
- El Monitorro - telegram feed reader. It uses Fang to synchronize feeds and deliver updates to users.
Configuration
Blocking feature
To configure workers, instead of WorkerPool::new which uses default values, use WorkerPool.new_with_params. It accepts two parameters - the number of workers and WorkerParams struct.
Asynk feature
Just use TypeBuilder done for AsyncWorkerPool.
Configuring the type of workers
Blocking feature
You can start workers for a specific types of tasks. These workers will be executing only tasks of the specified type.
Add task_type method to the Runnable trait implementation:
...
Set task_type to the WorkerParamas:
let mut worker_params = new;
worker_params.set_task_type;
new_with_params.start;
Without setting task_type workers will be executing any type of task.
Asynk feature
Same as Blocking feature.
Use TypeBuilder for AsyncWorker.
Configuring retention mode
By default, all successfully finished tasks are removed from the DB, failed tasks aren't.
There are three retention modes you can use:
Set retention mode with set_retention_mode:
Blocking feature
let mut worker_params = new;
worker_params.set_retention_mode;
new_with_params.start;
Asynk feature
Set it in AsyncWorker TypeBuilder.
Configuring sleep values
Blocking feature
You can use use SleepParams to confugure sleep values:
If there are no tasks in the DB, a worker sleeps for sleep_period and each time this value increases by sleep_step until it reaches max_sleep_period. min_sleep_period is the initial value for sleep_period. All values are in seconds.
Use set_sleep_params to set it:
let sleep_params = SleepParams ;
let mut worker_params = new;
worker_params.set_sleep_params;
new_with_params.start;
Asynk feature
Set it in AsyncWorker TypeBuilder.
Periodic Tasks
Fang can add tasks to fang_tasks periodically. To use this feature first run the migration with fang_periodic_tasks table.
Usage example:
Blocking feature
use Scheduler;
use Queue;
let queue = new;
queue
.push_periodic_task
.unwrap;
queue
.push_periodic_task
.unwrap;
start;
In the example above, push_periodic_task is used to save the specified task to the fang_periodic_tasks table which will be enqueued (saved to fang_tasks table) every specied number of milliseconds.
Scheduler::start(Duration::from_secs(10), Duration::from_secs(5)) starts scheduler. It accepts two parameters:
- Db check period
- Acceptable error limit - |current_time - scheduled_time| < error
Asynk feature
use Scheduler;
use AsyncQueueable;
use AsyncQueue;
use Duration;
use Duration as OtherDuration;
// Build a AsyncQueue as before
let schedule_in_future = now + seconds;
let _periodic_task = queue.insert_periodic_task
.await;
let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;
let mut scheduler = builder
.check_period
.error_margin_seconds
.queue
.build;
// Add some more task in other thread or before loop
// Scheduler Loop
scheduler.start.await.unwrap;
Contributing
- Fork it!
- Create your feature branch (
git checkout -b my-new-feature) - Commit your changes (
git commit -am 'Add some feature') - Push to the branch (
git push origin my-new-feature) - Create new Pull Request
Running tests locally
cargo install diesel_cli
docker run --rm -d --name postgres -p 5432:5432 \
-e POSTGRES_DB=fang \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=postgres \
postgres:latest
DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run
// Run regular tests
cargo test --all-features
// Run dirty/long tests, DB must be recreated afterwards
cargo test --all-features -- --ignored --test-threads=1
docker kill postgres
Authors
-
Ayrat Badykov (@ayrat555)
-
Pepe Márquez (@pxp9)