pub struct WorkerPool<Input, Output> { /* private fields */ }
Expand description
Worker pool for parallel record processing with bounded memory
Manages a pool of worker threads that process records in parallel while
maintaining deterministic output ordering. Combines SequenceRing for
ordered emission with per-worker ScratchBuffers for allocation-free
processing.
§Key Features
- Deterministic output - Records emitted in original input order
- Bounded memory - Fixed channel capacity prevents unbounded buffering
- Worker-local buffers - Each worker has dedicated scratch buffers
- Automatic cleanup - Workers terminated gracefully on shutdown
§Architecture
Input → WorkerPool::submit() → [Worker 1] → SequenceRing → recv_ordered() → Output
                               [Worker 2] ↗ ↘
                               [Worker N] ↗
§Performance Tuning
- num_workers: Match CPU core count (or 2x for I/O-bound work)
- channel_capacity: 2-4x worker count for good pipeline depth
- max_window_size: channel_capacity / 2 to allow processing variance
§Examples
§Basic Usage
use copybook_codec_memory::{WorkerPool, ScratchBuffers};
let mut pool = WorkerPool::new(
4, // 4 worker threads
16, // 16 records in flight
8, // 8 max reorder window
|input: i32, _scratch: &mut ScratchBuffers| -> i32 {
input * 2 // Processing function
},
);
// Submit work
for i in 1..=10 {
pool.submit(i).unwrap();
}
// Receive results in order
for i in 1..=10 {
let result = pool.recv_ordered().unwrap().unwrap();
assert_eq!(result, i * 2);
}
pool.shutdown().unwrap();
§COBOL Record Processing
ⓘ
use copybook_codec_memory::{WorkerPool, ScratchBuffers};
use copybook_codec::{decode_record_with_scratch, DecodeOptions};
use copybook_core::{parse_copybook, Schema};
use std::sync::Arc;
let copybook = "01 RECORD.\n 05 FIELD PIC X(10).";
let schema = Arc::new(parse_copybook(copybook).unwrap());
let options = DecodeOptions::new();
let schema_clone = Arc::clone(&schema);
let mut pool = WorkerPool::new(
4, 100, 50,
move |record_data: Vec<u8>, scratch: &mut ScratchBuffers| -> String {
decode_record_with_scratch(&schema_clone, &record_data, &options, scratch)
.unwrap()
.to_string()
},
);
// Collect records so we know the count
let records: Vec<Vec<u8>> = get_cobol_records();
let num_records = records.len();
// Submit COBOL records for parallel processing
for record in records {
pool.submit(record).unwrap();
}
// Receive exactly num_records JSON results in order
for _ in 0..num_records {
let json = pool.recv_ordered().unwrap().unwrap();
println!("{}", json);
}
pool.shutdown().unwrap();
// Return one EBCDIC record (0xF1 repeated = '1' in EBCDIC)
Implementations§
Source§impl<Input, Output> WorkerPool<Input, Output>
impl<Input, Output> WorkerPool<Input, Output>
Source pub fn new<F>(
num_workers: usize,
channel_capacity: usize,
max_window_size: usize,
worker_fn: F,
) -> Self
pub fn new<F>( num_workers: usize, channel_capacity: usize, max_window_size: usize, worker_fn: F, ) -> Self
Create a new worker pool
§Arguments
- num_workers - Number of worker threads
- channel_capacity - Maximum records in flight
- max_window_size - Maximum reordering window
- worker_fn - Function to process each record
Source pub fn recv_ordered(&mut self) -> Result<Option<Output>, RecvError>
pub fn recv_ordered(&mut self) -> Result<Option<Output>, RecvError>
Receive the next processed result in order.
§Errors
Returns an error if the channel is disconnected.
Source pub fn try_recv_ordered(&mut self) -> Result<Option<Output>, TryRecvError>
pub fn try_recv_ordered(&mut self) -> Result<Option<Output>, TryRecvError>
Try to receive the next processed result without blocking.
§Errors
Returns an error if the channel is disconnected or would block.
Source pub fn shutdown(self) -> Result<(), Box<dyn Error + Send + Sync>>
pub fn shutdown(self) -> Result<(), Box<dyn Error + Send + Sync>>
Close the input channel and wait for all workers to finish.
§Errors
Returns an error if any worker thread panicked.
Source pub fn stats(&self) -> WorkerPoolStats
pub fn stats(&self) -> WorkerPoolStats
Get statistics about the worker pool
Trait Implementations§
Auto Trait Implementations§
impl<Input, Output> Freeze for WorkerPool<Input, Output>
impl<Input, Output> !RefUnwindSafe for WorkerPool<Input, Output>
impl<Input, Output> Send for WorkerPool<Input, Output>
impl<Input, Output> Sync for WorkerPool<Input, Output>
impl<Input, Output> Unpin for WorkerPool<Input, Output> where
Output: Unpin,
impl<Input, Output> UnsafeUnpin for WorkerPool<Input, Output>
impl<Input, Output> !UnwindSafe for WorkerPool<Input, Output>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§ fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more