// copybook_codec_memory/worker_pool.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2use copybook_sequence_ring::{SequenceRing, SequenceRingStats, SequencedRecord};
3use crossbeam_channel::{Sender, bounded};
4use std::sync::Arc;
5use std::thread;
6use tracing::{debug, warn};
7
8use super::ScratchBuffers;
9
/// Worker pool for parallel record processing with bounded memory
///
/// Manages a pool of worker threads that process records in parallel while
/// maintaining deterministic output ordering. Combines [`SequenceRing`] for
/// ordered emission with per-worker [`ScratchBuffers`] for allocation-free
/// processing.
///
/// # Key Features
///
/// - **Deterministic output** - Records emitted in original input order
/// - **Bounded memory** - Fixed channel capacity prevents unbounded buffering
/// - **Worker-local buffers** - Each worker has dedicated scratch buffers
/// - **Automatic cleanup** - Workers terminated gracefully on shutdown
///
/// # Architecture
///
/// ```text
/// Input → WorkerPool::submit() → [Worker 1] → SequenceRing → recv_ordered() → Output
///                                 [Worker 2] ↗            ↘
///                                 [Worker N] ↗
/// ```
///
/// # Performance Tuning
///
/// - **`num_workers`**: Match CPU core count (or 2x for I/O-bound work)
/// - **`channel_capacity`**: 2-4x worker count for good pipeline depth
/// - **`max_window_size`**: `channel_capacity` / 2 to allow processing variance
///
/// # Examples
///
/// ## Basic Usage
///
/// ```rust
/// use copybook_codec_memory::{WorkerPool, ScratchBuffers};
///
/// let mut pool = WorkerPool::new(
///     4,   // 4 worker threads
///     16,  // 16 records in flight
///     8,   // 8 max reorder window
///     |input: i32, _scratch: &mut ScratchBuffers| -> i32 {
///         input * 2 // Processing function
///     },
/// );
///
/// // Submit work
/// for i in 1..=10 {
///     pool.submit(i).unwrap();
/// }
///
/// // Receive results in order
/// for i in 1..=10 {
///     let result = pool.recv_ordered().unwrap().unwrap();
///     assert_eq!(result, i * 2);
/// }
///
/// pool.shutdown().unwrap();
/// ```
///
/// ## COBOL Record Processing
///
/// ```ignore
/// use copybook_codec_memory::{WorkerPool, ScratchBuffers};
/// use copybook_codec::{decode_record_with_scratch, DecodeOptions};
/// use copybook_core::{parse_copybook, Schema};
/// use std::sync::Arc;
///
/// let copybook = "01 RECORD.\n   05 FIELD PIC X(10).";
/// let schema = Arc::new(parse_copybook(copybook).unwrap());
/// let options = DecodeOptions::new();
///
/// let schema_clone = Arc::clone(&schema);
/// let mut pool = WorkerPool::new(
///     4, 100, 50,
///     move |record_data: Vec<u8>, scratch: &mut ScratchBuffers| -> String {
///         decode_record_with_scratch(&schema_clone, &record_data, &options, scratch)
///             .unwrap()
///             .to_string()
///     },
/// );
///
/// // Collect records so we know the count
/// let records: Vec<Vec<u8>> = get_cobol_records();
/// let num_records = records.len();
///
/// // Submit COBOL records for parallel processing
/// for record in records {
///     pool.submit(record).unwrap();
/// }
///
/// // Receive exactly num_records JSON results in order
/// for _ in 0..num_records {
///     let json = pool.recv_ordered().unwrap().unwrap();
///     println!("{}", json);
/// }
///
/// pool.shutdown().unwrap();
/// // Return one EBCDIC record (0xF1 repeated = '1' in EBCDIC)
/// # fn get_cobol_records() -> Vec<Vec<u8>> { vec![vec![0xF1; 10]] }
/// ```
#[derive(Debug)]
pub struct WorkerPool<Input, Output> {
    /// Bounded input channel; workers pull `SequencedRecord`s from the
    /// matching receiver. Dropping this sender closes the channel and lets
    /// the worker loops terminate.
    input_sender: Sender<SequencedRecord<Input>>,
    /// Output sequence ring that reorders worker results back into
    /// submission order for `recv_ordered`/`try_recv_ordered`.
    output_ring: SequenceRing<Output>,
    /// Join handles for the spawned worker threads (joined on `shutdown`).
    worker_handles: Vec<thread::JoinHandle<()>>,
    /// Next sequence ID to assign on `submit` (starts at 1, monotonically
    /// increasing).
    next_input_sequence: u64,
}
120
121impl<Input, Output> WorkerPool<Input, Output>
122where
123    Input: Send + 'static,
124    Output: Send + 'static,
125{
126    /// Create a new worker pool
127    ///
128    /// # Arguments
129    /// * `num_workers` - Number of worker threads
130    /// * `channel_capacity` - Maximum records in flight
131    /// * `max_window_size` - Maximum reordering window
132    /// * `worker_fn` - Function to process each record
133    #[inline]
134    #[must_use]
135    pub fn new<F>(
136        num_workers: usize,
137        channel_capacity: usize,
138        max_window_size: usize,
139        worker_fn: F,
140    ) -> Self
141    where
142        F: Fn(Input, &mut ScratchBuffers) -> Output + Send + Sync + Clone + 'static,
143    {
144        let (input_sender, input_receiver) = bounded(channel_capacity);
145        let output_ring = SequenceRing::new(channel_capacity, max_window_size);
146        let output_sender = output_ring.sender();
147
148        let worker_fn = Arc::new(worker_fn);
149        let mut worker_handles = Vec::with_capacity(num_workers);
150
151        // Spawn worker threads
152        for worker_id in 0..num_workers {
153            let input_receiver = input_receiver.clone();
154            let output_sender = output_sender.clone();
155            let worker_fn = Arc::clone(&worker_fn);
156
157            let handle = thread::spawn(move || {
158                let mut scratch_buffers = ScratchBuffers::new();
159                debug!("Worker {} started", worker_id);
160
161                while let Ok(sequenced_input) = input_receiver.recv() {
162                    let SequencedRecord {
163                        sequence_id,
164                        data: input,
165                    } = sequenced_input;
166
167                    // Clear scratch buffers for reuse
168                    scratch_buffers.clear();
169
170                    // Process the record
171                    let output = worker_fn(input, &mut scratch_buffers);
172
173                    // Send result with sequence ID
174                    let sequenced_output = SequencedRecord::new(sequence_id, output);
175                    if output_sender.send(sequenced_output).is_err() {
176                        debug!("Worker {} output channel closed", worker_id);
177                        break;
178                    }
179                }
180
181                debug!("Worker {} finished", worker_id);
182            });
183
184            worker_handles.push(handle);
185        }
186
187        Self {
188            input_sender,
189            output_ring,
190            worker_handles,
191            next_input_sequence: 1,
192        }
193    }
194
195    /// Submit input for processing
196    ///
197    /// # Errors
198    /// Returns an error if the worker channel is disconnected.
199    #[inline]
200    #[must_use = "Handle the Result or propagate the error"]
201    pub fn submit(
202        &mut self,
203        input: Input,
204    ) -> Result<(), crossbeam_channel::SendError<SequencedRecord<Input>>> {
205        let sequenced_input = SequencedRecord::new(self.next_input_sequence, input);
206        self.next_input_sequence += 1;
207        self.input_sender.send(sequenced_input)
208    }
209
210    /// Receive the next processed result in order.
211    ///
212    /// # Errors
213    /// Returns an error if the channel is disconnected.
214    #[inline]
215    #[must_use = "Handle the Result or propagate the error"]
216    pub fn recv_ordered(&mut self) -> Result<Option<Output>, crossbeam_channel::RecvError> {
217        self.output_ring.recv_ordered()
218    }
219
220    /// Try to receive the next processed result without blocking.
221    ///
222    /// # Errors
223    /// Returns an error if the channel is disconnected or would block.
224    #[inline]
225    #[must_use = "Handle the Result or propagate the error"]
226    pub fn try_recv_ordered(&mut self) -> Result<Option<Output>, crossbeam_channel::TryRecvError> {
227        self.output_ring.try_recv_ordered()
228    }
229
230    /// Close the input channel and wait for all workers to finish.
231    ///
232    /// # Errors
233    /// Returns an error if any worker thread panicked.
234    #[inline]
235    #[must_use = "Handle the Result or propagate the error"]
236    pub fn shutdown(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
237        // Close input channel
238        drop(self.input_sender);
239
240        // Wait for all workers to finish
241        for (i, handle) in self.worker_handles.into_iter().enumerate() {
242            if let Err(e) = handle.join() {
243                warn!("Worker {} panicked: {:?}", i, e);
244            }
245        }
246
247        Ok(())
248    }
249
250    /// Get statistics about the worker pool
251    #[inline]
252    #[must_use]
253    pub fn stats(&self) -> WorkerPoolStats {
254        WorkerPoolStats {
255            num_workers: self.worker_handles.len(),
256            next_input_sequence: self.next_input_sequence,
257            sequence_ring_stats: self.output_ring.stats(),
258        }
259    }
260}
261
/// Statistics about worker pool operation
///
/// Point-in-time snapshot returned by [`WorkerPool::stats`]; values may be
/// stale immediately after capture while workers keep processing.
#[derive(Debug, Clone)]
pub struct WorkerPoolStats {
    /// Number of worker threads in the pool.
    pub num_workers: usize,
    /// Next input sequence ID to assign (1-based; equals the number of
    /// records submitted so far plus one).
    pub next_input_sequence: u64,
    /// Statistics from the output [`SequenceRing`].
    pub sequence_ring_stats: SequenceRingStats,
}