Expand description
A simple, yet robust crate for managing asynchronous workers using tokio.
It allows you to define worker behaviors, dispatch messages to them, and
handle their replies asynchronously, all while ensuring graceful shutdown.
§Features
- Asynchronous Message Passing: Utilize
tokio::mpscandtokio::oneshotchannels for efficient communication between your main application and workers. - Type-Safe Worker Management: Workers are identified by themself, ensuring strong type guarantees at compile time.
- Graceful Shutdown: The
WorkersManagerhandles the stopping of all managed workers, allowing them to finish ongoing tasks and perform cleanup. - Asynchronous Lifecycle Hooks: Workers can define
on_startandon_stopmethods to execute asynchronous initialization and cleanup logic. - Fire-and-Forget: Send messages without awaiting a reply when the response is not critical.
§How to Use
- Define Your Worker: Implement the
Workertrait for your custom struct. This trait requires you to specify theWorkerMessage(the message type your worker expects) andWorkerReply(the response type it sends back).
use std::future::Future;
use std::pin::Pin;
use tokio_worker::{ManagerError, WorkerOutput};
#[derive(Default)]
struct Counter {
counter: u8,
}
#[derive(Debug)]
enum CounterMsg {
Inc,
Dec,
Get,
}
impl tokio_worker::Worker for Counter {
type WorkerMessage = CounterMsg;
type WorkerReply = Option<u8>;
fn call<'a>(&'a mut self, msg: Self::WorkerMessage) -> WorkerOutput<'a, Self::WorkerReply> {
Box::pin(async move {
println!("CALL: {msg:?}");
match msg {
CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
CounterMsg::Get => return Some(self.counter),
}
None
})
}
fn on_start<'a>(&'a mut self) -> WorkerOutput<'a, ()> {
Box::pin(async move {
println!("ON START COUNTER");
})
}
fn on_stop<'a>(&'a mut self) -> WorkerOutput<'a, ()> {
Box::pin(async move {
println!("ON STOP COUNTER");
})
}
}- Manage Workers with
WorkersManager: Create an instance ofWorkersManager, add your worker(s) to it, and then usesendorfire_sendto interact with them. Remember to callstop()for graceful shutdown.
use std::future::Future;
use std::pin::Pin;
use tokio_worker::{ManagerError, WorkerOutput, WorkersManager};
#[derive(Default)]
struct Counter {
counter: u8,
}
#[derive(Debug)]
enum CounterMsg {
Inc,
Dec,
Get,
}
impl tokio_worker::Worker for Counter {
type WorkerMessage = CounterMsg;
type WorkerReply = Option<u8>;
fn call<'a>(&'a mut self, msg: Self::WorkerMessage) -> WorkerOutput<'a, Self::WorkerReply> {
Box::pin(async move {
match msg {
CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
CounterMsg::Get => return Some(self.counter),
}
None
})
}
}
#[tokio::main]
async fn main() -> Result<(), ManagerError> {
let mut workers_manager = WorkersManager::new();
// Add a Counter worker with a channel buffer size of 256
workers_manager.add_worker(Counter::default(), 256)?;
// Send an Inc message and don't wait for a reply
workers_manager
.fire_send::<Counter>(CounterMsg::Inc)
.await?;
workers_manager
.fire_send::<Counter>(CounterMsg::Inc)
.await?;
workers_manager
.fire_send::<Counter>(CounterMsg::Inc)
.await?;
// Send a Get message and await its reply
let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
assert_eq!(current_count, Some(3));
// Send a Dec message and wait for it
workers_manager.send::<Counter>(CounterMsg::Dec).await?;
let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
assert_eq!(current_count, Some(2));
// Stop all workers gracefully
workers_manager.stop().await;
Ok(())
}§Important Note on Worker Uniqueness
The WorkersManager identifies workers by their types, allowing only one
instance of each type to be registered. To handle multiple workers of the
same type concurrently, implement a worker pool within the Worker’s call
method. Contribution are welcome to support this feature.
§Examples
Check out the examples directory for more examples.
§Contributing
Contributions to tokio_worker are welcome! You can help by opening issues
(such as bug reports or feature requests) or submitting patches. All
contributions must be submitted through Nostr. For more details on the
process, please refer to the CONTRIBUTING.md file. Your support is greatly
appreciated!
§License
This project is licensed under the MIT license for more details see the LICENSE file or http://opensource.org/licenses/MIT.
Structs§
- Worker
Runner - Manages the lifecycle of a worker, handling message processing.
- Workers
Manager - Coordinates workers threads and their communication channels.
Enums§
- Manager
Error - Possible errors encountered while managing workers.
Traits§
- Worker
- Trait defining the behavior of a worker.
Type Aliases§
- Worker
Output - Represents the worker output