use super::context::RequestCtx;
use super::ffi;
use std::cell::RefCell;
use std::ffi::CString;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Thin wrapper around a raw byte pointer so it can travel through a channel.
///
/// In this file, `worker_mode_loop` wraps the pointer produced by
/// `Box::into_raw` on a boxed `RequestCtx` in this type before sending it
/// over the per-worker request channel.
pub(crate) struct SendPtr(pub(crate) *mut u8);
// SAFETY: raw pointers are not `Send` by default, so this impl is a promise
// made by the users of `SendPtr`. In this file the sender keeps responsibility
// for freeing the allocation and only does so after the completion signal.
// NOTE(review): the receiving side is not visible here — confirm it never
// frees the pointer or keeps it past the completion signal.
unsafe impl Send for SendPtr {}
/// Per-thread channel endpoints installed by `worker_mode_loop` for the
/// lifetime of a worker-mode PHP script.
///
/// NOTE(review): the consumer of these endpoints is not in this file —
/// presumably the FFI callbacks that run on the worker thread; confirm.
pub(crate) struct WorkerThreadState {
// Receives raw `RequestCtx` pointers (wrapped in `SendPtr`) from the dispatcher thread.
pub(crate) request_rx: flume::Receiver<SendPtr>,
// Signals the dispatcher that the request it last handed over has completed.
pub(crate) done_tx: flume::Sender<()>,
}
// Thread-local slot holding the worker-mode channel endpoints of the current
// thread. It starts as `None`; `worker_mode_loop` fills it before running the
// worker script and resets it to `None` afterwards.
thread_local! {
pub static WORKER_THREAD_STATE: RefCell<Option<WorkerThreadState>> = const { RefCell::new(None) };
}
/// Message placed on the pool's shared queue and consumed by worker threads.
pub enum PhpRequest {
// Run one PHP request; boxed so the enum stays small next to `Shutdown`.
Execute(Box<PhpExecuteRequest>),
// Ask the receiving worker to leave its loop. `PhpPool::shutdown` sends one per worker.
Shutdown,
}
/// Everything a worker needs to execute a single PHP request, plus the
/// channel on which the result is sent back to the caller.
pub struct PhpExecuteRequest {
// Script to run. Only the classic-mode loop reads this; the worker-mode
// dispatcher in this file does not use it.
pub script_path: String,
// Request method, URI and query string, passed through to the PHP side.
pub method: String,
pub uri: String,
pub query_string: String,
// Optional request content type.
pub content_type: Option<String>,
// Raw request body bytes.
pub body: Vec<u8>,
// Optional cookie string for the request.
pub cookies: Option<String>,
// Request headers as (name, value) pairs.
pub headers: Vec<(String, String)>,
// Optional client address and server identity forwarded to `RequestCtx::new`.
pub remote_addr: Option<String>,
pub server_name: Option<String>,
pub server_port: u16,
// HTTPS flag forwarded to `RequestCtx::new`.
pub https: bool,
// The worker sends exactly one `PhpResponse` here; `PhpPool::execute` waits on the other end.
pub reply: flume::Sender<PhpResponse>,
}
/// Outcome of one PHP request as reported by a worker.
#[derive(Debug, Clone)]
pub enum PhpResponse {
// The script ran; carries the collected response.
Ok {
// Status code reported by the PHP side.
status: u16,
// Output bytes captured in the request context.
body: Vec<u8>,
// Response headers captured in the request context, as (name, value) pairs.
headers: Vec<(String, String)>,
// Execution time in microseconds, measured by the worker with `Instant`.
exec_time_us: u64,
},
// The request could not be executed; the string describes why.
Error(String),
}
/// Point-in-time snapshot of the pool counters, produced by `PhpPool::stats`.
#[derive(Debug, Clone, Default)]
pub struct PhpPoolStats {
// Number of workers the pool was created with (not the number still alive).
pub workers: u32,
// Requests currently being executed.
pub active: u32,
// Requests picked up by workers so far, including ones that ended in an error.
pub total_requests: u64,
// Requests that produced `PhpResponse::Error`.
pub total_errors: u64,
// Accumulated execution time divided by `total_requests`, in microseconds; 0 when idle.
pub avg_exec_time_us: u64,
}
/// How the pool's threads run PHP.
#[derive(Debug, Clone)]
pub enum PhpMode {
// Each request executes the script named in the request (`classic_mode_loop`).
Classic,
// Each thread runs one worker script that is fed requests through a
// per-thread channel (`worker_mode_loop`).
Worker {
// Path of the worker script.
script: String,
},
}
/// Fixed-size pool of OS threads that execute PHP requests taken from a
/// shared bounded queue.
pub struct PhpPool {
// Producer side of the shared request queue.
sender: flume::Sender<PhpRequest>,
// Join handles of the spawned worker threads.
workers: Vec<std::thread::JoinHandle<()>>,
// Counters shared with the worker threads; read by `stats`.
active: Arc<AtomicU32>,
total_requests: Arc<AtomicU64>,
total_errors: Arc<AtomicU64>,
total_exec_time_us: Arc<AtomicU64>,
// Number of workers requested at construction time.
worker_count: u32,
// Mode the pool was created in.
mode: PhpMode,
}
impl PhpPool {
    /// Creates a classic-mode pool with `worker_count` threads and no
    /// per-worker request limit.
    ///
    /// # Errors
    /// Returns a message if a worker thread cannot be spawned.
    pub fn new(worker_count: usize) -> Result<Self, String> {
        Self::create(worker_count, 0, PhpMode::Classic, None)
    }

