// copybook_codec_memory/worker_pool.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2use copybook_sequence_ring::{SequenceRing, SequenceRingStats, SequencedRecord};
3use crossbeam_channel::{Sender, bounded};
4use std::sync::Arc;
5use std::thread;
6use tracing::{debug, warn};
7
8use super::ScratchBuffers;
9
/// Worker pool for parallel record processing with bounded memory
///
/// Manages a pool of worker threads that process records in parallel while
/// maintaining deterministic output ordering. Combines [`SequenceRing`] for
/// ordered emission with per-worker [`ScratchBuffers`] for allocation-free
/// processing.
///
/// # Key Features
///
/// - **Deterministic output** - Records emitted in original input order
/// - **Bounded memory** - Fixed channel capacity prevents unbounded buffering
/// - **Worker-local buffers** - Each worker has dedicated scratch buffers
/// - **Automatic cleanup** - Workers terminated gracefully on shutdown
///
/// # Architecture
///
/// ```text
/// Input → WorkerPool::submit() → [Worker 1] ↘
///                                [Worker 2] → SequenceRing → recv_ordered() → Output
///                                [Worker N] ↗
/// ```
///
/// # Performance Tuning
///
/// - **`num_workers`**: Match CPU core count (or 2x for I/O-bound work)
/// - **`channel_capacity`**: 2-4x worker count for good pipeline depth
/// - **`max_window_size`**: `channel_capacity` / 2 to allow processing variance
///
/// # Examples
///
/// ## Basic Usage
///
/// ```rust
/// use copybook_codec_memory::{WorkerPool, ScratchBuffers};
///
/// let mut pool = WorkerPool::new(
///     4,  // 4 worker threads
///     16, // 16 records in flight
///     8,  // 8 max reorder window
///     |input: i32, _scratch: &mut ScratchBuffers| -> i32 {
///         input * 2 // Processing function
///     },
/// );
///
/// // Submit work
/// for i in 1..=10 {
///     pool.submit(i).unwrap();
/// }
///
/// // Receive results in order
/// for i in 1..=10 {
///     let result = pool.recv_ordered().unwrap().unwrap();
///     assert_eq!(result, i * 2);
/// }
///
/// pool.shutdown().unwrap();
/// ```
///
/// ## COBOL Record Processing
///
/// ```ignore
/// use copybook_codec_memory::{WorkerPool, ScratchBuffers};
/// use copybook_codec::{decode_record_with_scratch, DecodeOptions};
/// use copybook_core::{parse_copybook, Schema};
/// use std::sync::Arc;
///
/// let copybook = "01 RECORD.\n  05 FIELD PIC X(10).";
/// let schema = Arc::new(parse_copybook(copybook).unwrap());
/// let options = DecodeOptions::new();
///
/// let schema_clone = Arc::clone(&schema);
/// let mut pool = WorkerPool::new(
///     4, 100, 50,
///     move |record_data: Vec<u8>, scratch: &mut ScratchBuffers| -> String {
///         decode_record_with_scratch(&schema_clone, &record_data, &options, scratch)
///             .unwrap()
///             .to_string()
///     },
/// );
///
/// // Collect records so we know the count
/// let records: Vec<Vec<u8>> = get_cobol_records();
/// let num_records = records.len();
///
/// // Submit COBOL records for parallel processing
/// for record in records {
///     pool.submit(record).unwrap();
/// }
///
/// // Receive exactly num_records JSON results in order
/// for _ in 0..num_records {
///     let json = pool.recv_ordered().unwrap().unwrap();
///     println!("{}", json);
/// }
///
/// pool.shutdown().unwrap();
/// // Return one EBCDIC record (0xF1 repeated = '1' in EBCDIC)
/// # fn get_cobol_records() -> Vec<Vec<u8>> { vec![vec![0xF1; 10]] }
/// ```
#[derive(Debug)]
pub struct WorkerPool<Input, Output> {
    /// Bounded input channel shared by all workers; dropping it signals shutdown
    input_sender: Sender<SequencedRecord<Input>>,
    /// Output sequence ring that restores original submission order
    output_ring: SequenceRing<Output>,
    /// Worker thread handles, joined on `shutdown()`
    worker_handles: Vec<thread::JoinHandle<()>>,
    /// Next sequence ID to assign; starts at 1 and increments per `submit()`
    next_input_sequence: u64,
}
120
121impl<Input, Output> WorkerPool<Input, Output>
122where
123 Input: Send + 'static,
124 Output: Send + 'static,
125{
126 /// Create a new worker pool
127 ///
128 /// # Arguments
129 /// * `num_workers` - Number of worker threads
130 /// * `channel_capacity` - Maximum records in flight
131 /// * `max_window_size` - Maximum reordering window
132 /// * `worker_fn` - Function to process each record
133 #[inline]
134 #[must_use]
135 pub fn new<F>(
136 num_workers: usize,
137 channel_capacity: usize,
138 max_window_size: usize,
139 worker_fn: F,
140 ) -> Self
141 where
142 F: Fn(Input, &mut ScratchBuffers) -> Output + Send + Sync + Clone + 'static,
143 {
144 let (input_sender, input_receiver) = bounded(channel_capacity);
145 let output_ring = SequenceRing::new(channel_capacity, max_window_size);
146 let output_sender = output_ring.sender();
147
148 let worker_fn = Arc::new(worker_fn);
149 let mut worker_handles = Vec::with_capacity(num_workers);
150
151 // Spawn worker threads
152 for worker_id in 0..num_workers {
153 let input_receiver = input_receiver.clone();
154 let output_sender = output_sender.clone();
155 let worker_fn = Arc::clone(&worker_fn);
156
157 let handle = thread::spawn(move || {
158 let mut scratch_buffers = ScratchBuffers::new();
159 debug!("Worker {} started", worker_id);
160
161 while let Ok(sequenced_input) = input_receiver.recv() {
162 let SequencedRecord {
163 sequence_id,
164 data: input,
165 } = sequenced_input;
166
167 // Clear scratch buffers for reuse
168 scratch_buffers.clear();
169
170 // Process the record
171 let output = worker_fn(input, &mut scratch_buffers);
172
173 // Send result with sequence ID
174 let sequenced_output = SequencedRecord::new(sequence_id, output);
175 if output_sender.send(sequenced_output).is_err() {
176 debug!("Worker {} output channel closed", worker_id);
177 break;
178 }
179 }
180
181 debug!("Worker {} finished", worker_id);
182 });
183
184 worker_handles.push(handle);
185 }
186
187 Self {
188 input_sender,
189 output_ring,
190 worker_handles,
191 next_input_sequence: 1,
192 }
193 }
194
195 /// Submit input for processing
196 ///
197 /// # Errors
198 /// Returns an error if the worker channel is disconnected.
199 #[inline]
200 #[must_use = "Handle the Result or propagate the error"]
201 pub fn submit(
202 &mut self,
203 input: Input,
204 ) -> Result<(), crossbeam_channel::SendError<SequencedRecord<Input>>> {
205 let sequenced_input = SequencedRecord::new(self.next_input_sequence, input);
206 self.next_input_sequence += 1;
207 self.input_sender.send(sequenced_input)
208 }
209
210 /// Receive the next processed result in order.
211 ///
212 /// # Errors
213 /// Returns an error if the channel is disconnected.
214 #[inline]
215 #[must_use = "Handle the Result or propagate the error"]
216 pub fn recv_ordered(&mut self) -> Result<Option<Output>, crossbeam_channel::RecvError> {
217 self.output_ring.recv_ordered()
218 }
219
220 /// Try to receive the next processed result without blocking.
221 ///
222 /// # Errors
223 /// Returns an error if the channel is disconnected or would block.
224 #[inline]
225 #[must_use = "Handle the Result or propagate the error"]
226 pub fn try_recv_ordered(&mut self) -> Result<Option<Output>, crossbeam_channel::TryRecvError> {
227 self.output_ring.try_recv_ordered()
228 }
229
230 /// Close the input channel and wait for all workers to finish.
231 ///
232 /// # Errors
233 /// Returns an error if any worker thread panicked.
234 #[inline]
235 #[must_use = "Handle the Result or propagate the error"]
236 pub fn shutdown(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
237 // Close input channel
238 drop(self.input_sender);
239
240 // Wait for all workers to finish
241 for (i, handle) in self.worker_handles.into_iter().enumerate() {
242 if let Err(e) = handle.join() {
243 warn!("Worker {} panicked: {:?}", i, e);
244 }
245 }
246
247 Ok(())
248 }
249
250 /// Get statistics about the worker pool
251 #[inline]
252 #[must_use]
253 pub fn stats(&self) -> WorkerPoolStats {
254 WorkerPoolStats {
255 num_workers: self.worker_handles.len(),
256 next_input_sequence: self.next_input_sequence,
257 sequence_ring_stats: self.output_ring.stats(),
258 }
259 }
260}
261
/// Statistics about worker pool operation
///
/// A point-in-time snapshot produced by `WorkerPool::stats()`; values may be
/// stale immediately if workers are still processing.
#[derive(Debug, Clone)]
pub struct WorkerPoolStats {
    /// Number of worker threads
    pub num_workers: usize,
    /// Next input sequence ID to assign (1-based; equals submitted count + 1)
    pub next_input_sequence: u64,
    /// Sequence ring statistics
    pub sequence_ring_stats: SequenceRingStats,
}