folk-runtime-embed 0.1.4

Embedded PHP runtime for Folk — PHP interpreter runs in-process via FFI
Documentation
//! Worker thread: runs PHP in a dedicated OS thread.
//!
//! Each worker thread owns an isolated PHP runtime and processes
//! requests one at a time. Communication with the async world is
//! via tokio mpsc channels.
#![allow(unsafe_code)]

use std::sync::atomic::{AtomicU32, Ordering};

use folk_protocol::RpcMessage;
use rmpv::Value;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};

use crate::php::PhpInstance;

static NEXT_WORKER_ID: AtomicU32 = AtomicU32::new(1);

/// Messages sent from the handle to the worker thread.
///
/// The worker's request loop receives these one at a time from its command
/// channel and handles each before reading the next.
pub(crate) enum WorkerCommand {
    /// Execute an RPC request.
    ///
    /// The worker handles the message and sends the resulting `RpcMessage`
    /// back on its task-response channel.
    Task(RpcMessage),
    /// Shut down the worker.
    ///
    /// The worker leaves its request loop without reading further commands.
    Terminate,
}

/// Spawn a dedicated OS thread running a PHP worker.
///
/// A fresh worker id is drawn from the process-wide counter and used both for
/// the thread name (`folk-embed-<id>`) and as the id handed to the worker.
///
/// The returned tuple contains, in order:
/// 1. the join handle of the spawned thread,
/// 2. the sender for [`WorkerCommand`]s,
/// 3. the receiver for task responses,
/// 4. the receiver for control messages (the worker sends `control.ready`
///    there once PHP has booted and the optional bootstrap script has loaded),
/// 5. the worker id.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
pub(crate) fn spawn_worker_thread(
    script: Option<String>,
) -> (
    std::thread::JoinHandle<()>,
    mpsc::UnboundedSender<WorkerCommand>,
    mpsc::UnboundedReceiver<RpcMessage>,
    mpsc::UnboundedReceiver<RpcMessage>,
    u32,
) {
    let id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);

    // One channel per direction/purpose; the worker keeps the opposite ends.
    let (command_sender, command_receiver) = mpsc::unbounded_channel::<WorkerCommand>();
    let (response_sender, response_receiver) = mpsc::unbounded_channel::<RpcMessage>();
    let (control_sender, control_receiver) = mpsc::unbounded_channel::<RpcMessage>();

    let thread_name = format!("folk-embed-{id}");
    let thread_body =
        move || worker_main(id, script, command_receiver, response_sender, control_sender);

    let join_handle = std::thread::Builder::new()
        .name(thread_name)
        .spawn(thread_body)
        .expect("failed to spawn worker thread");

    (
        join_handle,
        command_sender,
        response_receiver,
        control_receiver,
        id,
    )
}

/// Body of a worker thread: boot PHP, announce readiness, then serve commands.
///
/// Steps, in order:
/// 1. Ensure `FOLK_RUNTIME=embed` is set in the process environment (done at
///    most once per process, by whichever worker gets here first).
/// 2. Boot a `PhpInstance`; on failure the error is logged and the thread exits.
/// 3. If `script` is given, load it as a bootstrap script; on failure the error
///    is logged and the thread exits.
/// 4. Send a `control.ready` notification on `control_tx` carrying the worker
///    id under the `pid` key.
/// 5. Process [`WorkerCommand`]s from `cmd_rx` until `Terminate` arrives, the
///    command channel closes, or the response channel is closed.
///
/// When the thread exits early (steps 2–4), `control_tx` and `task_resp_tx`
/// are dropped without any message having been sent on them.
// The arguments are taken by value on purpose: this function is the body of
// the spawned thread, which owns the channels and script path for its lifetime.
#[allow(clippy::needless_pass_by_value)]
fn worker_main(
    worker_id: u32,
    script: Option<String>,
    mut cmd_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    task_resp_tx: mpsc::UnboundedSender<RpcMessage>,
    control_tx: mpsc::UnboundedSender<RpcMessage>,
) {
    // Process-wide guard so the environment is written once, not once per worker.
    static EMBED_ENV_INIT: std::sync::Once = std::sync::Once::new();

    info!(worker_id, "embed worker starting");

    // Set environment so PHP bootstrap detects embed mode.
    EMBED_ENV_INIT.call_once(|| {
        // SAFETY: `Once` guarantees a single write, and any other worker
        // reaching this point blocks until that write has completed, so no
        // worker mutates the environment while another worker is already
        // running PHP. Threads outside this module that access the environment
        // concurrently are not covered by this guard.
        unsafe { std::env::set_var("FOLK_RUNTIME", "embed") };
    });

    // Boot PHP
    let mut php = match PhpInstance::boot_custom_sapi() {
        Ok(php) => php,
        Err(e) => {
            error!(worker_id, error = ?e, "failed to boot PHP");
            return;
        },
    };

    // Load bootstrap script if provided
    if let Some(script_path) = script.as_deref() {
        if let Err(e) = bootstrap_script(&mut php, script_path) {
            error!(worker_id, error = ?e, "failed to load bootstrap script");
            return;
        }
    }

    // Signal ready. The worker id is reported under the `pid` key.
    let ready_msg = RpcMessage::notify(
        "control.ready",
        Value::Map(vec![(
            Value::String("pid".into()),
            Value::Integer(worker_id.into()),
        )]),
    );
    if control_tx.send(ready_msg).is_err() {
        // Nobody is listening for control messages any more; nothing to serve.
        return;
    }

    info!(worker_id, "embed worker ready");

    // Request loop: `blocking_recv` returns `None` once every sender is gone.
    while let Some(cmd) = cmd_rx.blocking_recv() {
        match cmd {
            WorkerCommand::Terminate => {
                debug!(worker_id, "received terminate signal");
                break;
            },
            WorkerCommand::Task(request) => {
                let response = handle_request(&mut php, &request);
                if task_resp_tx.send(response).is_err() {
                    warn!(worker_id, "response channel closed");
                    break;
                }
            },
        }
    }

    info!(worker_id, "embed worker shutting down");
    // PhpInstance::drop() handles cleanup
}