    /// Creates a classic-mode pool whose workers leave their loop after
    /// serving `max_requests` requests each (`0` disables the limit).
    ///
    /// # Errors
    /// Returns a message if a worker thread cannot be spawned.
    pub fn with_max_requests(worker_count: usize, max_requests: u64) -> Result<Self, String> {
        Self::create(worker_count, max_requests, PhpMode::Classic, None)
    }

    /// Creates a worker-mode pool in which every thread runs `worker_script`.
    ///
    /// `max_requests` is forwarded to the worker loop, which currently does
    /// not act on it.
    ///
    /// # Errors
    /// Returns a message if a worker thread cannot be spawned.
    pub fn worker(
        worker_count: usize,
        worker_script: String,
        max_requests: u64,
    ) -> Result<Self, String> {
        Self::create(
            worker_count,
            max_requests,
            PhpMode::Worker {
                script: worker_script.clone(),
            },
            Some(worker_script),
        )
    }

    /// Shared constructor: builds the request queue and counters, then spawns
    /// one named thread per worker running the loop that matches the mode.
    ///
    /// A panic inside a worker loop is caught and logged; that thread then
    /// ends and is visible through [`PhpPool::healthy_workers`].
    fn create(
        worker_count: usize,
        max_requests: u64,
        mode: PhpMode,
        worker_script: Option<String>,
    ) -> Result<Self, String> {
        // 128 queue slots per worker, at least 128; saturate instead of overflowing.
        let queue_capacity = worker_count.max(1).saturating_mul(128);
        let (sender, receiver) = flume::bounded::<PhpRequest>(queue_capacity);

        let active = Arc::new(AtomicU32::new(0));
        let total_requests = Arc::new(AtomicU64::new(0));
        let total_errors = Arc::new(AtomicU64::new(0));
        let total_exec_time_us = Arc::new(AtomicU64::new(0));

        let mut workers = Vec::with_capacity(worker_count);
        for i in 0..worker_count {
            let rx = receiver.clone();
            let active = Arc::clone(&active);
            let total_requests = Arc::clone(&total_requests);
            let total_errors = Arc::clone(&total_errors);
            let total_exec_time_us = Arc::clone(&total_exec_time_us);
            let ws = worker_script.clone();

            let handle = std::thread::Builder::new()
                .name(format!("bext-php-{}", i))
                .stack_size(16 * 1024 * 1024)
                .spawn(move || {
                    // Keep a panicking worker from unwinding silently: catch
                    // it here so it can be logged with the worker id.
                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        if let Some(ref script) = ws {
                            worker_mode_loop(
                                i,
                                rx,
                                active,
                                total_requests,
                                total_errors,
                                total_exec_time_us,
                                script,
                                max_requests,
                            );
                        } else {
                            classic_mode_loop(
                                i,
                                rx,
                                active,
                                total_requests,
                                total_errors,
                                total_exec_time_us,
                                max_requests,
                            );
                        }
                    }));
                    if let Err(e) = result {
                        let msg = if let Some(s) = e.downcast_ref::<String>() {
                            s.clone()
                        } else if let Some(s) = e.downcast_ref::<&str>() {
                            s.to_string()
                        } else {
                            "unknown panic".to_string()
                        };
                        tracing::error!(worker = i, error = %msg, "PHP worker panicked");
                    }
                })
                .map_err(|e| format!("Failed to spawn PHP worker {}: {}", i, e))?;
            workers.push(handle);
        }

        Ok(Self {
            sender,
            workers,
            active,
            total_requests,
            total_errors,
            total_exec_time_us,
            // Clamp rather than silently truncate if the count exceeds u32.
            worker_count: worker_count.min(u32::MAX as usize) as u32,
            mode,
        })
    }

    /// Queues one request and blocks until a worker replies.
    ///
    /// Waits up to 30 seconds for a free queue slot and then up to 60 seconds
    /// for the reply.
    ///
    /// # Errors
    /// Returns a message when either wait times out, when the queue has no
    /// receivers left (every worker has exited), or when the worker dropped
    /// the request without replying (for example because it panicked).
    /// Failures of the PHP execution itself are reported as
    /// `Ok(PhpResponse::Error(..))`.
    #[allow(clippy::too_many_arguments)]
    pub fn execute(
        &self,
        script_path: String,
        method: String,
        uri: String,
        query_string: String,
        content_type: Option<String>,
        body: Vec<u8>,
        cookies: Option<String>,
        headers: Vec<(String, String)>,
        remote_addr: Option<String>,
        server_name: Option<String>,
        server_port: u16,
        https: bool,
    ) -> Result<PhpResponse, String> {
        let (tx, rx) = flume::bounded(1);
        let request = PhpRequest::Execute(Box::new(PhpExecuteRequest {
            script_path,
            method,
            uri,
            query_string,
            content_type,
            body,
            cookies,
            headers,
            remote_addr,
            server_name,
            server_port,
            https,
            reply: tx,
        }));

        self.sender
            .send_timeout(request, Duration::from_secs(30))
            .map_err(|err| match err {
                flume::SendTimeoutError::Timeout(_) => "PHP pool queue timeout (30s)".to_string(),
                // A disconnected queue fails immediately; calling it a
                // timeout would send operators looking in the wrong place.
                flume::SendTimeoutError::Disconnected(_) => {
                    "PHP pool has no running workers".to_string()
                }
            })?;

        rx.recv_timeout(Duration::from_secs(60))
            .map_err(|err| match err {
                flume::RecvTimeoutError::Timeout => "PHP worker timeout (60s)".to_string(),
                // The reply sender was dropped without a response being sent.
                flume::RecvTimeoutError::Disconnected => {
                    "PHP worker exited before replying".to_string()
                }
            })
    }

    /// Number of worker threads that have not finished yet.
    pub fn healthy_workers(&self) -> u32 {
        self.workers.iter().filter(|h| !h.is_finished()).count() as u32
    }

    /// Returns a snapshot of the pool counters.
    ///
    /// The counters are read independently with relaxed ordering, so the
    /// snapshot is approximate while requests are in flight.
    pub fn stats(&self) -> PhpPoolStats {
        let total = self.total_requests.load(Ordering::Relaxed);
        PhpPoolStats {
            workers: self.worker_count,
            active: self.active.load(Ordering::Relaxed),
            total_requests: total,
            total_errors: self.total_errors.load(Ordering::Relaxed),
            avg_exec_time_us: if total > 0 {
                self.total_exec_time_us.load(Ordering::Relaxed) / total
            } else {
                0
            },
        }
    }

    /// Mode the pool was created in.
    pub fn mode(&self) -> &PhpMode {
        &self.mode
    }

    /// Sends one `Shutdown` message per worker and joins every worker thread.
    ///
    /// Send and join failures are ignored: a worker that already exited or
    /// panicked must not prevent the remaining ones from being stopped.
    pub fn shutdown(self) {
        for _ in &self.workers {
            let _ = self.sender.send(PhpRequest::Shutdown);
        }
        for handle in self.workers {
            let _ = handle.join();
        }
    }
}
/// How long a worker (or worker-mode dispatcher) blocks on the shared request
/// queue before looping again; a timeout is not an error, the loop just retries.
const WORKER_RECV_TIMEOUT: Duration = Duration::from_secs(30);
fn classic_mode_loop(
worker_id: usize,
rx: flume::Receiver<PhpRequest>,
active: Arc<AtomicU32>,
total_requests: Arc<AtomicU64>,
total_errors: Arc<AtomicU64>,
total_exec_time_us: Arc<AtomicU64>,
max_requests: u64,
) {
tracing::info!(worker = worker_id, mode = "classic", "PHP worker started");
let mut local_count: u64 = 0;
loop {
if max_requests > 0 && local_count >= max_requests {
tracing::info!(
worker = worker_id,
requests = local_count,
"PHP worker rotating"
);
break;
}
let request = match rx.recv_timeout(WORKER_RECV_TIMEOUT) {
Ok(req) => req,
Err(flume::RecvTimeoutError::Timeout) => continue,
Err(flume::RecvTimeoutError::Disconnected) => break,
};
match request {
PhpRequest::Shutdown => break,
PhpRequest::Execute(req) => {
active.fetch_add(1, Ordering::Relaxed);
total_requests.fetch_add(1, Ordering::Relaxed);
local_count += 1;
let response = execute_classic(
&req.script_path,
&req.method,
&req.uri,
&req.query_string,
req.content_type.as_deref(),
req.body,
req.cookies.as_deref(),
req.headers,
req.remote_addr.as_deref(),
req.server_name.as_deref(),
req.server_port,
req.https,
);
match &response {
PhpResponse::Ok { exec_time_us, .. } => {
total_exec_time_us.fetch_add(*exec_time_us, Ordering::Relaxed);
}
PhpResponse::Error(_) => {
total_errors.fetch_add(1, Ordering::Relaxed);
}
}
let _ = req.reply.send(response);
active.fetch_sub(1, Ordering::Relaxed);
}
}
}
tracing::info!(worker = worker_id, "PHP worker stopped");
}
#[allow(clippy::too_many_arguments)]
fn worker_mode_loop(
worker_id: usize,
rx: flume::Receiver<PhpRequest>,
active: Arc<AtomicU32>,
total_requests: Arc<AtomicU64>,
total_errors: Arc<AtomicU64>,
total_exec_time_us: Arc<AtomicU64>,
worker_script: &str,
_max_requests: u64,
) {
tracing::info!(worker = worker_id, mode = "worker", script = %worker_script, "PHP worker started");
let (request_tx, request_rx) = flume::bounded::<SendPtr>(1);
let (done_tx, done_rx) = flume::bounded::<()>(1);
WORKER_THREAD_STATE.with(|state| {
*state.borrow_mut() = Some(WorkerThreadState {
request_rx,
done_tx,
});
});
let c_script = match CString::new(worker_script) {
Ok(s) => s,
Err(e) => {
tracing::error!(worker = worker_id, error = %e, "Invalid worker script path");
return;
}
};
let mut boot_ctx = RequestCtx::new(Vec::new(), None, Vec::new(), None, None, 80, false);
let dispatcher_rx = rx;
let dispatcher_active = active;
let dispatcher_total_requests = total_requests;
let dispatcher_total_errors = total_errors;
let dispatcher_total_exec_time_us = total_exec_time_us;
let request_tx_clone = request_tx.clone();
let dispatcher = std::thread::Builder::new()
.name(format!("bext-php-{}-dispatch", worker_id))
.spawn(move || {
loop {
while done_rx.try_recv().is_ok() {}
let request = match dispatcher_rx.recv_timeout(WORKER_RECV_TIMEOUT) {
Ok(req) => req,
Err(flume::RecvTimeoutError::Timeout) => continue,
Err(flume::RecvTimeoutError::Disconnected) => break,
};
match request {
PhpRequest::Shutdown => {
drop(request_tx_clone);
break;
}
PhpRequest::Execute(req) => {
dispatcher_active.fetch_add(1, Ordering::Relaxed);
dispatcher_total_requests.fetch_add(1, Ordering::Relaxed);
let start = Instant::now();
let mut req_ctx = Box::new(RequestCtx::new(
req.body,
req.cookies.as_deref(),
req.headers,
req.remote_addr.as_deref(),
req.server_name.as_deref(),
req.server_port,
req.https,
));
req_ctx.set_request_info(
&req.method,
&req.uri,
&req.query_string,
req.content_type.as_deref(),
);
let ctx_ptr = Box::into_raw(req_ctx) as *mut u8;
if request_tx_clone.send(SendPtr(ctx_ptr)).is_err() {
let _ = unsafe { Box::from_raw(ctx_ptr as *mut RequestCtx) };
let _ = req.reply.send(PhpResponse::Error("Worker died".into()));
dispatcher_active.fetch_sub(1, Ordering::Relaxed);
dispatcher_total_errors.fetch_add(1, Ordering::Relaxed);
break;
}
match done_rx.recv_timeout(Duration::from_secs(60)) {
Ok(()) => {
let req_ctx = unsafe { Box::from_raw(ctx_ptr as *mut RequestCtx) };
let exec_time_us = start.elapsed().as_micros() as u64;
dispatcher_total_exec_time_us
.fetch_add(exec_time_us, Ordering::Relaxed);
let _ = req.reply.send(PhpResponse::Ok {
status: req_ctx.status_code,
body: req_ctx.output_buf,
headers: req_ctx.response_headers,
exec_time_us,
});
}
Err(_) => {
let _ = req
.reply
.send(PhpResponse::Error("PHP worker timeout".into()));
dispatcher_total_errors.fetch_add(1, Ordering::Relaxed);
}
}
dispatcher_active.fetch_sub(1, Ordering::Relaxed);
}
}
}
});
let dispatcher = match dispatcher {
Ok(handle) => Some(handle),
Err(e) => {
tracing::error!(worker = worker_id, error = %e, "Failed to spawn dispatcher thread");
return;
}
};
let exit_status = unsafe {
ffi::bext_php_execute_worker(
&mut boot_ctx as *mut RequestCtx as *mut ffi::BextRequestCtx,
c_script.as_ptr(),
)
};
tracing::info!(worker = worker_id, exit_status, "PHP worker script exited");
drop(request_tx);
if let Some(d) = dispatcher {
let _ = d.join();
}
WORKER_THREAD_STATE.with(|state| {
*state.borrow_mut() = None;
});
}
#[allow(clippy::too_many_arguments)]
fn execute_classic(
script_path: &str,
method: &str,
uri: &str,
query_string: &str,
content_type: Option<&str>,
body: Vec<u8>,
cookies: Option<&str>,
headers: Vec<(String, String)>,
remote_addr: Option<&str>,
server_name: Option<&str>,
server_port: u16,
https: bool,
) -> PhpResponse {
let c_script = match CString::new(script_path) {
Ok(s) => s,
Err(e) => return PhpResponse::Error(format!("Invalid script path: {}", e)),
};
let c_method = match CString::new(method) {
Ok(s) => s,
Err(e) => return PhpResponse::Error(format!("Invalid method: {}", e)),
};
let c_uri = match CString::new(uri) {
Ok(s) => s,
Err(e) => return PhpResponse::Error(format!("Invalid URI: {}", e)),
};
let c_query = match CString::new(query_string) {
Ok(s) => s,
Err(e) => return PhpResponse::Error(format!("Invalid query string: {}", e)),
};
let c_content_type = content_type.and_then(|ct| CString::new(ct).ok());
let content_length = body.len() as i64;
let mut req_ctx = RequestCtx::new(
body,
cookies,
headers,
remote_addr,
server_name,
server_port,
https,
);
let start = Instant::now();
let status = unsafe {
ffi::bext_php_execute_script(
&mut req_ctx as *mut RequestCtx as *mut ffi::BextRequestCtx,
c_script.as_ptr(),
c_method.as_ptr(),
c_uri.as_ptr(),
c_query.as_ptr(),
c_content_type
.as_ref()
.map(|c| c.as_ptr())
.unwrap_or(std::ptr::null()),
content_length,
)
};
let exec_time_us = start.elapsed().as_micros() as u64;
if status < 0 {
return PhpResponse::Error("PHP execution failed (request startup error)".into());
}
PhpResponse::Ok {
status: status as u16,
body: req_ctx.output_buf,
headers: req_ctx.response_headers,
exec_time_us,
}
}