Skip to main content

WorkerPool

Struct WorkerPool 

Source
pub struct WorkerPool<Input, Output> { /* private fields */ }
Expand description

Worker pool for parallel record processing with bounded memory

Manages a pool of worker threads that process records in parallel while maintaining deterministic output ordering. Combines SequenceRing for ordered emission with per-worker ScratchBuffers for allocation-free processing.

§Key Features

  • Deterministic output - Records emitted in original input order
  • Bounded memory - Fixed channel capacity prevents unbounded buffering
  • Worker-local buffers - Each worker has dedicated scratch buffers
  • Automatic cleanup - Workers terminated gracefully on shutdown

§Architecture

Input → WorkerPool::submit() → [Worker 1] → SequenceRing → recv_ordered() → Output
                                [Worker 2] ↗            ↘
                                [Worker N] ↗

§Performance Tuning

  • num_workers: Match CPU core count (or 2x for I/O-bound work)
  • channel_capacity: 2-4x worker count for good pipeline depth
  • max_window_size: channel_capacity / 2 to allow processing variance

§Examples

§Basic Usage

use copybook_codec_memory::{WorkerPool, ScratchBuffers};

let mut pool = WorkerPool::new(
    4,   // 4 worker threads
    16,  // 16 records in flight
    8,   // 8 max reorder window
    |input: i32, _scratch: &mut ScratchBuffers| -> i32 {
        input * 2 // Processing function
    },
);

// Submit work
for i in 1..=10 {
    pool.submit(i).unwrap();
}

// Receive results in order
for i in 1..=10 {
    let result = pool.recv_ordered().unwrap().unwrap();
    assert_eq!(result, i * 2);
}

pool.shutdown().unwrap();

§COBOL Record Processing

use copybook_codec_memory::{WorkerPool, ScratchBuffers};
use copybook_codec::{decode_record_with_scratch, DecodeOptions};
use copybook_core::{parse_copybook, Schema};
use std::sync::Arc;

let copybook = "01 RECORD.\n   05 FIELD PIC X(10).";
let schema = Arc::new(parse_copybook(copybook).unwrap());
let options = DecodeOptions::new();

let schema_clone = Arc::clone(&schema);
let mut pool = WorkerPool::new(
    4, 100, 50,
    move |record_data: Vec<u8>, scratch: &mut ScratchBuffers| -> String {
        decode_record_with_scratch(&schema_clone, &record_data, &options, scratch)
            .unwrap()
            .to_string()
    },
);

// Collect records so we know the count
let records: Vec<Vec<u8>> = get_cobol_records();
let num_records = records.len();

// Submit COBOL records for parallel processing
for record in records {
    pool.submit(record).unwrap();
}

// Receive exactly num_records JSON results in order
for _ in 0..num_records {
    let json = pool.recv_ordered().unwrap().unwrap();
    println!("{}", json);
}

pool.shutdown().unwrap();
// Return one EBCDIC record (0xF1 repeated = '1' in EBCDIC)

Implementations§

Source§

impl<Input, Output> WorkerPool<Input, Output>
where Input: Send + 'static, Output: Send + 'static,

Source

pub fn new<F>( num_workers: usize, channel_capacity: usize, max_window_size: usize, worker_fn: F, ) -> Self
where F: Fn(Input, &mut ScratchBuffers) -> Output + Send + Sync + Clone + 'static,

Create a new worker pool

§Arguments
  • num_workers - Number of worker threads
  • channel_capacity - Maximum records in flight
  • max_window_size - Maximum reordering window
  • worker_fn - Function to process each record
Source

pub fn submit( &mut self, input: Input, ) -> Result<(), SendError<SequencedRecord<Input>>>

Submit input for processing

§Errors

Returns an error if the worker channel is disconnected.

Source

pub fn recv_ordered(&mut self) -> Result<Option<Output>, RecvError>

Receive the next processed result in order.

§Errors

Returns an error if the channel is disconnected.

Source

pub fn try_recv_ordered(&mut self) -> Result<Option<Output>, TryRecvError>

Try to receive the next processed result without blocking.

§Errors

Returns an error if the channel is disconnected or would block.

Source

pub fn shutdown(self) -> Result<(), Box<dyn Error + Send + Sync>>

Close the input channel and wait for all workers to finish.

§Errors

Returns an error if any worker thread panicked.

Source

pub fn stats(&self) -> WorkerPoolStats

Get statistics about the worker pool

Trait Implementations§

Source§

impl<Input: Debug, Output: Debug> Debug for WorkerPool<Input, Output>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<Input, Output> Freeze for WorkerPool<Input, Output>

§

impl<Input, Output> !RefUnwindSafe for WorkerPool<Input, Output>

§

impl<Input, Output> Send for WorkerPool<Input, Output>
where Input: Send, Output: Send,

§

impl<Input, Output> Sync for WorkerPool<Input, Output>
where Input: Send, Output: Send + Sync,

§

impl<Input, Output> Unpin for WorkerPool<Input, Output>
where Output: Unpin,

§

impl<Input, Output> UnsafeUnpin for WorkerPool<Input, Output>

§

impl<Input, Output> !UnwindSafe for WorkerPool<Input, Output>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more