Skip to main content

bext_php/
pool.rs

1//! PHP Worker Pool — dedicated OS threads for PHP script execution.
2//!
3//! Supports two modes:
4//! 1. Classic mode: one `php_execute_script()` per HTTP request
5//! 2. Worker mode:  boot a worker script once, dispatch requests to its
6//!    `bext_handle_request()` loop (eliminates framework bootstrap)
7//!
8//! Architecture mirrors `bext-core::jsc_ssr::pool::JscRenderPool`.
9
10use super::context::RequestCtx;
11use super::ffi;
12use std::cell::RefCell;
13use std::ffi::CString;
14use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18// ─── Thread-local worker state (for worker mode) ─────────────────────────
19
/// Opaque pointer to a heap-allocated `RequestCtx`, safe to send through channels.
///
/// # Safety
///
/// Ownership is transferred linearly: the producer creates a `Box<RequestCtx>`,
/// converts it to a raw pointer, wraps it in `SendPtr`, and sends it through a
/// bounded channel.  The consumer receives exclusive ownership and must either
/// convert it back to `Box<RequestCtx>` via `Box::from_raw` or ensure the C
/// side doesn't access it concurrently.  The pointer is never aliased — only
/// one thread holds it at any time.
///
/// The pointer is stored type-erased as `*mut u8`; in this module it is cast
/// back to `*mut RequestCtx` before being reclaimed with `Box::from_raw`.
pub(crate) struct SendPtr(pub(crate) *mut u8);
// SAFETY: `SendPtr` is a bare pointer with no thread-affine state of its own.
// Sending it is sound only because of the linear-ownership protocol documented
// above: exactly one thread holds (and may dereference) the pointer at a time.
unsafe impl Send for SendPtr {}
32
/// Per-thread state used by the FFI callbacks in worker mode.
///
/// `worker_mode_loop` installs it on the PHP worker thread before running the
/// worker script and resets it to `None` after the script exits.
pub(crate) struct WorkerThreadState {
    /// Receives request context pointers from the pool dispatcher.
    pub(crate) request_rx: flume::Receiver<SendPtr>,
    /// Signals that the current request is complete.
    pub(crate) done_tx: flume::Sender<()>,
}

thread_local! {
    /// Worker-mode channel ends for the current thread. Stays `None` unless
    /// `worker_mode_loop` has installed them on this thread.
    pub static WORKER_THREAD_STATE: RefCell<Option<WorkerThreadState>> = const { RefCell::new(None) };
}
44
45// ─── Request / Response types ────────────────────────────────────────────
46
/// A PHP execution request dispatched to a worker.
pub enum PhpRequest {
    /// Execute a PHP script and return the output.
    Execute(Box<PhpExecuteRequest>),
    /// Tells the worker loop that receives it to stop.
    Shutdown,
}

/// Inner data for a PHP execute request (boxed to reduce enum size).
pub struct PhpExecuteRequest {
    /// Script to execute. Used by classic mode; the worker-mode dispatcher
    /// does not read it (the worker script handles every request).
    pub script_path: String,
    /// HTTP request method.
    pub method: String,
    /// Request URI.
    pub uri: String,
    /// Query string, passed through to PHP as-is.
    pub query_string: String,
    /// Request body content type, if any.
    pub content_type: Option<String>,
    /// Request body bytes.
    pub body: Vec<u8>,
    /// Cookie string, if any (presumably the raw `Cookie` header — confirm with callers).
    pub cookies: Option<String>,
    /// Request headers as `(name, value)` pairs.
    pub headers: Vec<(String, String)>,
    /// Client address, if known.
    pub remote_addr: Option<String>,
    /// Server name, if known.
    pub server_name: Option<String>,
    /// Server port.
    pub server_port: u16,
    /// Whether the request arrived over HTTPS.
    pub https: bool,
    /// Channel on which the worker sends its `PhpResponse`.
    pub reply: flume::Sender<PhpResponse>,
}
70
/// Response from a PHP worker.
#[derive(Debug, Clone)]
pub enum PhpResponse {
    /// The script ran; carries what it produced.
    Ok {
        /// HTTP status code reported for the request.
        status: u16,
        /// Captured script output.
        body: Vec<u8>,
        /// Response headers as `(name, value)` pairs.
        headers: Vec<(String, String)>,
        /// Execution time in microseconds, as measured by the worker.
        exec_time_us: u64,
    },
    /// The request could not be executed or did not complete; carries a message.
    Error(String),
}

