Coordinator
Description
Coordinator is a simple library to load balance tasks into task runners that run asynchronously. Each worker added into the coordinator will have a queue to process work unit (or task). Each worker will only process one task at a given time.
You can select which worker to process a task by using the following apis:
TaskPrefs::Any(my_coordinator.any()for#[coordinator]macro): This will tell the coordinator to queue with the most available workerTaskPrefs::Preferred(worker_id)(my_coordinator.prefer(worker_id)for#[coordinator]macro): This will tell the coordinator to queue with worker with idworker_idif it's not currently full, otherwise queue the task with any worker.TaskPrefs::Required(worker_id)(my_coordinator.require(worker_id)for#[coordinator]macro): This will tell the coordinator to queue with worker with idworker_id.
The coordinator will try to find the most available worker using the average task completion time and the number of task in queue of a worker.
Table of Contents
Installation
This crate is available on crates.io. Please visit the link to find the latest version and instructions for installation.
Usage
For full examples, check out playground/examples
// Create a worker that sleeps for 1 sec and return a number that double the input
;
// the queue thershold of a single queue, if the number of task item in queue exceeded the thershold
// any `TaskPref::Preferred(x)` will be processed by a different task processor.
let queue_len = 3;
let b = new;
// Add `Doubler` as task processor
b.add_worker
.await;
// Add a closure as a task processor. Any `FnMut` closure can be used as task processor!
b.add_worker.await;
// Schedule a task for processing. The task will be polled to completion in the worker future
// and not the current future. The `join_handle` can be used to retrieve the returned value
let join_handle = b.run.await.unwrap;
println!;
// Do other works.....
// Wait for the task result
let rs = join_handle.join.await.unwrap.0;
println!;
If your task processors can process different types of tasks (eg: CalculatorProcessor can process both add and subtract tasks), you can use the #[coordinator] attribute macro to avoid needing to define your own input and output enums and manually dispatch them when implementing TaskProcessor
; // implements [`InteractableObject`]
; // implements [`InteractableObject`]
// Type alias for not having to type out this long type every time we use it
type ArcMut<T> = ;
// Instead of implementing the [`TaskProcessor`] trait, we implement the trait generated by `#[coordinator]` instead, this way we don't have to enum dispatch ourself. The trait name will always be `[Name]Processor`
; // Another CatProcessor impl
async
Contributing
We welcome any contributions to this project. Before submitting a pull request, please open an issue to check if someone is already working on the feature.
License
This project is licensed under the MIT License.