A simple, yet robust crate for managing asynchronous workers using tokio.
It allows you to define worker behaviors, dispatch messages to them, and
handle their replies asynchronously, all while ensuring graceful shutdown.
Features
- Asynchronous Message Passing: Utilize
tokio::mpsc and
tokio::oneshot channels for efficient communication between your main
application and workers.
- Type-Safe Worker Management: Workers are identified by themself,
ensuring strong type guarantees at compile time.
- Graceful Shutdown: The
WorkersManager handles the stopping of all
managed workers, allowing them to finish ongoing tasks and perform
cleanup.
- Asynchronous Lifecycle Hooks: Workers can define
on_start and
on_stop methods to execute asynchronous initialization and cleanup
logic.
- Fire-and-Forget: Send messages without awaiting a reply when the
response is not critical.
How to Use
- Define Your Worker: Implement the
Worker trait for your custom
struct. This trait requires you to specify the WorkerMessage (the
message type your worker expects) and WorkerReply (the response type it
sends back).
use std::future::Future;
use std::pin::Pin;
use tokio_worker::ManagerError;
#[derive(Default)]
struct Counter {
counter: u8,
}
#[derive(Debug)]
enum CounterMsg {
Inc,
Dec,
Get,
}
impl tokio_worker::Worker for Counter {
type WorkerMessage = CounterMsg;
type WorkerReply = Option<u8>;
fn call<'a>(
&'a mut self,
msg: Self::WorkerMessage,
) -> Pin<Box<dyn Future<Output = Self::WorkerReply> + Send + 'a>> {
Box::pin(async move {
println!("CALL: {msg:?}");
match msg {
CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
CounterMsg::Get => return Some(self.counter),
}
None
})
}
fn on_start<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
Box::pin(async move {
println!("ON START COUNTER");
})
}
fn on_stop<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
Box::pin(async move {
println!("ON STOP COUNTER");
})
}
}
- Manage Workers with
WorkersManager: Create an instance of
WorkersManager, add your worker(s) to it, and then use send or
fire_send to interact with them. Remember to call stop() for graceful
shutdown.
use std::future::Future;
use std::pin::Pin;
use tokio_worker::{ManagerError, WorkersManager};
#[derive(Default)]
struct Counter {
counter: u8,
}
#[derive(Debug)]
enum CounterMsg {
Inc,
Dec,
Get,
}
impl tokio_worker::Worker for Counter {
type WorkerMessage = CounterMsg;
type WorkerReply = Option<u8>;
fn call<'a>(
&'a mut self,
msg: Self::WorkerMessage,
) -> Pin<Box<dyn Future<Output = Self::WorkerReply> + Send + 'a>> {
Box::pin(async move {
match msg {
CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
CounterMsg::Get => return Some(self.counter),
}
None
})
}
}
#[tokio::main]
async fn main() -> Result<(), ManagerError> {
let mut workers_manager = WorkersManager::new();
workers_manager.add_worker(Counter::default(), 256)?;
workers_manager
.fire_send::<Counter>(CounterMsg::Inc)
.await?;
workers_manager
.fire_send::<Counter>(CounterMsg::Inc)
.await?;
workers_manager
.fire_send::<Counter>(CounterMsg::Inc)
.await?;
let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
assert_eq!(current_count, Some(3));
workers_manager.send::<Counter>(CounterMsg::Dec).await?;
let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
assert_eq!(current_count, Some(2));
workers_manager.stop().await;
Ok(())
}
Important Note on Worker Uniqueness
The WorkersManager identifies workers by their types, allowing only one
instance of each type to be registered. To handle multiple workers of the
same type concurrently, implement a worker pool within the Worker's call
method. Contribution are welcome to support this feature.
License
This project is licensed under the MIT license for more details see the LICENSE file or
http://opensource.org/licenses/MIT.