folk-runtime-embed 0.1.14

Embedded PHP runtime for Folk — PHP interpreter runs in-process via FFI
Documentation
//! Worker thread: runs PHP in a dedicated OS thread.
//!
//! Each worker thread owns an isolated PHP runtime and processes
//! requests one at a time. Communication with the async world is
//! via tokio mpsc channels.
#![allow(unsafe_code)]

use std::sync::atomic::{AtomicU32, Ordering};

use folk_protocol::RpcMessage;
use rmpv::Value;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};

use crate::php::PhpInstance;

/// Process-wide counter used to hand out worker ids.
///
/// `spawn_worker_thread` takes the current value with `fetch_add(1, ..)`,
/// so the first worker spawned gets id 1 and ids increase by one per spawn.
static NEXT_WORKER_ID: AtomicU32 = AtomicU32::new(1);

/// Messages sent from the handle to the worker thread.
///
/// The worker thread receives these on its command channel and processes
/// them strictly one at a time, in the order they were sent.
pub(crate) enum WorkerCommand {
    /// Execute an RPC request.
    ///
    /// The worker dispatches the message to PHP and sends exactly one
    /// reply on its task-response channel.
    Task(RpcMessage),
    /// Shut down the worker.
    ///
    /// The worker leaves its request loop without sending a reply.
    Terminate,
}

/// Spawn a dedicated OS thread running one embed worker.
///
/// Returns, in order:
/// 1. the join handle of the worker thread,
/// 2. the sender used to push [`WorkerCommand`]s to the worker,
/// 3. the receiver on which task responses arrive,
/// 4. the receiver on which control messages (e.g. `control.ready`) arrive,
/// 5. the id assigned to this worker.
///
/// The worker sends `control.ready` on the control channel once PHP has
/// been booted (and the optional bootstrap `script` loaded), then enters
/// its request loop.
///
/// # Panics
///
/// Panics if the operating system refuses to create the thread.
pub(crate) fn spawn_worker_thread(
    script: Option<String>,
) -> (
    std::thread::JoinHandle<()>,
    mpsc::UnboundedSender<WorkerCommand>,
    mpsc::UnboundedReceiver<RpcMessage>,
    mpsc::UnboundedReceiver<RpcMessage>,
    u32,
) {
    // 256MB — PHP compiler needs deep stack for autoloader compilation
    const WORKER_STACK_BYTES: usize = 256 * 1024 * 1024;

    let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);

    // One channel per direction/purpose; the worker owns the opposite ends.
    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<WorkerCommand>();
    let (task_resp_tx, task_resp_rx) = mpsc::unbounded_channel::<RpcMessage>();
    let (control_tx, control_rx) = mpsc::unbounded_channel::<RpcMessage>();

    let builder = std::thread::Builder::new()
        .name(format!("folk-embed-{worker_id}"))
        .stack_size(WORKER_STACK_BYTES);

    let handle = builder
        .spawn(move || worker_main(worker_id, script, cmd_rx, task_resp_tx, control_tx))
        .expect("failed to spawn worker thread");

    (handle, cmd_tx, task_resp_rx, control_rx, worker_id)
}

/// Body of a worker thread: boot PHP, announce readiness, serve requests.
///
/// Steps:
/// 1. Mark the process as running in embed mode via the `FOLK_RUNTIME`
///    environment variable (done at most once per process).
/// 2. Attach to the PHP module and, if `script` is given, load it as the
///    bootstrap script. On bootstrap failure the worker logs and exits
///    without sending `control.ready`; dropping `control_tx` closes the
///    control channel.
/// 3. Send a `control.ready` notification on `control_tx`.
/// 4. Process commands from `cmd_rx` one at a time until `Terminate` is
///    received or any of the channels is closed.
// The channel ends and `script` are moved into the thread closure, so this
// function has to take ownership of them even though it only borrows.
#[allow(clippy::needless_pass_by_value)]
fn worker_main(
    worker_id: u32,
    script: Option<String>,
    mut cmd_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    task_resp_tx: mpsc::UnboundedSender<RpcMessage>,
    control_tx: mpsc::UnboundedSender<RpcMessage>,
) {
    // Ensures the process environment is mutated at most once, no matter
    // how many workers are spawned.
    static EMBED_ENV: std::sync::Once = std::sync::Once::new();

    info!(worker_id, "embed worker starting");

    // Set environment so PHP bootstrap detects embed mode.
    EMBED_ENV.call_once(|| {
        // SAFETY: executed once, by the first worker, before that worker
        // runs any PHP code. Later workers skip this call, so the
        // environment is never modified while another worker may be
        // executing PHP code that reads it.
        // NOTE(review): threads outside the worker pool may already exist
        // at this point; ideally the variable is set by the process before
        // any thread is spawned — confirm against the runtime setup.
        unsafe { std::env::set_var("FOLK_RUNTIME", "embed") };
    });

    // Attach to the shared PHP module (initialized once by EmbedRuntime)
    let mut php = PhpInstance::attach();

    // Load bootstrap script if provided
    if let Some(script_path) = script.as_deref() {
        if let Err(e) = bootstrap_script(&mut php, script_path) {
            error!(worker_id, error = ?e, "failed to load bootstrap script");
            return;
        }
    }

    // Signal ready. Note that the "pid" field carries the worker id, not an
    // operating-system process id.
    let ready_msg = RpcMessage::notify(
        "control.ready",
        Value::Map(vec![(
            Value::String("pid".into()),
            Value::Integer(worker_id.into()),
        )]),
    );
    if control_tx.send(ready_msg).is_err() {
        // Nobody is listening for control messages any more.
        return;
    }

    info!(worker_id, "embed worker ready");

    // Request loop: `blocking_recv` returns `None` once every sender is
    // dropped, which also ends the worker.
    while let Some(cmd) = cmd_rx.blocking_recv() {
        match cmd {
            WorkerCommand::Terminate => {
                debug!(worker_id, "received terminate signal");
                break;
            },
            WorkerCommand::Task(request) => {
                let response = handle_request(&mut php, &request);
                if task_resp_tx.send(response).is_err() {
                    warn!(worker_id, "response channel closed");
                    break;
                }
            },
        }
    }

    info!(worker_id, "embed worker shutting down");
    // PhpInstance::drop() handles cleanup
}

