A simple, yet robust crate for managing asynchronous workers using `tokio`.
It allows you to define worker behaviors, dispatch messages to them, and
handle their replies asynchronously, all while ensuring graceful shutdown.
# Features
- **Asynchronous Message Passing:** Utilize `tokio::sync::mpsc` and
`tokio::sync::oneshot` channels for efficient communication between your main
application and workers.
- **Type-Safe Worker Management:** Workers are identified by their own type,
ensuring strong type guarantees at compile time.
- **Graceful Shutdown:** The `WorkersManager` handles the stopping of all
managed workers, allowing them to finish ongoing tasks and perform
cleanup.
- **Asynchronous Lifecycle Hooks:** Workers can define `on_start` and
`on_stop` methods to execute asynchronous initialization and cleanup
logic.
- **Fire-and-Forget:** Send messages without awaiting a reply when the
response is not critical.
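
The message-passing features above follow a common request/reply pattern: a request that expects an answer carries its own single-use reply channel, while a fire-and-forget message carries none. The sketch below illustrates that pattern using only the standard library, with `std::sync::mpsc` and a thread standing in for `tokio::sync::mpsc`, `tokio::sync::oneshot`, and a task. It is not the crate's implementation, and `Request`, `spawn_counter`, and `request_reply_demo` are hypothetical names.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message type: a fire-and-forget increment, or a query that
/// carries the channel on which the worker should send its reply.
enum Request {
    Inc,
    Get(Sender<u8>),
}

/// Spawns a worker thread that owns the counter and handles requests in order.
fn spawn_counter() -> (Sender<Request>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Request>();
    let handle = thread::spawn(move || {
        let mut counter: u8 = 0;
        // The loop ends once every `Sender<Request>` has been dropped.
        for request in rx {
            match request {
                Request::Inc => counter = counter.saturating_add(1),
                Request::Get(reply_to) => {
                    // Ignore the error if the caller stopped waiting.
                    let _ = reply_to.send(counter);
                }
            }
        }
    });
    (tx, handle)
}

/// Sends two fire-and-forget messages, then one request that waits for a reply.
fn request_reply_demo() -> u8 {
    let (tx, handle) = spawn_counter();
    tx.send(Request::Inc).unwrap();
    tx.send(Request::Inc).unwrap();

    // A fresh reply channel per request plays the role of a oneshot channel.
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Request::Get(reply_tx)).unwrap();
    let value = reply_rx.recv().unwrap();

    drop(tx); // Closing the request channel lets the worker exit.
    handle.join().unwrap();
    value
}
```

Because the worker handles requests in the order they were sent, the reply reflects both earlier increments.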
# How to Use
1. **Define Your Worker:** Implement the `Worker` trait for your custom
struct. This trait requires you to specify the `WorkerMessage` (the
message type your worker expects) and `WorkerReply` (the response type it
sends back).
```rust
use tokio_worker::WorkerOutput;

#[derive(Default)]
struct Counter {
    counter: u8,
}

#[derive(Debug)]
enum CounterMsg {
    Inc,
    Dec,
    Get,
}

impl tokio_worker::Worker for Counter {
    type WorkerMessage = CounterMsg;
    type WorkerReply = Option<u8>;

    // Handles one message; only `Get` produces a value in the reply.
    fn call<'a>(&'a mut self, msg: Self::WorkerMessage) -> WorkerOutput<'a, Self::WorkerReply> {
        Box::pin(async move {
            println!("CALL: {msg:?}");
            match msg {
                CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
                CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
                CounterMsg::Get => return Some(self.counter),
            }
            None
        })
    }

    // Asynchronous initialization hook.
    fn on_start<'a>(&'a mut self) -> WorkerOutput<'a, ()> {
        Box::pin(async move {
            println!("ON START COUNTER");
        })
    }

    // Asynchronous cleanup hook.
    fn on_stop<'a>(&'a mut self) -> WorkerOutput<'a, ()> {
        Box::pin(async move {
            println!("ON STOP COUNTER");
        })
    }
}
```
2. **Manage Workers with `WorkersManager`:** Create an instance of
`WorkersManager`, add your worker(s) to it, and then use `send` or
`fire_send` to interact with them. Remember to call `stop()` for graceful
shutdown.
```rust
use tokio_worker::{ManagerError, WorkerOutput, WorkersManager};

#[derive(Default)]
struct Counter {
    counter: u8,
}

#[derive(Debug)]
enum CounterMsg {
    Inc,
    Dec,
    Get,
}

impl tokio_worker::Worker for Counter {
    type WorkerMessage = CounterMsg;
    type WorkerReply = Option<u8>;

    fn call<'a>(&'a mut self, msg: Self::WorkerMessage) -> WorkerOutput<'a, Self::WorkerReply> {
        Box::pin(async move {
            match msg {
                CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
                CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
                CounterMsg::Get => return Some(self.counter),
            }
            None
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), ManagerError> {
    let mut workers_manager = WorkersManager::new();

    // Add a Counter worker with a channel buffer size of 256
    workers_manager.add_worker(Counter::default(), 256)?;

    // Send three Inc messages without waiting for a reply
    workers_manager
        .fire_send::<Counter>(CounterMsg::Inc)
        .await?;
    workers_manager
        .fire_send::<Counter>(CounterMsg::Inc)
        .await?;
    workers_manager
        .fire_send::<Counter>(CounterMsg::Inc)
        .await?;

    // Send a Get message and await its reply
    let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
    assert_eq!(current_count, Some(3));

    // Send a Dec message and wait for it
    workers_manager.send::<Counter>(CounterMsg::Dec).await?;
    let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
    assert_eq!(current_count, Some(2));

    // Stop all workers gracefully
    workers_manager.stop().await;
    Ok(())
}
```
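
To make the idea of graceful shutdown concrete, here is one common way to get that behavior, sketched with only the standard library: closing the message channel lets the worker drain everything already queued, run its cleanup step, and then exit. Whether `WorkersManager::stop()` works exactly like this is an assumption; `run_worker_until_closed` and the string log are hypothetical and exist only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a hypothetical worker thread, queues three jobs, then closes the
/// channel and returns the worker's log of what it did, in order.
fn run_worker_until_closed() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<u32>();

    let worker = thread::spawn(move || {
        // Stand-in for an `on_start` hook.
        let mut log = vec!["on_start".to_string()];
        // Iteration stops only after the channel is closed AND drained,
        // so jobs queued before shutdown are still processed.
        for job in rx {
            log.push(format!("job {job}"));
        }
        // Stand-in for an `on_stop` hook.
        log.push("on_stop".to_string());
        log
    });

    for job in 1..=3 {
        tx.send(job).unwrap();
    }
    drop(tx); // Request shutdown by closing the channel.
    worker.join().unwrap() // Wait for the worker to finish its cleanup.
}
```

The key design point is that shutdown is signaled by closing the channel rather than by killing the worker, so no queued message is lost.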
## Important Note on Worker Uniqueness
The `WorkersManager` identifies workers by their types, allowing only one
instance of each type to be registered. To handle multiple workers of the
same type concurrently, implement a worker pool within the `Worker`'s `call`
method. Contributions that add support for this feature are welcome.
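
A minimal sketch of why keying by type permits only one instance per type, assuming a registry built on `std::any::TypeId`. This is an illustration rather than the crate's actual internals; `Registry`, its methods, and `registry_demo` are hypothetical, and `Counter` here is a plain struct instead of a `Worker` implementation.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical registry that stores at most one value per concrete type.
#[derive(Default)]
struct Registry {
    entries: HashMap<TypeId, Box<dyn Any>>,
}

impl Registry {
    /// Registers `value`, or hands it back if its type is already present.
    fn add<T: Any>(&mut self, value: T) -> Result<(), T> {
        let key = TypeId::of::<T>();
        if self.entries.contains_key(&key) {
            return Err(value);
        }
        self.entries.insert(key, Box::new(value));
        Ok(())
    }

    /// Looks up the registered value of type `T`, if any.
    fn get<T: Any>(&self) -> Option<&T> {
        self.entries.get(&TypeId::of::<T>())?.downcast_ref::<T>()
    }
}

#[derive(Debug, Default, PartialEq)]
struct Counter {
    counter: u8,
}

/// Registers two counters and reports (first accepted, second accepted).
fn registry_demo() -> (bool, bool) {
    let mut registry = Registry::default();
    let first = registry.add(Counter { counter: 1 }).is_ok();
    // Same type, so the key collides and the second value is rejected.
    let second = registry.add(Counter { counter: 9 }).is_ok();
    (first, second)
}
```

Since the type is the key, a lookup such as `get::<Counter>()` needs no separate identifier, which is also why a second `Counter` has nowhere to go.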
## Examples
Check out the `examples` directory for more examples.
## Contributing
Contributions to `tokio_worker` are welcome! You can help by opening issues
(such as bug reports or feature requests) or submitting patches. **All
contributions must be submitted through Nostr**. For more details on the
process, please refer to the CONTRIBUTING.md file. Your support is greatly
appreciated!
## License
This project is licensed under the MIT license. For more details, see the LICENSE file or
<http://opensource.org/licenses/MIT>.