1use super::context::RequestCtx;
11use super::ffi;
12use std::cell::RefCell;
13use std::ffi::CString;
14use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
/// Thin wrapper around a raw `*mut u8` that is declared `Send` so the pointer
/// can be moved through a channel to another thread.
///
/// In this file the wrapped pointer is produced by `Box::into_raw` on a
/// `Box<RequestCtx>` inside the worker-mode dispatcher and pushed into the
/// per-worker request channel.
pub(crate) struct SendPtr(pub(crate) *mut u8);
// SAFETY: raw pointers are not `Send` by default. This impl relies on the
// sender not touching the pointee again until the receiving side has reported
// completion. NOTE(review): the receiving side is not visible in this file —
// confirm it upholds that hand-off protocol.
unsafe impl Send for SendPtr {}
32
/// Channel endpoints owned by a thread that runs a long-lived worker script.
///
/// `worker_mode_loop` creates both channels, keeps the opposite ends for its
/// dispatcher thread, and stores these ends in [`WORKER_THREAD_STATE`].
/// NOTE(review): the code that reads these fields is not in this file —
/// presumably the FFI callbacks invoked from the PHP side.
pub(crate) struct WorkerThreadState {
    /// Receives raw request-context pointers sent by the dispatcher thread.
    pub(crate) request_rx: flume::Receiver<SendPtr>,
    /// Paired with the `done_rx` the dispatcher waits on after sending a request.
    pub(crate) done_tx: flume::Sender<()>,
}
40
thread_local! {
    /// Per-thread slot holding the worker-mode channel endpoints.
    ///
    /// Starts as `None`; `worker_mode_loop` fills it before starting the worker
    /// script and resets it to `None` once the script has exited.
    pub static WORKER_THREAD_STATE: RefCell<Option<WorkerThreadState>> = const { RefCell::new(None) };
}
44
/// Message placed on the pool's shared work queue.
pub enum PhpRequest {
    /// Run one PHP request; boxed so the queue item stays small.
    Execute(Box<PhpExecuteRequest>),
    /// Tells the worker loop that receives it to stop. `PhpPool::shutdown`
    /// sends one per worker.
    Shutdown,
}
53
/// Everything a worker needs to run a single PHP request and report the result.
pub struct PhpExecuteRequest {
    /// Path of the script to run (used by classic mode; worker mode runs its
    /// configured worker script instead).
    pub script_path: String,
    /// HTTP request method.
    pub method: String,
    /// Request URI.
    pub uri: String,
    /// Query string of the request.
    pub query_string: String,
    /// Value of the request's content type, if any.
    pub content_type: Option<String>,
    /// Raw request body.
    pub body: Vec<u8>,
    /// Cookie data handed to the request context, if any.
    pub cookies: Option<String>,
    /// Request headers as `(name, value)` pairs.
    pub headers: Vec<(String, String)>,
    /// Remote peer address, if known.
    pub remote_addr: Option<String>,
    /// Server name, if known.
    pub server_name: Option<String>,
    /// Server port.
    pub server_port: u16,
    /// Whether the request is flagged as HTTPS.
    pub https: bool,
    /// Channel on which the worker sends back the [`PhpResponse`].
    pub reply: flume::Sender<PhpResponse>,
}
70
/// Outcome of executing one PHP request.
#[derive(Debug, Clone)]
pub enum PhpResponse {
    /// The request ran; carries what the request context collected.
    Ok {
        /// Response status code.
        status: u16,
        /// Captured output bytes.
        body: Vec<u8>,
        /// Response headers as `(name, value)` pairs.
        headers: Vec<(String, String)>,
        /// Measured execution time in microseconds.
        exec_time_us: u64,
    },
    /// The request could not be run or did not complete; carries a message.
    Error(String),
}
82
/// Snapshot of pool counters, as produced by `PhpPool::stats`.
#[derive(Debug, Clone, Default)]
pub struct PhpPoolStats {
    /// Number of workers the pool was created with.
    pub workers: u32,
    /// Requests currently being processed.
    pub active: u32,
    /// Requests picked up by workers so far.
    pub total_requests: u64,
    /// Requests that ended in an error response.
    pub total_errors: u64,
    /// Accumulated execution time divided by `total_requests` (0 when no
    /// request has been counted yet), in microseconds.
    pub avg_exec_time_us: u64,
}
92
/// How the pool's workers run PHP.
#[derive(Debug, Clone)]
pub enum PhpMode {
    /// Each request executes the script named in the request.
    Classic,
    /// Each worker runs one long-lived script that is fed requests.
    Worker {
        /// Path of the worker script.
        script: String,
    },
}
107
/// Pool of OS threads that execute PHP requests taken from a shared queue.
pub struct PhpPool {
    /// Producer side of the shared work queue.
    sender: flume::Sender<PhpRequest>,
    /// Join handles of the spawned worker threads.
    workers: Vec<std::thread::JoinHandle<()>>,
    /// Number of requests currently being processed (shared with workers).
    active: Arc<AtomicU32>,
    /// Number of requests picked up by workers (shared with workers).
    total_requests: Arc<AtomicU64>,
    /// Number of requests that produced an error (shared with workers).
    total_errors: Arc<AtomicU64>,
    /// Sum of measured execution times in microseconds (shared with workers).
    total_exec_time_us: Arc<AtomicU64>,
    /// Number of workers requested at construction time.
    worker_count: u32,
    /// Mode the pool was created in.
    mode: PhpMode,
}
120
121impl PhpPool {
122 pub fn new(worker_count: usize) -> Result<Self, String> {
124 Self::create(worker_count, 0, PhpMode::Classic, None)
125 }
126
127 pub fn with_max_requests(worker_count: usize, max_requests: u64) -> Result<Self, String> {
129 Self::create(worker_count, max_requests, PhpMode::Classic, None)
130 }
131
132 pub fn worker(
135 worker_count: usize,
136 worker_script: String,
137 max_requests: u64,
138 ) -> Result<Self, String> {
139 Self::create(
140 worker_count,
141 max_requests,
142 PhpMode::Worker {
143 script: worker_script.clone(),
144 },
145 Some(worker_script),
146 )
147 }
148
149 fn create(
150 worker_count: usize,
151 max_requests: u64,
152 mode: PhpMode,
153 worker_script: Option<String>,
154 ) -> Result<Self, String> {
155 let (sender, receiver) = flume::bounded::<PhpRequest>(worker_count.max(1) * 128);
158 let active = Arc::new(AtomicU32::new(0));
159 let total_requests = Arc::new(AtomicU64::new(0));
160 let total_errors = Arc::new(AtomicU64::new(0));
161 let total_exec_time_us = Arc::new(AtomicU64::new(0));
162
163 let mut workers = Vec::with_capacity(worker_count);
164
165 for i in 0..worker_count {
166 let rx = receiver.clone();
167 let active = Arc::clone(&active);
168 let total_requests = Arc::clone(&total_requests);
169 let total_errors = Arc::clone(&total_errors);
170 let total_exec_time_us = Arc::clone(&total_exec_time_us);
171 let ws = worker_script.clone();
172
173 let handle = std::thread::Builder::new()
174 .name(format!("bext-php-{}", i))
175 .stack_size(16 * 1024 * 1024)
179 .spawn(move || {
180 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
182 if let Some(ref script) = ws {
183 worker_mode_loop(
184 i,
185 rx,
186 active,
187 total_requests,
188 total_errors,
189 total_exec_time_us,
190 script,
191 max_requests,
192 );
193 } else {
194 classic_mode_loop(
195 i,
196 rx,
197 active,
198 total_requests,
199 total_errors,
200 total_exec_time_us,
201 max_requests,
202 );
203 }
204 }));
205 if let Err(e) = result {
206 let msg = if let Some(s) = e.downcast_ref::<String>() {
207 s.clone()
208 } else if let Some(s) = e.downcast_ref::<&str>() {
209 s.to_string()
210 } else {
211 "unknown panic".to_string()
212 };
213 tracing::error!(worker = i, error = %msg, "PHP worker panicked");
214 }
215 })
216 .map_err(|e| format!("Failed to spawn PHP worker {}: {}", i, e))?;
217
218 workers.push(handle);
219 }
220
221 Ok(Self {
222 sender,
223 workers,
224 active,
225 total_requests,
226 total_errors,
227 total_exec_time_us,
228 worker_count: worker_count as u32,
229 mode,
230 })
231 }
232
233 #[allow(clippy::too_many_arguments)]
235 pub fn execute(
236 &self,
237 script_path: String,
238 method: String,
239 uri: String,
240 query_string: String,
241 content_type: Option<String>,
242 body: Vec<u8>,
243 cookies: Option<String>,
244 headers: Vec<(String, String)>,
245 remote_addr: Option<String>,
246 server_name: Option<String>,
247 server_port: u16,
248 https: bool,
249 ) -> Result<PhpResponse, String> {
250 let (tx, rx) = flume::bounded(1);
251 self.sender
252 .send_timeout(
253 PhpRequest::Execute(Box::new(PhpExecuteRequest {
254 script_path,
255 method,
256 uri,
257 query_string,
258 content_type,
259 body,
260 cookies,
261 headers,
262 remote_addr,
263 server_name,
264 server_port,
265 https,
266 reply: tx,
267 })),
268 Duration::from_secs(30),
269 )
270 .map_err(|_| "PHP pool queue timeout (30s)".to_string())?;
271 rx.recv_timeout(Duration::from_secs(60))
272 .map_err(|_| "PHP worker timeout (60s)".to_string())
273 }
274
275 pub fn healthy_workers(&self) -> u32 {
276 self.workers.iter().filter(|h| !h.is_finished()).count() as u32
277 }
278
279 pub fn stats(&self) -> PhpPoolStats {
280 let total = self.total_requests.load(Ordering::Relaxed);
281 PhpPoolStats {
282 workers: self.worker_count,
283 active: self.active.load(Ordering::Relaxed),
284 total_requests: total,
285 total_errors: self.total_errors.load(Ordering::Relaxed),
286 avg_exec_time_us: if total > 0 {
287 self.total_exec_time_us.load(Ordering::Relaxed) / total
288 } else {
289 0
290 },
291 }
292 }
293
294 pub fn mode(&self) -> &PhpMode {
295 &self.mode
296 }
297
298 pub fn shutdown(self) {
299 for _ in &self.workers {
300 let _ = self.sender.send(PhpRequest::Shutdown);
301 }
302 for handle in self.workers {
303 let _ = handle.join();
304 }
305 }
306}
307
/// How long a worker or dispatcher blocks on the shared queue before it loops
/// around and tries again.
const WORKER_RECV_TIMEOUT: Duration = Duration::from_secs(30);
309
310fn classic_mode_loop(
313 worker_id: usize,
314 rx: flume::Receiver<PhpRequest>,
315 active: Arc<AtomicU32>,
316 total_requests: Arc<AtomicU64>,
317 total_errors: Arc<AtomicU64>,
318 total_exec_time_us: Arc<AtomicU64>,
319 max_requests: u64,
320) {
321 tracing::info!(worker = worker_id, mode = "classic", "PHP worker started");
322 let mut local_count: u64 = 0;
323
324 loop {
325 if max_requests > 0 && local_count >= max_requests {
326 tracing::info!(
327 worker = worker_id,
328 requests = local_count,
329 "PHP worker rotating"
330 );
331 break;
332 }
333
334 let request = match rx.recv_timeout(WORKER_RECV_TIMEOUT) {
335 Ok(req) => req,
336 Err(flume::RecvTimeoutError::Timeout) => continue,
337 Err(flume::RecvTimeoutError::Disconnected) => break,
338 };
339
340 match request {
341 PhpRequest::Shutdown => break,
342 PhpRequest::Execute(req) => {
343 active.fetch_add(1, Ordering::Relaxed);
344 total_requests.fetch_add(1, Ordering::Relaxed);
345 local_count += 1;
346
347 let response = execute_classic(
348 &req.script_path,
349 &req.method,
350 &req.uri,
351 &req.query_string,
352 req.content_type.as_deref(),
353 req.body,
354 req.cookies.as_deref(),
355 req.headers,
356 req.remote_addr.as_deref(),
357 req.server_name.as_deref(),
358 req.server_port,
359 req.https,
360 );
361
362 match &response {
363 PhpResponse::Ok { exec_time_us, .. } => {
364 total_exec_time_us.fetch_add(*exec_time_us, Ordering::Relaxed);
365 }
366 PhpResponse::Error(_) => {
367 total_errors.fetch_add(1, Ordering::Relaxed);
368 }
369 }
370
371 let _ = req.reply.send(response);
372 active.fetch_sub(1, Ordering::Relaxed);
373 }
374 }
375 }
376
377 tracing::info!(worker = worker_id, "PHP worker stopped");
378}
379
380#[allow(clippy::too_many_arguments)]
383fn worker_mode_loop(
384 worker_id: usize,
385 rx: flume::Receiver<PhpRequest>,
386 active: Arc<AtomicU32>,
387 total_requests: Arc<AtomicU64>,
388 total_errors: Arc<AtomicU64>,
389 total_exec_time_us: Arc<AtomicU64>,
390 worker_script: &str,
391 _max_requests: u64,
392) {
393 tracing::info!(worker = worker_id, mode = "worker", script = %worker_script, "PHP worker started");
394
395 let (request_tx, request_rx) = flume::bounded::<SendPtr>(1);
399 let (done_tx, done_rx) = flume::bounded::<()>(1);
400
401 WORKER_THREAD_STATE.with(|state| {
403 *state.borrow_mut() = Some(WorkerThreadState {
404 request_rx,
405 done_tx,
406 });
407 });
408
409 let c_script = match CString::new(worker_script) {
413 Ok(s) => s,
414 Err(e) => {
415 tracing::error!(worker = worker_id, error = %e, "Invalid worker script path");
416 return;
417 }
418 };
419
420 let mut boot_ctx = RequestCtx::new(Vec::new(), None, Vec::new(), None, None, 80, false);
422
423 let dispatcher_rx = rx;
426 let dispatcher_active = active;
427 let dispatcher_total_requests = total_requests;
428 let dispatcher_total_errors = total_errors;
429 let dispatcher_total_exec_time_us = total_exec_time_us;
430
431 let request_tx_clone = request_tx.clone();
433 let dispatcher = std::thread::Builder::new()
434 .name(format!("bext-php-{}-dispatch", worker_id))
435 .spawn(move || {
436 loop {
437 while done_rx.try_recv().is_ok() {}
442
443 let request = match dispatcher_rx.recv_timeout(WORKER_RECV_TIMEOUT) {
444 Ok(req) => req,
445 Err(flume::RecvTimeoutError::Timeout) => continue,
446 Err(flume::RecvTimeoutError::Disconnected) => break,
447 };
448
449 match request {
450 PhpRequest::Shutdown => {
451 drop(request_tx_clone);
453 break;
454 }
455 PhpRequest::Execute(req) => {
456 dispatcher_active.fetch_add(1, Ordering::Relaxed);
457 dispatcher_total_requests.fetch_add(1, Ordering::Relaxed);
458 let start = Instant::now();
459
460 let mut req_ctx = Box::new(RequestCtx::new(
462 req.body,
463 req.cookies.as_deref(),
464 req.headers,
465 req.remote_addr.as_deref(),
466 req.server_name.as_deref(),
467 req.server_port,
468 req.https,
469 ));
470 req_ctx.set_request_info(
471 &req.method,
472 &req.uri,
473 &req.query_string,
474 req.content_type.as_deref(),
475 );
476
477 let ctx_ptr = Box::into_raw(req_ctx) as *mut u8;
478
479 if request_tx_clone.send(SendPtr(ctx_ptr)).is_err() {
481 let _ = unsafe { Box::from_raw(ctx_ptr as *mut RequestCtx) };
483 let _ = req.reply.send(PhpResponse::Error("Worker died".into()));
484 dispatcher_active.fetch_sub(1, Ordering::Relaxed);
485 dispatcher_total_errors.fetch_add(1, Ordering::Relaxed);
486 break;
487 }
488
489 match done_rx.recv_timeout(Duration::from_secs(60)) {
491 Ok(()) => {
492 let req_ctx = unsafe { Box::from_raw(ctx_ptr as *mut RequestCtx) };
494 let exec_time_us = start.elapsed().as_micros() as u64;
495 dispatcher_total_exec_time_us
496 .fetch_add(exec_time_us, Ordering::Relaxed);
497
498 let _ = req.reply.send(PhpResponse::Ok {
499 status: req_ctx.status_code,
500 body: req_ctx.output_buf,
501 headers: req_ctx.response_headers,
502 exec_time_us,
503 });
504 }
505 Err(_) => {
506 let _ = req
508 .reply
509 .send(PhpResponse::Error("PHP worker timeout".into()));
510 dispatcher_total_errors.fetch_add(1, Ordering::Relaxed);
511 }
512 }
513
514 dispatcher_active.fetch_sub(1, Ordering::Relaxed);
515 }
516 }
517 }
518 });
519
520 let dispatcher = match dispatcher {
521 Ok(handle) => Some(handle),
522 Err(e) => {
523 tracing::error!(worker = worker_id, error = %e, "Failed to spawn dispatcher thread");
524 return;
525 }
526 };
527
528 let exit_status = unsafe {
531 ffi::bext_php_execute_worker(
532 &mut boot_ctx as *mut RequestCtx as *mut ffi::BextRequestCtx,
533 c_script.as_ptr(),
534 )
535 };
536
537 tracing::info!(worker = worker_id, exit_status, "PHP worker script exited");
538
539 drop(request_tx);
541 if let Some(d) = dispatcher {
542 let _ = d.join();
543 }
544
545 WORKER_THREAD_STATE.with(|state| {
547 *state.borrow_mut() = None;
548 });
549}
550
551#[allow(clippy::too_many_arguments)]
554fn execute_classic(
555 script_path: &str,
556 method: &str,
557 uri: &str,
558 query_string: &str,
559 content_type: Option<&str>,
560 body: Vec<u8>,
561 cookies: Option<&str>,
562 headers: Vec<(String, String)>,
563 remote_addr: Option<&str>,
564 server_name: Option<&str>,
565 server_port: u16,
566 https: bool,
567) -> PhpResponse {
568 let c_script = match CString::new(script_path) {
569 Ok(s) => s,
570 Err(e) => return PhpResponse::Error(format!("Invalid script path: {}", e)),
571 };
572 let c_method = match CString::new(method) {
573 Ok(s) => s,
574 Err(e) => return PhpResponse::Error(format!("Invalid method: {}", e)),
575 };
576 let c_uri = match CString::new(uri) {
577 Ok(s) => s,
578 Err(e) => return PhpResponse::Error(format!("Invalid URI: {}", e)),
579 };
580 let c_query = match CString::new(query_string) {
581 Ok(s) => s,
582 Err(e) => return PhpResponse::Error(format!("Invalid query string: {}", e)),
583 };
584 let c_content_type = content_type.and_then(|ct| CString::new(ct).ok());
585 let content_length = body.len() as i64;
586
587 let mut req_ctx = RequestCtx::new(
588 body,
589 cookies,
590 headers,
591 remote_addr,
592 server_name,
593 server_port,
594 https,
595 );
596
597 let start = Instant::now();
598
599 let status = unsafe {
600 ffi::bext_php_execute_script(
601 &mut req_ctx as *mut RequestCtx as *mut ffi::BextRequestCtx,
602 c_script.as_ptr(),
603 c_method.as_ptr(),
604 c_uri.as_ptr(),
605 c_query.as_ptr(),
606 c_content_type
607 .as_ref()
608 .map(|c| c.as_ptr())
609 .unwrap_or(std::ptr::null()),
610 content_length,
611 )
612 };
613
614 let exec_time_us = start.elapsed().as_micros() as u64;
615
616 if status < 0 {
617 return PhpResponse::Error("PHP execution failed (request startup error)".into());
618 }
619
620 PhpResponse::Ok {
621 status: status as u16,
622 body: req_ctx.output_buf,
623 headers: req_ctx.response_headers,
624 exec_time_us,
625 }
626}