/// Execute a single RPC request against PHP and build the reply.
///
/// * A message that is not `RpcMessage::Request` yields an `"error"`
///   notification with the payload `"expected Request"`.
/// * Otherwise the params are encoded as msgpack, passed to
///   [`dispatch_to_php`], and the returned bytes are decoded back into a
///   [`Value`] that becomes the successful response for `msgid`.
/// * A dispatch error is logged and returned as an error response whose
///   payload is the error's display text.
///
/// Encoding/decoding failures keep the historical fallbacks (empty payload
/// sent to PHP, `Value::Nil` returned to the caller) but are now logged so
/// they no longer disappear silently.
fn handle_request(php: &mut PhpInstance, request: &RpcMessage) -> RpcMessage {
    let RpcMessage::Request {
        msgid,
        method,
        params,
    } = request
    else {
        return RpcMessage::notify("error", Value::String("expected Request".into()));
    };
    let msgid = *msgid;

    // No request_startup/shutdown per request — the bootstrap request
    // stays alive so user-defined functions (folk_embed_dispatch) persist.
    // This gives us zero per-request PHP lifecycle overhead.

    // Serialize params to msgpack for PHP
    let params_bytes = rmp_serde::to_vec(params).unwrap_or_else(|e| {
        warn!(
            method = %method,
            error = %e,
            "failed to encode request params as msgpack; sending empty payload"
        );
        Vec::new()
    });

    // Call the dispatch function with SIGSEGV + fatal error protection.
    match dispatch_to_php(php, method.as_str(), &params_bytes) {
        Ok(response_bytes) => {
            let response_val = match rmp_serde::from_slice::<Value>(&response_bytes) {
                Ok(value) => value,
                Err(e) => {
                    warn!(
                        method = %method,
                        error = %e,
                        "PHP response is not valid msgpack; replying with nil"
                    );
                    Value::Nil
                },
            };
            RpcMessage::response_ok(msgid, response_val)
        },
        Err(e) => {
            warn!("PHP dispatch error: {e}");
            RpcMessage::response_err(msgid, Value::String(e.to_string().into()))
        },
    }
}

/// Invoke the PHP-side dispatcher with a method name and raw binary params.
///
/// The PHP side must define
/// `folk_embed_dispatch(string $method, string $params): string`, where
/// `$params` is raw binary (msgpack). No base64 encoding — data passes
/// directly via FFI pointers.
///
/// Outcomes:
/// * non-empty reply — returned unchanged;
/// * empty reply — `params` is echoed back (for testing);
/// * error whose message contains `"call failed"` — treated as "no dispatch
///   function defined" and `params` is echoed back as well;
/// * any other error — propagated to the caller.
fn dispatch_to_php(php: &mut PhpInstance, method: &str, params: &[u8]) -> anyhow::Result<Vec<u8>> {
    match php.call_binary("folk_embed_dispatch", method, params) {
        Ok(reply) if !reply.is_empty() => Ok(reply),
        // No dispatch function or empty response — echo params (for testing)
        Ok(_) => Ok(params.to_vec()),
        // The call itself failed (as opposed to a fatal error): echo params.
        Err(err) if err.to_string().contains("call failed") => Ok(params.to_vec()),
        Err(err) => Err(err),
    }
}

/// Run the bootstrap PHP script inside a long-lived PHP request.
///
/// A PHP request is started and the script at `path` is executed in it.
/// `request_shutdown` is deliberately NOT called afterwards: the bootstrap
/// request stays alive for the worker's lifetime so that classes and
/// user-defined functions (like `folk_embed_dispatch`) persist. Without
/// OPcache, php_request_shutdown destroys all eval'd code.
///
/// Note that this function does not check that `folk_embed_dispatch` was
/// actually defined by the script.
///
/// # Errors
///
/// Returns the error from `request_startup` or from executing the script.
fn bootstrap_script(php: &mut PhpInstance, path: &str) -> anyhow::Result<()> {
    php.request_startup()?;

    // Prefer an absolute path so `require` works regardless of cwd; if the
    // path cannot be canonicalized, fall back to it exactly as given.
    let resolved = std::fs::canonicalize(path).unwrap_or_else(|_| std::path::PathBuf::from(path));
    let script = resolved.to_str().unwrap_or(path);

    // Use execute_script (php_execute_script) instead of eval("require_once ...")
    // to avoid recursive compilation issues with zend_eval_string.
    if let Err(err) = php.execute_script(script) {
        eprintln!("[folk-embed] BOOTSTRAP FAILED: {err}");
        return Err(err);
    }

    info!(path = %resolved.display(), "bootstrap loaded");
    Ok(())
}