Module worker_pool

Available on crate feature worker only.

Worker pool implementations for concurrent message processing.

This module provides two types of worker pools:

  • WorkerPoolUnbounded: Uses an unbounded channel for message submission. Best for scenarios where you want to submit messages without blocking and let the pool handle scaling based on queue length.

  • WorkerPoolBounded: Uses a bounded channel with a fixed capacity. Best for scenarios where you want backpressure - submissions will block when the queue is full, preventing memory exhaustion.
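The difference between the two submission models can be illustrated with the standard library's channels. This is only a sketch of the general bounded/unbounded behaviour, not the channel implementation orb uses internally; the helper names `fill_unbounded` and `fill_bounded` are hypothetical and exist only for this illustration.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Sends `n` messages into an unbounded channel with no consumer draining it.
/// Returns how many sends were accepted.
fn fill_unbounded(n: u32) -> usize {
    let (tx, rx) = channel::<u32>();
    let mut accepted = 0;
    for i in 0..n {
        // An unbounded send never waits for queue space; the queue simply grows.
        if tx.send(i).is_ok() {
            accepted += 1;
        }
    }
    drop(rx); // the receiver stayed alive for the whole loop
    accepted
}

/// Tries to send `n` messages into a bounded channel of the given capacity
/// with no consumer draining it. Returns `(accepted, rejected)`.
fn fill_bounded(capacity: usize, n: u32) -> (usize, usize) {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let (mut accepted, mut rejected) = (0, 0);
    for i in 0..n {
        // `try_send` reports a full queue instead of blocking, which makes the
        // backpressure point visible. A plain `send` would block here.
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

With no consumer running, `fill_unbounded(1000)` accepts all 1000 messages, while `fill_bounded(4, 10)` accepts 4 and rejects 6. The first case shows how an unbounded queue can grow without limit; the second shows where a bounded pool would make the submitter wait.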

§Common Features

Both pool types share these features:

  • Worker Template: Users provide their Worker object as a template; each newly spawned worker is produced by cloning it (via Clone).

  • Async and Blocking Workers: Support for both WorkerAsync and WorkerBlocking implementations.

  • Dynamic Scaling: When max_workers > min_workers, the pool automatically scales up workers based on workload and scales down idle workers after a timeout.

  • Builder Pattern: Use WorkerPoolBuilder to configure pool parameters like worker counts, timeouts, and spawn intervals.

  • Init and Exit Hooks: You can register custom actions that run when a worker is spawned or when it exits.
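The template-and-clone model described in the list above can be sketched with plain threads from the standard library. This is an illustration under stated assumptions, not orb's implementation: `SumWorker` and `run_pool` are hypothetical names, the worker count is fixed (no dynamic scaling or hooks), and a shared `Mutex<Receiver>` stands in for whatever queue the crate actually uses.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical worker: cloning it shares the same counter via `Arc`.
#[derive(Clone)]
struct SumWorker {
    total: Arc<AtomicUsize>,
}

impl SumWorker {
    fn run(&self, msg: usize) {
        self.total.fetch_add(msg, Ordering::SeqCst);
    }
}

/// Spawns `workers` threads, each owning a clone of `template`, feeds them
/// every message, then waits until the queue is drained and all threads exit.
fn run_pool(template: SumWorker, workers: usize, msgs: Vec<usize>) {
    let (tx, rx) = channel::<usize>();
    let rx: Arc<Mutex<Receiver<usize>>> = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let worker = template.clone(); // one clone per spawned worker
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // The lock guard is a temporary, so it is released as soon as
                // `recv` returns and before the message is processed.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(msg) => worker.run(msg),
                    Err(_) => break, // sender dropped and queue empty
                }
            })
        })
        .collect();

    for m in msgs {
        tx.send(m).unwrap();
    }
    drop(tx); // closing the channel lets the workers finish

    for h in handles {
        h.join().unwrap();
    }
}
```

Because each worker is a clone of the template, any state that must be shared across workers (here the running total) needs to sit behind a shared handle such as `Arc`; plain fields are copied per worker.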

§Quick Start

use orb::worker_pool::{Worker, WorkerBlocking, WorkerPoolBounded};
use std::time::Duration;

#[derive(Clone)]
struct MyWorker;

#[derive(Clone)]
struct MyMsg { value: u32 }

impl Worker for MyWorker {
    type Msg = MyMsg;
}

impl WorkerBlocking for MyWorker {
    fn run(&self, msg: Self::Msg) {
        println!("Processing: {}", msg.value);
    }
}

// Create a bounded pool with 2-8 workers
let pool = WorkerPoolBounded::builder(MyWorker, 2)
    .max_workers(8)
    .timeout(Duration::from_secs(5))
    .new_blocking(100); // channel capacity of 100

for i in 0..100 {
    pool.submit(MyMsg { value: i });
}

§Worker Traits

Implement WorkerAsync for async/await-based processing:

use orb::worker_pool::{Worker, WorkerAsync};

#[derive(Clone)]
struct AsyncWorker;

#[derive(Clone)]
struct Msg { data: String }

impl Worker for AsyncWorker {
    type Msg = Msg;
}

impl WorkerAsync for AsyncWorker {
    async fn run(&self, msg: Self::Msg) {
        // Async processing logic
        println!("Processing: {}", msg.data);
    }
}

Implement WorkerBlocking for CPU-bound or blocking operations:

use orb::worker_pool::{Worker, WorkerBlocking};

#[derive(Clone)]
struct BlockingWorker;

#[derive(Clone)]
struct Msg { data: Vec<u8> }

impl Worker for BlockingWorker {
    type Msg = Msg;
}

impl WorkerBlocking for BlockingWorker {
    fn run(&self, msg: Self::Msg) {
        // Blocking processing logic
        println!("Processing {} bytes", msg.data.len());
    }
}

Structs§

WorkerPoolBounded
A bounded worker pool that uses a fixed-size channel for message submission.
WorkerPoolBuilder
A builder for configuring and creating worker pools.
WorkerPoolUnbounded
A worker pool with an unbounded channel for message submission.

Traits§

Worker
WorkerAsync
WorkerBlocking