Crate tokio_worker

A simple, yet robust crate for managing asynchronous workers using tokio. It allows you to define worker behaviors, dispatch messages to them, and handle their replies asynchronously, all while ensuring graceful shutdown.

§Features

  • Asynchronous Message Passing: Utilize tokio::sync::mpsc and tokio::sync::oneshot channels for efficient communication between your main application and workers.
  • Type-Safe Worker Management: Workers are identified by their own type, ensuring strong type guarantees at compile time.
  • Graceful Shutdown: The WorkersManager handles the stopping of all managed workers, allowing them to finish ongoing tasks and perform cleanup.
  • Asynchronous Lifecycle Hooks: Workers can define on_start and on_stop methods to execute asynchronous initialization and cleanup logic.
  • Fire-and-Forget: Send messages without awaiting a reply when the response is not critical.
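
The difference between awaiting a reply and fire-and-forget can be sketched with plain channels. The block below is only an illustration of the pattern, not the crate's implementation: it swaps tokio tasks and tokio channels for std threads and std::sync::mpsc so that it has no dependencies, and the names Envelope, spawn_counter, fire_send, send, and demo are hypothetical.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Messages understood by the hypothetical counter worker.
enum CounterMsg {
    Inc,
    Get,
}

/// A message plus an optional reply channel.
/// `None` means fire-and-forget: the caller does not want an answer.
struct Envelope {
    msg: CounterMsg,
    reply_to: Option<Sender<Option<u8>>>,
}

/// Spawns the worker loop and returns its inbox together with a join handle.
fn spawn_counter() -> (Sender<Envelope>, JoinHandle<u8>) {
    let (tx, rx) = mpsc::channel::<Envelope>();
    let handle = thread::spawn(move || {
        let mut counter: u8 = 0; // start-up logic would run here
        // The loop ends once every sender has been dropped.
        for Envelope { msg, reply_to } in rx {
            let reply = match msg {
                CounterMsg::Inc => {
                    counter = counter.saturating_add(1);
                    None
                }
                CounterMsg::Get => Some(counter),
            };
            if let Some(reply_tx) = reply_to {
                // The caller may have stopped waiting; ignore that error.
                let _ = reply_tx.send(reply);
            }
        }
        counter // cleanup logic would run here
    });
    (tx, handle)
}

/// Fire-and-forget: enqueue the message and return immediately.
fn fire_send(inbox: &Sender<Envelope>, msg: CounterMsg) {
    inbox
        .send(Envelope { msg, reply_to: None })
        .expect("worker stopped");
}

/// Request/reply: enqueue the message and block until the worker answers.
fn send(inbox: &Sender<Envelope>, msg: CounterMsg) -> Option<u8> {
    let (reply_tx, reply_rx) = mpsc::channel();
    inbox
        .send(Envelope { msg, reply_to: Some(reply_tx) })
        .expect("worker stopped");
    reply_rx.recv().expect("worker dropped the reply channel")
}

/// Sends two increments, reads the value back, then shuts the worker down.
fn demo() -> (Option<u8>, u8) {
    let (inbox, handle) = spawn_counter();
    fire_send(&inbox, CounterMsg::Inc);
    fire_send(&inbox, CounterMsg::Inc);
    let seen = send(&inbox, CounterMsg::Get);
    drop(inbox); // closing the inbox lets the loop finish
    let final_count = handle.join().expect("worker panicked");
    (seen, final_count)
}
```

Because the inbox is a single FIFO queue, the Get request is processed after both Inc messages, so demo() returns (Some(2), 2). Dropping the last sender is what lets the worker leave its loop and run any cleanup before the thread is joined.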

§How to Use

  1. Define Your Worker: Implement the Worker trait for your custom struct. This trait requires you to specify the WorkerMessage (the message type your worker expects) and WorkerReply (the response type it sends back).
use tokio_worker::WorkerOutput;

#[derive(Default)]
struct Counter {
    counter: u8,
}

#[derive(Debug)]
enum CounterMsg {
    Inc,
    Dec,
    Get,
}

impl tokio_worker::Worker for Counter {
    type WorkerMessage = CounterMsg;
    type WorkerReply = Option<u8>;

    fn call<'a>(&'a mut self, msg: Self::WorkerMessage) -> WorkerOutput<'a, Self::WorkerReply> {
        Box::pin(async move {
            println!("CALL: {msg:?}");
            match msg {
                CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
                CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
                CounterMsg::Get => return Some(self.counter),
            }
            None
        })
    }

    fn on_start<'a>(&'a mut self) -> WorkerOutput<'a, ()> {
        Box::pin(async move {
            println!("ON START COUNTER");
        })
    }

    fn on_stop<'a>(&'a mut self) -> WorkerOutput<'a, ()> {
        Box::pin(async move {
            println!("ON STOP COUNTER");
        })
    }
}
  2. Manage Workers with WorkersManager: Create an instance of WorkersManager, add your worker(s) to it, and then use send or fire_send to interact with them. Remember to call stop() for graceful shutdown.
use tokio_worker::{ManagerError, WorkerOutput, WorkersManager};

#[derive(Default)]
struct Counter {
    counter: u8,
}

#[derive(Debug)]
enum CounterMsg {
    Inc,
    Dec,
    Get,
}

impl tokio_worker::Worker for Counter {
    type WorkerMessage = CounterMsg;
    type WorkerReply = Option<u8>;

    fn call<'a>(&'a mut self, msg: Self::WorkerMessage) -> WorkerOutput<'a, Self::WorkerReply> {
        Box::pin(async move {
            match msg {
                CounterMsg::Inc => self.counter = self.counter.saturating_add(1),
                CounterMsg::Dec => self.counter = self.counter.saturating_sub(1),
                CounterMsg::Get => return Some(self.counter),
            }
            None
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), ManagerError> {
    let mut workers_manager = WorkersManager::new();
    // Add a Counter worker with a channel buffer size of 256
    workers_manager.add_worker(Counter::default(), 256)?;

    // Send an Inc message and don't wait for a reply
    workers_manager
        .fire_send::<Counter>(CounterMsg::Inc)
        .await?;
    workers_manager
        .fire_send::<Counter>(CounterMsg::Inc)
        .await?;
    workers_manager
        .fire_send::<Counter>(CounterMsg::Inc)
        .await?;

    // Send a Get message and await its reply
    let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
    assert_eq!(current_count, Some(3));

    // Send a Dec message and wait for it
    workers_manager.send::<Counter>(CounterMsg::Dec).await?;
    let current_count = workers_manager.send::<Counter>(CounterMsg::Get).await?;
    assert_eq!(current_count, Some(2));

    // Stop all workers gracefully
    workers_manager.stop().await;

    Ok(())
}

§Important Note on Worker Uniqueness

The WorkersManager identifies workers by their type, so only one instance of each type can be registered. To handle multiple workers of the same type concurrently, implement a worker pool within the Worker's call method. Contributions to support this feature are welcome.
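
To see why keying by type permits only one instance per type, consider the following minimal sketch. It is an assumption about how such a registry could look, not the crate's actual code: Registry, add, get_mut, and demo are hypothetical names, and the map is keyed by std::any::TypeId.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical registry that stores at most one value per concrete type.
#[derive(Default)]
struct Registry {
    slots: HashMap<TypeId, Box<dyn Any>>,
}

impl Registry {
    /// Registers `worker`, or hands it back if its type is already present.
    fn add<W: Any>(&mut self, worker: W) -> Result<(), W> {
        let key = TypeId::of::<W>();
        if self.slots.contains_key(&key) {
            return Err(worker);
        }
        self.slots.insert(key, Box::new(worker));
        Ok(())
    }

    /// Looks a worker up by type alone, so no handle or ID is needed.
    fn get_mut<W: Any>(&mut self) -> Option<&mut W> {
        self.slots
            .get_mut(&TypeId::of::<W>())
            .and_then(|slot| slot.downcast_mut::<W>())
    }
}

/// Stand-in worker type used only for this sketch.
struct Counter {
    counter: u8,
}

/// Registers two counters and reports which one the registry kept.
fn demo() -> (bool, bool, Option<u8>) {
    let mut registry = Registry::default();
    let first = registry.add(Counter { counter: 1 }).is_ok();
    let second = registry.add(Counter { counter: 9 }).is_ok();
    let stored = registry.get_mut::<Counter>().map(|c| c.counter);
    (first, second, stored)
}
```

Here demo() returns (true, false, Some(1)): the second Counter is rejected because its TypeId is already a key in the map. The same property is what makes a lookup such as send::<Counter>(...) unambiguous.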

§Examples

Check out the examples directory for more examples.

§Contributing

Contributions to tokio_worker are welcome! You can help by opening issues (such as bug reports or feature requests) or submitting patches. All contributions must be submitted through Nostr. For more details on the process, please refer to the CONTRIBUTING.md file. Your support is greatly appreciated!

§License

This project is licensed under the MIT license. For more details, see the LICENSE file or http://opensource.org/licenses/MIT.

Structs§

WorkerRunner
Manages the lifecycle of a worker, handling message processing.
WorkersManager
Coordinates worker threads and their communication channels.

Enums§

ManagerError
Possible errors encountered while managing workers.

Traits§

Worker
Trait defining the behavior of a worker.

Type Aliases§

WorkerOutput
Represents the worker output.