Skip to main content

stryke/
ppool.rs

1//! Persistent thread pool (`ppool`) — workers pull jobs from a shared queue and run
2//! each task on a **fresh** [`Interpreter`] on an **existing** OS thread (no rayon task
3//! spawn per item; threads stay alive between jobs).
4
5use std::collections::{HashMap, VecDeque};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread::{self, JoinHandle};
9
10use crossbeam::channel::{unbounded, Receiver, Sender};
11
12use crate::error::{PerlError, PerlResult};
13use crate::interpreter::{Flow, FlowOrError, Interpreter};
14use crate::scope::{AtomicArray, AtomicHash};
15use crate::value::{PerlPpool, PerlSub, PerlValue};
16
/// Shared pool state (jobs in, results out-of-order; `PerlPpool::collect` reorders).
///
/// Wrapped in an `Arc` inside [`PerlPpool`]. Each worker thread owns clones of the job
/// receiver and the result sender (see `create_pool`); this struct owns the single job sender
/// and the result receiver. Dropping it closes the job queue and joins the worker threads.
pub struct PpoolInner {
    /// `None` after the pool is shut down.
    ///
    /// This is the only job sender, so taking (dropping) it disconnects the job channel; each
    /// worker's receive loop then ends once the already-queued jobs have been drained.
    pub(crate) job_tx: Mutex<Option<Sender<PoolJob>>>,
    /// Receives `(order, value)` pairs from workers as jobs finish (not in submit order).
    result_rx: Mutex<Receiver<(u64, PerlValue)>>,
    /// Results that `collect` pulled off the channel whose `order` lies beyond the range it was
    /// gathering; kept here for a later `collect`.
    pending: Mutex<VecDeque<(u64, PerlValue)>>,
    /// Sequence number that the next submitted job will receive.
    pub(crate) next_order: AtomicU64,
    /// First `order` not yet returned by `collect`.
    collect_from: AtomicU64,
    /// Worker join handles; `None` once they have been taken for joining.
    workers: Mutex<Option<Vec<JoinHandle<()>>>>,
}
27
/// One unit of work handed to a pool worker: the sub to run, the value for its `$_`, and a
/// snapshot of the submitting interpreter's state used to seed a fresh [`Interpreter`].
pub(crate) struct PoolJob {
    /// Sequence number assigned at submit time; `collect` uses it to restore submit order.
    order: u64,
    /// Code ref whose body the worker executes; its `closure_env` (if any) is restored too.
    sub: Arc<PerlSub>,
    /// Value bound as the worker's topic (`$_`) via `set_topic`.
    arg: PerlValue,
    /// Copy of the submitter's named subs, installed as the worker interpreter's `subs`.
    subs: HashMap<String, Arc<PerlSub>>,
    /// Scope values captured from the submitter by `capture_with_atomics`.
    capture: Vec<(String, PerlValue)>,
    /// Atomic arrays captured by `capture_with_atomics`; re-installed via `restore_atomics`.
    atomic_arrays: Vec<(String, AtomicArray)>,
    /// Atomic hashes captured by `capture_with_atomics`; re-installed via `restore_atomics`.
    atomic_hashes: Vec<(String, AtomicHash)>,
}
37
38impl PerlPpool {
39    pub(crate) fn submit(
40        &self,
41        interp: &mut Interpreter,
42        args: &[PerlValue],
43        line: usize,
44    ) -> PerlResult<PerlValue> {
45        if args.is_empty() {
46            return Err(PerlError::runtime(
47                "submit() expects a code reference and optional argument for $_",
48                line,
49            ));
50        }
51        let Some(sub) = args[0].as_code_ref() else {
52            return Err(PerlError::runtime(
53                "submit() first argument must be a CODE ref",
54                line,
55            ));
56        };
57        // One-arg form: bind worker `$_` from the caller's `$_` at submit time (postfix `for @tasks`
58        // sets `$_` each iteration). Two-arg form: explicit binding (may be `undef`).
59        let arg = if args.len() >= 2 {
60            args[1].clone()
61        } else {
62            interp.scope.get_scalar("_").clone()
63        };
64        let order = self.0.next_order.fetch_add(1, Ordering::SeqCst);
65        let subs = interp.subs.clone();
66        let (capture, atomic_arrays, atomic_hashes) = interp.scope.capture_with_atomics();
67        let job = PoolJob {
68            order,
69            sub: Arc::clone(&sub),
70            arg,
71            subs,
72            capture,
73            atomic_arrays,
74            atomic_hashes,
75        };
76        let tx = self
77            .0
78            .job_tx
79            .lock()
80            .map_err(|_| PerlError::runtime("ppool: job queue poisoned", line))?;
81        let Some(sender) = tx.as_ref() else {
82            return Err(PerlError::runtime("ppool: pool shut down", line));
83        };
84        sender
85            .send(job)
86            .map_err(|_| PerlError::runtime("ppool: submit failed (pool shut down)", line))?;
87        Ok(PerlValue::UNDEF)
88    }
89
90    pub(crate) fn collect(&self, line: usize) -> PerlResult<PerlValue> {
91        let start = self.0.collect_from.load(Ordering::SeqCst);
92        let end = self.0.next_order.load(Ordering::SeqCst);
93        let n = (end - start) as usize;
94        if n == 0 {
95            return Ok(PerlValue::array(vec![]));
96        }
97
98        let mut slots: Vec<Option<PerlValue>> = vec![None; n];
99        let mut count = 0usize;
100
101        {
102            let mut pending = self
103                .0
104                .pending
105                .lock()
106                .map_err(|_| PerlError::runtime("ppool: pending buffer poisoned", line))?;
107            let mut keep = VecDeque::new();
108            for (o, v) in pending.drain(..) {
109                if o >= start && o < end {
110                    let idx = (o - start) as usize;
111                    if slots[idx].is_none() {
112                        slots[idx] = Some(v);
113                        count += 1;
114                    }
115                } else {
116                    keep.push_back((o, v));
117                }
118            }
119            *pending = keep;
120        }
121
122        let rx = self
123            .0
124            .result_rx
125            .lock()
126            .map_err(|_| PerlError::runtime("ppool: collect lock poisoned", line))?;
127
128        while count < n {
129            let (o, v) = rx.recv().map_err(|_| {
130                PerlError::runtime("ppool: result channel closed (workers stopped)", line)
131            })?;
132            if o < start {
133                continue;
134            }
135            if o >= end {
136                self.0
137                    .pending
138                    .lock()
139                    .map_err(|_| PerlError::runtime("ppool: pending buffer poisoned", line))?
140                    .push_back((o, v));
141                continue;
142            }
143            let idx = (o - start) as usize;
144            if slots[idx].is_none() {
145                slots[idx] = Some(v);
146                count += 1;
147            }
148        }
149
150        self.0.collect_from.store(end, Ordering::SeqCst);
151        let out: Vec<PerlValue> = slots
152            .into_iter()
153            .map(|s| s.unwrap_or(PerlValue::UNDEF))
154            .collect();
155        Ok(PerlValue::array(out))
156    }
157}
158
159impl Drop for PpoolInner {
160    fn drop(&mut self) {
161        if let Ok(mut g) = self.job_tx.lock() {
162            let _ = g.take();
163        }
164        if let Ok(mut g) = self.workers.lock() {
165            if let Some(handles) = g.take() {
166                for h in handles {
167                    let _ = h.join();
168                }
169            }
170        }
171    }
172}
173
174fn worker_loop(job_rx: Receiver<PoolJob>, result_tx: Sender<(u64, PerlValue)>) {
175    while let Ok(job) = job_rx.recv() {
176        let mut interp = Interpreter::new();
177        interp.subs = job.subs;
178        interp.scope.restore_capture(&job.capture);
179        interp
180            .scope
181            .restore_atomics(&job.atomic_arrays, &job.atomic_hashes);
182        if let Some(env) = job.sub.closure_env.as_ref() {
183            interp.scope.restore_capture(env);
184        }
185        interp.enable_parallel_guard();
186        interp.scope.set_topic(job.arg);
187        interp.scope_push_hook();
188        let val = match interp.exec_block_no_scope(&job.sub.body) {
189            Ok(v) => v,
190            Err(FlowOrError::Flow(Flow::Return(v))) => v,
191            Err(_) => PerlValue::UNDEF,
192        };
193        interp.scope_pop_hook();
194        let _ = result_tx.send((job.order, val));
195    }
196}
197
198/// Create a pool with `workers` OS threads (clamped to 1..=256). Each thread runs jobs
199/// sequentially; new [`Interpreter`] values are constructed per job (cheap vs thread spawn).
200pub fn create_pool(workers: usize) -> PerlResult<PerlValue> {
201    let workers = workers.clamp(1, 256);
202    let (job_tx, job_rx): (Sender<PoolJob>, Receiver<PoolJob>) = unbounded();
203    type ResultMsg = (u64, PerlValue);
204    let (result_tx, result_rx): (Sender<ResultMsg>, Receiver<ResultMsg>) = unbounded();
205
206    let mut handles = Vec::with_capacity(workers);
207    for _ in 0..workers {
208        let jrx = job_rx.clone();
209        let rtx = result_tx.clone();
210        handles.push(thread::spawn(move || worker_loop(jrx, rtx)));
211    }
212    drop(job_rx);
213    drop(result_tx);
214
215    let inner = Arc::new(PpoolInner {
216        job_tx: Mutex::new(Some(job_tx)),
217        result_rx: Mutex::new(result_rx),
218        pending: Mutex::new(VecDeque::new()),
219        next_order: AtomicU64::new(0),
220        collect_from: AtomicU64::new(0),
221        workers: Mutex::new(Some(handles)),
222    });
223
224    Ok(PerlValue::ppool(PerlPpool(inner)))
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parse;

    /// Build an anonymous CODE ref whose body is `{ $_ * 2 }`.
    fn doubling_sub() -> PerlValue {
        let prog = parse("{ $_ * 2 }").expect("parse");
        let crate::ast::StmtKind::Block(body) = &prog.statements[0].kind else {
            panic!("expected block");
        };
        PerlValue::code_ref(Arc::new(PerlSub {
            name: "anon".to_string(),
            params: vec![],
            body: body.clone(),
            prototype: None,
            closure_env: None,
            fib_like: None,
        }))
    }

    #[test]
    fn test_ppool_basic() {
        let mut interp = Interpreter::new();
        let pool_val = create_pool(2).expect("create_pool");
        let pool = pool_val.as_ppool().expect("as_ppool");
        let sub_val = doubling_sub();

        for i in 1..=5 {
            pool.submit(&mut interp, &[sub_val.clone(), PerlValue::integer(i)], 1)
                .expect("submit");
        }

        let results = pool.collect(1).expect("collect");
        let arr = results.as_array_vec().expect("array");
        assert_eq!(arr.len(), 5);
        // `collect` reorders results into submission order, so compare without sorting.
        let ints: Vec<i64> = arr.iter().map(|v| v.to_int()).collect();
        assert_eq!(ints, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_ppool_collect_empty_and_successive_batches() {
        let mut interp = Interpreter::new();
        let pool_val = create_pool(3).expect("create_pool");
        let pool = pool_val.as_ppool().expect("as_ppool");
        let sub_val = doubling_sub();

        // Nothing submitted yet: an empty array, without blocking.
        let empty = pool.collect(1).expect("collect");
        assert_eq!(empty.as_array_vec().expect("array").len(), 0);

        for i in 1..=3 {
            pool.submit(&mut interp, &[sub_val.clone(), PerlValue::integer(i)], 1)
                .expect("submit");
        }
        let first = pool.collect(1).expect("collect");
        let first: Vec<i64> = first
            .as_array_vec()
            .expect("array")
            .iter()
            .map(|v| v.to_int())
            .collect();
        assert_eq!(first, vec![2, 4, 6]);

        // A second batch only returns jobs submitted since the previous collect.
        for i in 10..=11 {
            pool.submit(&mut interp, &[sub_val.clone(), PerlValue::integer(i)], 1)
                .expect("submit");
        }
        let second = pool.collect(1).expect("collect");
        let second: Vec<i64> = second
            .as_array_vec()
            .expect("array")
            .iter()
            .map(|v| v.to_int())
            .collect();
        assert_eq!(second, vec![20, 22]);

        let drained = pool.collect(1).expect("collect");
        assert_eq!(drained.as_array_vec().expect("array").len(), 0);
    }
}