/// Run one RPC request through PHP and build the reply message.
///
/// * A message that is not `RpcMessage::Request` yields an `error`
///   notification with the text `expected Request`.
/// * The params are msgpack-encoded and handed to [`dispatch_to_php`] inside a
///   `request_startup` / `request_shutdown` pair.
/// * The bytes PHP returns are msgpack-decoded into the success payload.
///
/// Any failure (params cannot be encoded, `request_startup` fails, dispatch
/// fails, or the PHP response is not valid msgpack) produces an error response
/// for the same `msgid` instead of being silently replaced by an empty payload
/// or `Nil`.
fn handle_request(php: &mut PhpInstance, request: &RpcMessage) -> RpcMessage {
    let RpcMessage::Request {
        msgid,
        method,
        params,
    } = request
    else {
        return RpcMessage::notify("error", Value::String("expected Request".into()));
    };
    let msgid = *msgid;

    // Serialize params to msgpack for PHP. Done before `request_startup` so an
    // encoding failure never leaves a request cycle open.
    let params_bytes = match rmp_serde::to_vec(params) {
        Ok(bytes) => bytes,
        Err(e) => {
            warn!("failed to encode request params: {e}");
            return RpcMessage::response_err(
                msgid,
                Value::String(format!("failed to encode params: {e}").into()),
            );
        },
    };

    // Start a new request cycle
    if let Err(e) = php.request_startup() {
        return RpcMessage::response_err(
            msgid,
            Value::String(format!("request_startup failed: {e}").into()),
        );
    }

    // Call the dispatch function with SIGSEGV + fatal error protection.
    let result = dispatch_to_php(php, method.as_str(), &params_bytes);

    // Always shutdown request — even after fatal errors.
    // folk_request_shutdown_safe wraps in zend_try, so it's safe.
    php.request_shutdown();

    match result {
        Ok(response_bytes) => match rmp_serde::from_slice::<Value>(&response_bytes) {
            Ok(response_val) => RpcMessage::response_ok(msgid, response_val),
            Err(e) => {
                // Report undecodable output as an error rather than as a
                // successful `Nil`, which would hide a broken handler.
                warn!("failed to decode PHP response: {e}");
                RpcMessage::response_err(
                    msgid,
                    Value::String(format!("invalid response from PHP: {e}").into()),
                )
            },
        },
        Err(e) => {
            warn!("PHP dispatch error: {e}");
            RpcMessage::response_err(msgid, Value::String(e.to_string().into()))
        },
    }
}

/// Invoke the PHP-side dispatcher with a method name and raw msgpack params.
///
/// The PHP side must define
/// `folk_embed_dispatch(string $method, string $params): string`, where
/// `$params` is raw binary (msgpack). No base64 encoding — data passes
/// directly via FFI pointers.
///
/// Fallback (for testing): `params` is echoed back unchanged when the call
/// returns an empty byte string, or when it fails with an error whose message
/// contains `call failed` (treated as "no dispatch function defined"). Every
/// other error is returned to the caller.
fn dispatch_to_php(php: &mut PhpInstance, method: &str, params: &[u8]) -> anyhow::Result<Vec<u8>> {
    match php.call_binary("folk_embed_dispatch", method, params) {
        // Normal case: PHP produced a non-empty reply.
        Ok(reply) if !reply.is_empty() => Ok(reply),
        // Empty reply — echo the params.
        Ok(_) => Ok(params.to_vec()),
        // The call itself failed (not a fatal error) — echo the params.
        Err(err) if err.to_string().contains("call failed") => Ok(params.to_vec()),
        Err(err) => Err(err),
    }
}

/// Escape `raw` for embedding inside a PHP single-quoted string literal.
///
/// In PHP single-quoted strings only `\\` and `\'` are escape sequences, so a
/// backslash is inserted before every backslash and single quote; all other
/// characters are copied through unchanged.
fn escape_php_single_quoted(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}

/// Load a bootstrap PHP script with `require_once` inside its own request cycle.
///
/// `path` is escaped before being placed in the PHP string literal, so paths
/// containing `'` or `\` cannot terminate the literal or inject code.
///
/// # Errors
///
/// Returns an error if `request_startup` fails or if evaluating the
/// `require_once` statement fails. In the latter case the request is still
/// shut down before the error is returned.
fn bootstrap_script(php: &mut PhpInstance, path: &str) -> anyhow::Result<()> {
    php.request_startup()?;

    // Use require_once to load the script
    let code = format!("require_once '{}';", escape_php_single_quoted(path));
    let outcome = php.eval(&code);

    // Shut the request down on success and on failure alike, matching the
    // per-request lifecycle used for regular tasks.
    // NOTE(review): this relies on functions/classes loaded here surviving
    // request shutdown in embed mode — confirm against the PHP SAPI setup.
    php.request_shutdown();

    outcome?;
    Ok(())
}