#![allow(unsafe_code)]
use std::sync::atomic::{AtomicU32, Ordering};
use folk_protocol::RpcMessage;
use rmpv::Value;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use crate::php::PhpInstance;
/// Process-wide counter from which worker ids are drawn; the first id handed
/// out is 1 and each spawned worker takes the next value.
static NEXT_WORKER_ID: AtomicU32 = AtomicU32::new(1);
/// Commands delivered to a worker thread over its command channel.
pub(crate) enum WorkerCommand {
/// An RPC message for the worker to process; the worker sends the resulting
/// message back on its task-response channel.
Task(RpcMessage),
/// Asks the worker to leave its receive loop and shut down.
Terminate,
}
/// Starts a dedicated OS thread running an embed worker and returns the
/// handles needed to drive it.
///
/// `script` is forwarded unchanged to the worker, which loads it as a
/// bootstrap script when it is `Some`.
///
/// The returned tuple contains, in order:
/// 1. the join handle of the worker thread,
/// 2. the sender used to submit [`WorkerCommand`]s,
/// 3. the receiver on which task responses arrive,
/// 4. the receiver on which control messages (such as `control.ready`) arrive,
/// 5. the id assigned to this worker.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
pub(crate) fn spawn_worker_thread(
    script: Option<String>,
) -> (
    std::thread::JoinHandle<()>,
    mpsc::UnboundedSender<WorkerCommand>,
    mpsc::UnboundedReceiver<RpcMessage>,
    mpsc::UnboundedReceiver<RpcMessage>,
    u32,
) {
    // 256 MiB of stack per worker.
    // NOTE(review): presumably sized for deep recursion inside the PHP
    // engine — confirm before lowering.
    const WORKER_STACK_BYTES: usize = 256 * 1024 * 1024;

    // Ids only need to be unique, so a relaxed increment is sufficient.
    let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);

    let (command_sender, command_receiver) = mpsc::unbounded_channel::<WorkerCommand>();
    let (response_sender, response_receiver) = mpsc::unbounded_channel::<RpcMessage>();
    let (control_sender, control_receiver) = mpsc::unbounded_channel::<RpcMessage>();

    let thread_name = format!("folk-embed-{worker_id}");
    let builder = std::thread::Builder::new()
        .name(thread_name)
        .stack_size(WORKER_STACK_BYTES);

    // The worker takes ownership of its ends of all three channels.
    let join_handle = builder
        .spawn(move || {
            worker_main(
                worker_id,
                script,
                command_receiver,
                response_sender,
                control_sender,
            )
        })
        .expect("failed to spawn worker thread");

    (
        join_handle,
        command_sender,
        response_receiver,
        control_receiver,
        worker_id,
    )
}
/// Body of an embed worker thread.
///
/// The worker:
/// 1. marks the process environment with `FOLK_RUNTIME=embed`,
/// 2. attaches a [`PhpInstance`],
/// 3. loads the bootstrap `script` when one was supplied (a failure is logged
///    and ends the worker before it reports ready),
/// 4. announces itself with a `control.ready` notification on `control_tx`,
/// 5. processes commands from `cmd_rx` until it is told to terminate or one
///    of its channels closes.
///
/// Every `WorkerCommand::Task` is answered with exactly one message on
/// `task_resp_tx`.
// Arguments are taken by value because the spawning closure moves them into
// this thread, even though some of them are only borrowed here.
#[allow(clippy::needless_pass_by_value)]
fn worker_main(
    worker_id: u32,
    script: Option<String>,
    mut cmd_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    task_resp_tx: mpsc::UnboundedSender<RpcMessage>,
    control_tx: mpsc::UnboundedSender<RpcMessage>,
) {
    info!(worker_id, "embed worker starting");

    // NOTE(review): `set_var` is only sound while no other thread reads or
    // writes the process environment; this runs on every worker thread, so
    // confirm nothing else touches the environment concurrently.
    unsafe { std::env::set_var("FOLK_RUNTIME", "embed") };

    let mut php = PhpInstance::attach();

    if let Some(script_path) = script.as_deref() {
        if let Err(e) = bootstrap_script(&mut php, script_path) {
            error!(worker_id, error = ?e, "failed to load bootstrap script");
            return;
        }
    }

    // The worker id is reported under the "pid" key of the ready message.
    let ready_payload = Value::Map(vec![(
        Value::String("pid".into()),
        Value::Integer(worker_id.into()),
    )]);
    let ready_msg = RpcMessage::notify("control.ready", ready_payload);
    if control_tx.send(ready_msg).is_err() {
        // Nobody is listening for control messages; nothing left to do.
        return;
    }
    info!(worker_id, "embed worker ready");

    loop {
        let request = match cmd_rx.blocking_recv() {
            // Command channel closed: all senders are gone.
            None => break,
            Some(WorkerCommand::Terminate) => {
                debug!(worker_id, "received terminate signal");
                break;
            }
            Some(WorkerCommand::Task(request)) => request,
        };

        let response = handle_request(&mut php, &request);
        if task_resp_tx.send(response).is_err() {
            warn!(worker_id, "response channel closed");
            break;
        }
    }

    info!(worker_id, "embed worker shutting down");
}
/// Processes one incoming RPC message and produces the message to send back.
///
/// * A message that is not a `Request` yields an `"error"` notification.
/// * The request parameters are encoded as MessagePack and passed to PHP via
///   `dispatch_to_php`; the returned bytes are decoded back into a [`Value`]
///   and wrapped in a success response carrying the request's `msgid`.
/// * Any failure — encoding the parameters, the PHP dispatch itself, or
///   decoding the reply — is logged and reported as an error response for the
///   same `msgid`, so a malformed reply is never passed off as a successful
///   `Nil` result.
fn handle_request(php: &mut PhpInstance, request: &RpcMessage) -> RpcMessage {
    let (msgid, method, params) = match request {
        RpcMessage::Request {
            msgid,
            method,
            params,
        } => (*msgid, method.as_str(), params),
        _ => {
            return RpcMessage::notify("error", Value::String("expected Request".into()));
        }
    };

    // Refuse to dispatch when the parameters cannot be encoded: handing PHP
    // an empty payload would silently run the method with the wrong input.
    let params_bytes = match rmp_serde::to_vec(params) {
        Ok(bytes) => bytes,
        Err(e) => {
            warn!("failed to encode params for {method}: {e}");
            return RpcMessage::response_err(
                msgid,
                Value::String(format!("failed to encode request params: {e}").into()),
            );
        }
    };

    let response_bytes = match dispatch_to_php(php, method, &params_bytes) {
        Ok(bytes) => bytes,
        Err(e) => {
            warn!("PHP dispatch error: {e}");
            return RpcMessage::response_err(msgid, Value::String(e.to_string().into()));
        }
    };

    // A reply that is not valid MessagePack is a failure, not a `Nil` result.
    match rmp_serde::from_slice::<Value>(&response_bytes) {
        Ok(response_val) => RpcMessage::response_ok(msgid, response_val),
        Err(e) => {
            warn!("failed to decode PHP response for {method}: {e}");
            RpcMessage::response_err(
                msgid,
                Value::String(format!("failed to decode PHP response: {e}").into()),
            )
        }
    }
}
/// Invokes the PHP function `folk_embed_dispatch` with `method` and the
/// already-encoded `params`, returning the raw bytes of the reply.
///
/// The request parameters are echoed back as the reply in two situations:
/// * PHP returned an empty byte string, or
/// * the call failed with an error whose message contains `"call failed"`.
///
/// Any other error is returned to the caller unchanged.
///
/// NOTE(review): the `"call failed"` match depends on the wording of errors
/// produced by `PhpInstance::call_binary`; presumably it covers the case where
/// no dispatcher is defined on the PHP side — confirm against that code.
fn dispatch_to_php(php: &mut PhpInstance, method: &str, params: &[u8]) -> anyhow::Result<Vec<u8>> {
    match php.call_binary("folk_embed_dispatch", method, params) {
        // A non-empty reply is passed through untouched.
        Ok(reply) if !reply.is_empty() => Ok(reply),
        // Errors other than "call failed" are real failures.
        Err(failure) if !failure.to_string().contains("call failed") => Err(failure),
        // Empty reply or a "call failed" error: echo the request parameters.
        _ => Ok(params.to_vec()),
    }
}
/// Starts a PHP request and executes the bootstrap script at `path`.
///
/// The path is canonicalized when possible; if that fails, or the canonical
/// path is not valid UTF-8, the path is used as given.
///
/// # Errors
///
/// Returns the error from `request_startup` or from executing the script. A
/// script execution failure is additionally written to standard error.
fn bootstrap_script(php: &mut PhpInstance, path: &str) -> anyhow::Result<()> {
    php.request_startup()?;

    let resolved = match std::path::Path::new(path).canonicalize() {
        Ok(canonical) => canonical,
        Err(_) => std::path::PathBuf::from(path),
    };
    let script_arg = resolved.to_str().unwrap_or(path);

    if let Err(e) = php.execute_script(script_arg) {
        // Written straight to stderr in addition to any logging by the caller.
        eprintln!("[folk-embed] BOOTSTRAP FAILED: {e}");
        return Err(e);
    }

    info!(path = %resolved.display(), "bootstrap loaded");
    Ok(())
}