/// Pool-level statistics.
#[derive(Debug, Clone, Default)]
pub struct PhpPoolStats {
    /// Number of workers the pool was created with (not necessarily still running).
    pub workers: u32,
    /// Requests currently being executed.
    pub active: u32,
    /// Requests picked up by workers since the pool was created.
    pub total_requests: u64,
    /// Requests that ended with an error response.
    pub total_errors: u64,
    /// Summed execution time of successful requests divided by `total_requests`
    /// (failed requests count in the divisor but add no time).
    pub avg_exec_time_us: u64,
}
92
93// ─── Execution mode ──────────────────────────────────────────────────────
94
/// How PHP scripts are executed.
#[derive(Debug, Clone)]
pub enum PhpMode {
    /// One `php_execute_script()` per request. Simple, compatible with everything.
    Classic,
    /// Boot a worker script once; dispatch requests to its `bext_handle_request()` loop.
    /// Eliminates per-request framework bootstrap (~3ms for Laravel).
    ///
    /// In this mode the per-request `script_path` is ignored; the worker script
    /// decides how to handle each request.
    Worker {
        /// Path to the worker PHP script.
        script: String,
    },
}
107
108// ─── Pool ────────────────────────────────────────────────────────────────
109
110pub struct PhpPool {
111    sender: flume::Sender<PhpRequest>,
112    workers: Vec<std::thread::JoinHandle<()>>,
113    active: Arc<AtomicU32>,
114    total_requests: Arc<AtomicU64>,
115    total_errors: Arc<AtomicU64>,
116    total_exec_time_us: Arc<AtomicU64>,
117    worker_count: u32,
118    mode: PhpMode,
119}
120
121impl PhpPool {
122    /// Create a classic-mode pool.
123    pub fn new(worker_count: usize) -> Result<Self, String> {
124        Self::create(worker_count, 0, PhpMode::Classic, None)
125    }
126
127    /// Create a classic-mode pool with per-worker request lifecycle limit.
128    pub fn with_max_requests(worker_count: usize, max_requests: u64) -> Result<Self, String> {
129        Self::create(worker_count, max_requests, PhpMode::Classic, None)
130    }
131
132    /// Create a worker-mode pool. The worker script boots the framework
133    /// once and calls `bext_handle_request($callback)` in a loop.
134    pub fn worker(
135        worker_count: usize,
136        worker_script: String,
137        max_requests: u64,
138    ) -> Result<Self, String> {
139        Self::create(
140            worker_count,
141            max_requests,
142            PhpMode::Worker {
143                script: worker_script.clone(),
144            },
145            Some(worker_script),
146        )
147    }
148
149    fn create(
150        worker_count: usize,
151        max_requests: u64,
152        mode: PhpMode,
153        worker_script: Option<String>,
154    ) -> Result<Self, String> {
155        // Queue depth = 128 × worker count.  Requests beyond this will block
156        // the caller (backpressure), matching PHP-FPM's behavior under load.
157        let (sender, receiver) = flume::bounded::<PhpRequest>(worker_count.max(1) * 128);
158        let active = Arc::new(AtomicU32::new(0));
159        let total_requests = Arc::new(AtomicU64::new(0));
160        let total_errors = Arc::new(AtomicU64::new(0));
161        let total_exec_time_us = Arc::new(AtomicU64::new(0));
162
163        let mut workers = Vec::with_capacity(worker_count);
164
165        for i in 0..worker_count {
166            let rx = receiver.clone();
167            let active = Arc::clone(&active);
168            let total_requests = Arc::clone(&total_requests);
169            let total_errors = Arc::clone(&total_errors);
170            let total_exec_time_us = Arc::clone(&total_exec_time_us);
171            let ws = worker_script.clone();
172
173            let handle = std::thread::Builder::new()
174                .name(format!("bext-php-{}", i))
175                // PHP 8.4's stack checker (zend.max_allowed_stack_size) auto-detects
176                // from getrlimit which reports the MAIN thread's limit, not ours.
177                // Use 16MB to ensure plenty of room for PHP's compiler + JIT.
178                .stack_size(16 * 1024 * 1024)
179                .spawn(move || {
180                    // Wrap in catch_unwind for crash recovery
181                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
182                        if let Some(ref script) = ws {
183                            worker_mode_loop(
184                                i,
185                                rx,
186                                active,
187                                total_requests,
188                                total_errors,
189                                total_exec_time_us,
190                                script,
191                                max_requests,
192                            );
193                        } else {
194                            classic_mode_loop(
195                                i,
196                                rx,
197                                active,
198                                total_requests,
199                                total_errors,
200                                total_exec_time_us,
201                                max_requests,
202                            );
203                        }
204                    }));
205                    if let Err(e) = result {
206                        let msg = if let Some(s) = e.downcast_ref::<String>() {
207                            s.clone()
208                        } else if let Some(s) = e.downcast_ref::<&str>() {
209                            s.to_string()
210                        } else {
211                            "unknown panic".to_string()
212                        };
213                        tracing::error!(worker = i, error = %msg, "PHP worker panicked");
214                    }
215                })
216                .map_err(|e| format!("Failed to spawn PHP worker {}: {}", i, e))?;
217
218            workers.push(handle);
219        }
220
221        Ok(Self {
222            sender,
223            workers,
224            active,
225            total_requests,
226            total_errors,
227            total_exec_time_us,
228            worker_count: worker_count as u32,
229            mode,
230        })
231    }
232
233    /// Execute a PHP request. Blocks until a worker completes.
234    #[allow(clippy::too_many_arguments)]
235    pub fn execute(
236        &self,
237        script_path: String,
238        method: String,
239        uri: String,
240        query_string: String,
241        content_type: Option<String>,
242        body: Vec<u8>,
243        cookies: Option<String>,
244        headers: Vec<(String, String)>,
245        remote_addr: Option<String>,
246        server_name: Option<String>,
247        server_port: u16,
248        https: bool,
249    ) -> Result<PhpResponse, String> {
250        let (tx, rx) = flume::bounded(1);
251        self.sender
252            .send_timeout(
253                PhpRequest::Execute(Box::new(PhpExecuteRequest {
254                    script_path,
255                    method,
256                    uri,
257                    query_string,
258                    content_type,
259                    body,
260                    cookies,
261                    headers,
262                    remote_addr,
263                    server_name,
264                    server_port,
265                    https,
266                    reply: tx,
267                })),
268                Duration::from_secs(30),
269            )
270            .map_err(|_| "PHP pool queue timeout (30s)".to_string())?;
271        rx.recv_timeout(Duration::from_secs(60))
272            .map_err(|_| "PHP worker timeout (60s)".to_string())
273    }
274
275    pub fn healthy_workers(&self) -> u32 {
276        self.workers.iter().filter(|h| !h.is_finished()).count() as u32
277    }
278
279    pub fn stats(&self) -> PhpPoolStats {
280        let total = self.total_requests.load(Ordering::Relaxed);
281        PhpPoolStats {
282            workers: self.worker_count,
283            active: self.active.load(Ordering::Relaxed),
284            total_requests: total,
285            total_errors: self.total_errors.load(Ordering::Relaxed),
286            avg_exec_time_us: if total > 0 {
287                self.total_exec_time_us.load(Ordering::Relaxed) / total
288            } else {
289                0
290            },
291        }
292    }
293
294    pub fn mode(&self) -> &PhpMode {
295        &self.mode
296    }
297
298    pub fn shutdown(self) {
299        for _ in &self.workers {
300            let _ = self.sender.send(PhpRequest::Shutdown);
301        }
302        for handle in self.workers {
303            let _ = handle.join();
304        }
305    }
306}
307
/// Longest a worker loop (or worker-mode dispatcher) blocks on the shared
/// request queue before looping again. A timeout here is not an error; it only
/// lets the loop re-check its exit conditions.
const WORKER_RECV_TIMEOUT: Duration = Duration::from_secs(30);
309
310// ─── Classic mode worker loop ────────────────────────────────────────────
311
312fn classic_mode_loop(
313    worker_id: usize,
314    rx: flume::Receiver<PhpRequest>,
315    active: Arc<AtomicU32>,
316    total_requests: Arc<AtomicU64>,
317    total_errors: Arc<AtomicU64>,
318    total_exec_time_us: Arc<AtomicU64>,
319    max_requests: u64,
320) {
321    tracing::info!(worker = worker_id, mode = "classic", "PHP worker started");
322    let mut local_count: u64 = 0;
323
324    loop {
325        if max_requests > 0 && local_count >= max_requests {
326            tracing::info!(
327                worker = worker_id,
328                requests = local_count,
329                "PHP worker rotating"
330            );
331            break;
332        }
333
334        let request = match rx.recv_timeout(WORKER_RECV_TIMEOUT) {
335            Ok(req) => req,
336            Err(flume::RecvTimeoutError::Timeout) => continue,
337            Err(flume::RecvTimeoutError::Disconnected) => break,
338        };
339
340        match request {
341            PhpRequest::Shutdown => break,
342            PhpRequest::Execute(req) => {
343                active.fetch_add(1, Ordering::Relaxed);
344                total_requests.fetch_add(1, Ordering::Relaxed);
345                local_count += 1;
346
347                let response = execute_classic(
348                    &req.script_path,
349                    &req.method,
350                    &req.uri,
351                    &req.query_string,
352                    req.content_type.as_deref(),
353                    req.body,
354                    req.cookies.as_deref(),
355                    req.headers,
356                    req.remote_addr.as_deref(),
357                    req.server_name.as_deref(),
358                    req.server_port,
359                    req.https,
360                );
361
362                match &response {
363                    PhpResponse::Ok { exec_time_us, .. } => {
364                        total_exec_time_us.fetch_add(*exec_time_us, Ordering::Relaxed);
365                    }
366                    PhpResponse::Error(_) => {
367                        total_errors.fetch_add(1, Ordering::Relaxed);
368                    }
369                }
370
371                let _ = req.reply.send(response);
372                active.fetch_sub(1, Ordering::Relaxed);
373            }
374        }
375    }
376
377    tracing::info!(worker = worker_id, "PHP worker stopped");
378}
379
380// ─── Worker mode loop ────────────────────────────────────────────────────
381
382#[allow(clippy::too_many_arguments)]
383fn worker_mode_loop(
384    worker_id: usize,
385    rx: flume::Receiver<PhpRequest>,
386    active: Arc<AtomicU32>,
387    total_requests: Arc<AtomicU64>,
388    total_errors: Arc<AtomicU64>,
389    total_exec_time_us: Arc<AtomicU64>,
390    worker_script: &str,
391    _max_requests: u64,
392) {
393    tracing::info!(worker = worker_id, mode = "worker", script = %worker_script, "PHP worker started");
394
395    // Create channels for the C↔Rust worker protocol:
396    // - request_tx/rx: pool sends request ctx pointers to the C bext_handle_request()
397    // - done_tx/rx: C signals request completion back to pool
398    let (request_tx, request_rx) = flume::bounded::<SendPtr>(1);
399    let (done_tx, done_rx) = flume::bounded::<()>(1);
400
401    // Install thread-local state for FFI callbacks
402    WORKER_THREAD_STATE.with(|state| {
403        *state.borrow_mut() = Some(WorkerThreadState {
404            request_rx,
405            done_tx,
406        });
407    });
408
409    // Boot the worker script in a background thread.
410    // The script runs `while (bext_handle_request($cb)) { ... }` which blocks
411    // in bext_sapi_worker_wait_request() until we send a request.
412    let c_script = match CString::new(worker_script) {
413        Ok(s) => s,
414        Err(e) => {
415            tracing::error!(worker = worker_id, error = %e, "Invalid worker script path");
416            return;
417        }
418    };
419
420    // Create a minimal initial context for the boot phase
421    let mut boot_ctx = RequestCtx::new(Vec::new(), None, Vec::new(), None, None, 80, false);
422
423    // Launch PHP execution on THIS thread (it will block in the bext_handle_request loop)
424    // We use a separate thread to dispatch requests to it.
425    let dispatcher_rx = rx;
426    let dispatcher_active = active;
427    let dispatcher_total_requests = total_requests;
428    let dispatcher_total_errors = total_errors;
429    let dispatcher_total_exec_time_us = total_exec_time_us;
430
431    // Spawn a dispatcher thread that receives PhpRequests and feeds them to the worker
432    let request_tx_clone = request_tx.clone();
433    let dispatcher = std::thread::Builder::new()
434        .name(format!("bext-php-{}-dispatch", worker_id))
435        .spawn(move || {
436            loop {
437                // Drain any stale completion signal from a previous timed-out request.
438                // After a timeout, the PHP worker may still complete and send on done_tx,
439                // leaving a stale signal that would cause the next iteration to reclaim
440                // the wrong ctx_ptr (use-after-free).
441                while done_rx.try_recv().is_ok() {}
442
443                let request = match dispatcher_rx.recv_timeout(WORKER_RECV_TIMEOUT) {
444                    Ok(req) => req,
445                    Err(flume::RecvTimeoutError::Timeout) => continue,
446                    Err(flume::RecvTimeoutError::Disconnected) => break,
447                };
448
449                match request {
450                    PhpRequest::Shutdown => {
451                        // Signal the worker to exit by dropping the request channel
452                        drop(request_tx_clone);
453                        break;
454                    }
455                    PhpRequest::Execute(req) => {
456                        dispatcher_active.fetch_add(1, Ordering::Relaxed);
457                        dispatcher_total_requests.fetch_add(1, Ordering::Relaxed);
458                        let start = Instant::now();
459
460                        // Build a request context and send its pointer to the worker
461                        let mut req_ctx = Box::new(RequestCtx::new(
462                            req.body,
463                            req.cookies.as_deref(),
464                            req.headers,
465                            req.remote_addr.as_deref(),
466                            req.server_name.as_deref(),
467                            req.server_port,
468                            req.https,
469                        ));
470                        req_ctx.set_request_info(
471                            &req.method,
472                            &req.uri,
473                            &req.query_string,
474                            req.content_type.as_deref(),
475                        );
476
477                        let ctx_ptr = Box::into_raw(req_ctx) as *mut u8;
478
479                        // Send to the PHP worker (blocks until bext_handle_request picks it up)
480                        if request_tx_clone.send(SendPtr(ctx_ptr)).is_err() {
481                            // Worker died — convert back to Box to drop
482                            let _ = unsafe { Box::from_raw(ctx_ptr as *mut RequestCtx) };
483                            let _ = req.reply.send(PhpResponse::Error("Worker died".into()));
484                            dispatcher_active.fetch_sub(1, Ordering::Relaxed);
485                            dispatcher_total_errors.fetch_add(1, Ordering::Relaxed);
486                            break;
487                        }
488
489                        // Wait for the worker to finish processing
490                        match done_rx.recv_timeout(Duration::from_secs(60)) {
491                            Ok(()) => {
492                                // Reclaim the context and extract the response
493                                let req_ctx = unsafe { Box::from_raw(ctx_ptr as *mut RequestCtx) };
494                                let exec_time_us = start.elapsed().as_micros() as u64;
495                                dispatcher_total_exec_time_us
496                                    .fetch_add(exec_time_us, Ordering::Relaxed);
497
498                                let _ = req.reply.send(PhpResponse::Ok {
499                                    status: req_ctx.status_code,
500                                    body: req_ctx.output_buf,
501                                    headers: req_ctx.response_headers,
502                                    exec_time_us,
503                                });
504                            }
505                            Err(_) => {
506                                // On timeout, we intentionally do not free ctx_ptr to avoid double-free with the PHP worker thread. The worker will clean up when it completes.
507                                let _ = req
508                                    .reply
509                                    .send(PhpResponse::Error("PHP worker timeout".into()));
510                                dispatcher_total_errors.fetch_add(1, Ordering::Relaxed);
511                            }
512                        }
513
514                        dispatcher_active.fetch_sub(1, Ordering::Relaxed);
515                    }
516                }
517            }
518        });
519
520    let dispatcher = match dispatcher {
521        Ok(handle) => Some(handle),
522        Err(e) => {
523            tracing::error!(worker = worker_id, error = %e, "Failed to spawn dispatcher thread");
524            return;
525        }
526    };
527
528    // Run the PHP worker script on this thread.
529    // It will loop in bext_handle_request(), blocking on request_rx.
530    let exit_status = unsafe {
531        ffi::bext_php_execute_worker(
532            &mut boot_ctx as *mut RequestCtx as *mut ffi::BextRequestCtx,
533            c_script.as_ptr(),
534        )
535    };
536
537    tracing::info!(worker = worker_id, exit_status, "PHP worker script exited");
538
539    // Clean up: drop request_tx to unblock the dispatcher, then join it
540    drop(request_tx);
541    if let Some(d) = dispatcher {
542        let _ = d.join();
543    }
544
545    // Clean up thread-local state
546    WORKER_THREAD_STATE.with(|state| {
547        *state.borrow_mut() = None;
548    });
549}
550
551// ─── Classic mode execution ──────────────────────────────────────────────
552
553#[allow(clippy::too_many_arguments)]
554fn execute_classic(
555    script_path: &str,
556    method: &str,
557    uri: &str,
558    query_string: &str,
559    content_type: Option<&str>,
560    body: Vec<u8>,
561    cookies: Option<&str>,
562    headers: Vec<(String, String)>,
563    remote_addr: Option<&str>,
564    server_name: Option<&str>,
565    server_port: u16,
566    https: bool,
567) -> PhpResponse {
568    let c_script = match CString::new(script_path) {
569        Ok(s) => s,
570        Err(e) => return PhpResponse::Error(format!("Invalid script path: {}", e)),
571    };
572    let c_method = match CString::new(method) {
573        Ok(s) => s,
574        Err(e) => return PhpResponse::Error(format!("Invalid method: {}", e)),
575    };
576    let c_uri = match CString::new(uri) {
577        Ok(s) => s,
578        Err(e) => return PhpResponse::Error(format!("Invalid URI: {}", e)),
579    };
580    let c_query = match CString::new(query_string) {
581        Ok(s) => s,
582        Err(e) => return PhpResponse::Error(format!("Invalid query string: {}", e)),
583    };
584    let c_content_type = content_type.and_then(|ct| CString::new(ct).ok());
585    let content_length = body.len() as i64;
586
587    let mut req_ctx = RequestCtx::new(
588        body,
589        cookies,
590        headers,
591        remote_addr,
592        server_name,
593        server_port,
594        https,
595    );
596
597    let start = Instant::now();
598
599    let status = unsafe {
600        ffi::bext_php_execute_script(
601            &mut req_ctx as *mut RequestCtx as *mut ffi::BextRequestCtx,
602            c_script.as_ptr(),
603            c_method.as_ptr(),
604            c_uri.as_ptr(),
605            c_query.as_ptr(),
606            c_content_type
607                .as_ref()
608                .map(|c| c.as_ptr())
609                .unwrap_or(std::ptr::null()),
610            content_length,
611        )
612    };
613
614    let exec_time_us = start.elapsed().as_micros() as u64;
615
616    if status < 0 {
617        return PhpResponse::Error("PHP execution failed (request startup error)".into());
618    }
619
620    PhpResponse::Ok {
621        status: status as u16,
622        body: req_ctx.output_buf,
623        headers: req_ctx.response_headers,
624        exec_time_us,
625    }
626}