1use std::collections::{HashMap, VecDeque};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread::{self, JoinHandle};
9
10use crossbeam::channel::{unbounded, Receiver, Sender};
11
12use crate::error::{PerlError, PerlResult};
13use crate::interpreter::{Flow, FlowOrError, Interpreter};
14use crate::scope::{AtomicArray, AtomicHash};
15use crate::value::{PerlPpool, PerlSub, PerlValue};
16
17pub struct PpoolInner {
19 pub(crate) job_tx: Mutex<Option<Sender<PoolJob>>>,
21 result_rx: Mutex<Receiver<(u64, PerlValue)>>,
22 pending: Mutex<VecDeque<(u64, PerlValue)>>,
23 pub(crate) next_order: AtomicU64,
24 collect_from: AtomicU64,
25 workers: Mutex<Option<Vec<JoinHandle<()>>>>,
26}
27
28pub(crate) struct PoolJob {
29 order: u64,
30 sub: Arc<PerlSub>,
31 arg: PerlValue,
32 subs: HashMap<String, Arc<PerlSub>>,
33 capture: Vec<(String, PerlValue)>,
34 atomic_arrays: Vec<(String, AtomicArray)>,
35 atomic_hashes: Vec<(String, AtomicHash)>,
36}
37
38impl PerlPpool {
39 pub(crate) fn submit(
40 &self,
41 interp: &mut Interpreter,
42 args: &[PerlValue],
43 line: usize,
44 ) -> PerlResult<PerlValue> {
45 if args.is_empty() {
46 return Err(PerlError::runtime(
47 "submit() expects a code reference and optional argument for $_",
48 line,
49 ));
50 }
51 let Some(sub) = args[0].as_code_ref() else {
52 return Err(PerlError::runtime(
53 "submit() first argument must be a CODE ref",
54 line,
55 ));
56 };
57 let arg = if args.len() >= 2 {
60 args[1].clone()
61 } else {
62 interp.scope.get_scalar("_").clone()
63 };
64 let order = self.0.next_order.fetch_add(1, Ordering::SeqCst);
65 let subs = interp.subs.clone();
66 let (capture, atomic_arrays, atomic_hashes) = interp.scope.capture_with_atomics();
67 let job = PoolJob {
68 order,
69 sub: Arc::clone(&sub),
70 arg,
71 subs,
72 capture,
73 atomic_arrays,
74 atomic_hashes,
75 };
76 let tx = self
77 .0
78 .job_tx
79 .lock()
80 .map_err(|_| PerlError::runtime("ppool: job queue poisoned", line))?;
81 let Some(sender) = tx.as_ref() else {
82 return Err(PerlError::runtime("ppool: pool shut down", line));
83 };
84 sender
85 .send(job)
86 .map_err(|_| PerlError::runtime("ppool: submit failed (pool shut down)", line))?;
87 Ok(PerlValue::UNDEF)
88 }
89
90 pub(crate) fn collect(&self, line: usize) -> PerlResult<PerlValue> {
91 let start = self.0.collect_from.load(Ordering::SeqCst);
92 let end = self.0.next_order.load(Ordering::SeqCst);
93 let n = (end - start) as usize;
94 if n == 0 {
95 return Ok(PerlValue::array(vec![]));
96 }
97
98 let mut slots: Vec<Option<PerlValue>> = vec![None; n];
99 let mut count = 0usize;
100
101 {
102 let mut pending = self
103 .0
104 .pending
105 .lock()
106 .map_err(|_| PerlError::runtime("ppool: pending buffer poisoned", line))?;
107 let mut keep = VecDeque::new();
108 for (o, v) in pending.drain(..) {
109 if o >= start && o < end {
110 let idx = (o - start) as usize;
111 if slots[idx].is_none() {
112 slots[idx] = Some(v);
113 count += 1;
114 }
115 } else {
116 keep.push_back((o, v));
117 }
118 }
119 *pending = keep;
120 }
121
122 let rx = self
123 .0
124 .result_rx
125 .lock()
126 .map_err(|_| PerlError::runtime("ppool: collect lock poisoned", line))?;
127
128 while count < n {
129 let (o, v) = rx.recv().map_err(|_| {
130 PerlError::runtime("ppool: result channel closed (workers stopped)", line)
131 })?;
132 if o < start {
133 continue;
134 }
135 if o >= end {
136 self.0
137 .pending
138 .lock()
139 .map_err(|_| PerlError::runtime("ppool: pending buffer poisoned", line))?
140 .push_back((o, v));
141 continue;
142 }
143 let idx = (o - start) as usize;
144 if slots[idx].is_none() {
145 slots[idx] = Some(v);
146 count += 1;
147 }
148 }
149
150 self.0.collect_from.store(end, Ordering::SeqCst);
151 let out: Vec<PerlValue> = slots
152 .into_iter()
153 .map(|s| s.unwrap_or(PerlValue::UNDEF))
154 .collect();
155 Ok(PerlValue::array(out))
156 }
157}
158
159impl Drop for PpoolInner {
160 fn drop(&mut self) {
161 if let Ok(mut g) = self.job_tx.lock() {
162 let _ = g.take();
163 }
164 if let Ok(mut g) = self.workers.lock() {
165 if let Some(handles) = g.take() {
166 for h in handles {
167 let _ = h.join();
168 }
169 }
170 }
171 }
172}
173
174fn worker_loop(job_rx: Receiver<PoolJob>, result_tx: Sender<(u64, PerlValue)>) {
175 while let Ok(job) = job_rx.recv() {
176 let mut interp = Interpreter::new();
177 interp.subs = job.subs;
178 interp.scope.restore_capture(&job.capture);
179 interp
180 .scope
181 .restore_atomics(&job.atomic_arrays, &job.atomic_hashes);
182 if let Some(env) = job.sub.closure_env.as_ref() {
183 interp.scope.restore_capture(env);
184 }
185 interp.enable_parallel_guard();
186 interp.scope.set_topic(job.arg);
187 interp.scope_push_hook();
188 let val = match interp.exec_block_no_scope(&job.sub.body) {
189 Ok(v) => v,
190 Err(FlowOrError::Flow(Flow::Return(v))) => v,
191 Err(_) => PerlValue::UNDEF,
192 };
193 interp.scope_pop_hook();
194 let _ = result_tx.send((job.order, val));
195 }
196}
197
198pub fn create_pool(workers: usize) -> PerlResult<PerlValue> {
201 let workers = workers.clamp(1, 256);
202 let (job_tx, job_rx): (Sender<PoolJob>, Receiver<PoolJob>) = unbounded();
203 type ResultMsg = (u64, PerlValue);
204 let (result_tx, result_rx): (Sender<ResultMsg>, Receiver<ResultMsg>) = unbounded();
205
206 let mut handles = Vec::with_capacity(workers);
207 for _ in 0..workers {
208 let jrx = job_rx.clone();
209 let rtx = result_tx.clone();
210 handles.push(thread::spawn(move || worker_loop(jrx, rtx)));
211 }
212 drop(job_rx);
213 drop(result_tx);
214
215 let inner = Arc::new(PpoolInner {
216 job_tx: Mutex::new(Some(job_tx)),
217 result_rx: Mutex::new(result_rx),
218 pending: Mutex::new(VecDeque::new()),
219 next_order: AtomicU64::new(0),
220 collect_from: AtomicU64::new(0),
221 workers: Mutex::new(Some(handles)),
222 });
223
224 Ok(PerlValue::ppool(PerlPpool(inner)))
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parse;

    /// Submits five jobs that double `$_` and checks that `collect` returns
    /// the results in submission order, then that a second `collect` with
    /// nothing outstanding returns an empty array.
    #[test]
    fn test_ppool_basic() {
        let mut interp = Interpreter::new();
        let pool_val = create_pool(2).expect("create_pool");
        let pool = pool_val.as_ppool().expect("as_ppool");

        let prog = parse("{ $_ * 2 }").expect("parse");
        let body = match &prog.statements[0].kind {
            crate::ast::StmtKind::Block(b) => b.clone(),
            _ => panic!("expected block"),
        };

        let sub_val = PerlValue::code_ref(Arc::new(PerlSub {
            name: "anon".to_string(),
            params: vec![],
            body,
            prototype: None,
            closure_env: None,
            fib_like: None,
        }));

        for i in 1..=5 {
            pool.submit(&mut interp, &[sub_val.clone(), PerlValue::integer(i)], 1)
                .expect("submit");
        }

        let results = pool.collect(1).expect("collect");
        let arr = results.as_array_vec().expect("array");
        assert_eq!(arr.len(), 5);
        // No sorting: `collect` places each result by its order number, so the
        // output must already be in submission order regardless of which
        // worker finished first.
        let ints: Vec<i64> = arr.iter().map(|v| v.to_int()).collect();
        assert_eq!(ints, vec![2, 4, 6, 8, 10]);

        // Everything has been handed out; a second collect has nothing to wait for.
        let again = pool.collect(1).expect("second collect");
        assert_eq!(again.as_array_vec().expect("array").len(), 0);
    }
}