Skip to main content

folk_ext/
lib.rs

1//! Folk PHP extension core — server lifecycle + worker bridge.
2//!
3//! Uses `std::sync` channels for worker communication (no tokio dependency
4//! on the worker thread side). Supports multi-worker via ZTS threads.
5
6pub mod bridge;
7pub mod registry;
8pub mod runtime;
9pub mod worker;
10pub mod zts;
11pub mod zval_convert;
12
13use std::sync::{Arc, Barrier, Mutex, OnceLock};
14use std::thread;
15
16use anyhow::Context as _;
17use ext_php_rs::binary::Binary;
18use ext_php_rs::prelude::*;
19use folk_api::Plugin;
20use folk_core::config::FolkConfig;
21use tracing::info;
22
23use crate::registry::InProcessRegistry;
24use crate::runtime::{ExtensionRuntime, WorkerTxSide};
25
26pub use folk_core;
27
28static REGISTRY: OnceLock<Arc<InProcessRegistry>> = OnceLock::new();
29static TOKIO_HANDLE: OnceLock<tokio::runtime::Handle> = OnceLock::new();
/// Working directory captured by `start_server`; unset until the server has
/// been started at least once.
static PROJECT_ROOT: OnceLock<std::path::PathBuf> = OnceLock::new();

/// Returns the project root directory (CWD at server start).
///
/// Yields `None` while the root has not been recorded yet.
pub fn project_root() -> Option<&'static std::path::Path> {
    let root = PROJECT_ROOT.get()?;
    Some(root.as_path())
}
36
/// Join handles of the ZTS worker threads.
///
/// They are collected here so that `join_zts_workers` can wait for them on
/// shutdown (joining prevents a SIGSEGV when the main thread exits first).
static ZTS_WORKERS: OnceLock<Mutex<Vec<thread::JoinHandle<()>>>> = OnceLock::new();

/// Register a ZTS worker thread handle for graceful shutdown.
///
/// The backing list is created lazily on the first registration.
pub fn register_zts_worker(handle: thread::JoinHandle<()>) {
    let mut registered = ZTS_WORKERS
        .get_or_init(Mutex::default)
        .lock()
        .unwrap();
    registered.push(handle);
}
45
46/// Join all ZTS worker threads. Called before main thread exits.
47pub fn join_zts_workers() {
48    if let Some(workers) = ZTS_WORKERS.get() {
49        let handles: Vec<_> = workers.lock().unwrap().drain(..).collect();
50        for handle in handles {
51            let _ = handle.join();
52        }
53        tracing::info!("all ZTS worker threads joined");
54    }
55}
56
57// --- Public Rust API ---
58
59pub fn version() -> String {
60    format!("folk-ext {}", env!("CARGO_PKG_VERSION"))
61}
62
63/// Start the server with plugins. Non-blocking.
64///
65/// Creates one channel pair for the main PHP thread (worker #1).
66/// Additional workers (count > 1) are spawned as ZTS threads by the runtime.
67pub fn start_server(config: FolkConfig, plugins: Vec<Box<dyn Plugin>>) -> anyhow::Result<()> {
68    // Save project root (CWD at server start) for ZTS worker threads.
69    let cwd = std::env::current_dir().context("cannot determine current directory")?;
70    let _ = PROJECT_ROOT.set(cwd);
71
72    let worker_count = config.workers.count;
73    let is_zts = zts::is_zts();
74
75    if worker_count > 1 && !is_zts {
76        tracing::warn!(
77            worker_count,
78            "multi-worker requested but PHP is NTS; only 1 worker will be used"
79        );
80    }
81
82    // Create one channel pair for the main thread worker.
83    let (task_tx, task_rx) = std::sync::mpsc::sync_channel::<bridge::TaskRequest>(8);
84    let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::<()>(1);
85
86    // Install main thread as worker #1.
87    bridge::init_worker_state(1, task_rx, ready_tx);
88
89    // Tx side goes to the runtime (for dispatching to main thread worker).
90    let tx_sides = vec![WorkerTxSide { task_tx, ready_rx }];
91
92    let registry = InProcessRegistry::new();
93    REGISTRY.set(registry.clone()).ok();
94
95    let workers_config = config.workers.clone();
96    let barrier = Arc::new(Barrier::new(2));
97    let barrier_inner = barrier.clone();
98
99    thread::Builder::new()
100        .name("folk-tokio".into())
101        .spawn(move || {
102            let rt = tokio::runtime::Builder::new_multi_thread()
103                .enable_all()
104                .build()
105                .expect("failed to create tokio runtime");
106
107            TOKIO_HANDLE.set(rt.handle().clone()).ok();
108            barrier_inner.wait();
109
110            rt.block_on(async move {
111                // Runtime gets the pre-connected channel for worker #1.
112                // Additional workers (ZTS) will be spawned on demand.
113                let ext_runtime = Arc::new(ExtensionRuntime::new(workers_config, tx_sides));
114
115                let mut server = folk_core::server::FolkServer::new(config, ext_runtime);
116                server.set_rpc_registrar(registry);
117
118                for plugin in plugins {
119                    server.register_plugin(plugin);
120                }
121
122                if let Err(e) = server.run().await {
123                    tracing::error!(error = ?e, "server error");
124                }
125            });
126        })?;
127
128    barrier.wait();
129    info!(
130        worker_count,
131        is_zts, "folk server started, main process is worker #1"
132    );
133    Ok(())
134}
135
136pub fn call_method(method: &str, payload: bytes::Bytes) -> anyhow::Result<bytes::Bytes> {
137    let registry = REGISTRY
138        .get()
139        .ok_or_else(|| anyhow::anyhow!("server not started"))?;
140    let handle = TOKIO_HANDLE
141        .get()
142        .ok_or_else(|| anyhow::anyhow!("runtime not available"))?;
143
144    handle.block_on(registry.call(method, payload))
145}
146
147// --- PHP wrappers (standalone mode only) ---
148
/// PHP class `Folk\Server`: remembers a config file path and starts the
/// server from it on demand.
#[cfg(feature = "standalone")]
#[php_class]
#[php(name = "Folk\\Server")]
#[derive(Debug)]
pub struct Server {
    // Path handed to `FolkConfig::load_from` when `start` is called.
    config_path: String,
}

#[cfg(feature = "standalone")]
#[php_impl]
impl Server {
    /// Stores `config_path`; nothing is loaded until `start` is called.
    pub fn __construct(config_path: String) -> Self {
        Self { config_path }
    }

    /// Loads the configuration and starts the server without plugins.
    ///
    /// Configuration failures surface as a "Config error: …" exception,
    /// startup failures as "Start error: …".
    pub fn start(&self) -> PhpResult<()> {
        let config = match FolkConfig::load_from(&self.config_path) {
            Ok(loaded) => loaded,
            Err(e) => return Err(PhpException::default(format!("Config error: {e}"))),
        };

        match start_server(config, Vec::new()) {
            Ok(()) => Ok(()),
            Err(e) => Err(PhpException::default(format!("Start error: {e}"))),
        }
    }
}
174
/// PHP-visible wrapper around [`version`]; returns the same string.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_version() -> String {
    crate::version()
}
180
/// PHP-visible wrapper around [`call_method`].
///
/// Passes the binary `payload` to `method` and returns the response bytes.
/// A failure is raised as an exception whose message is prefixed with
/// `folk_call(<method>): `.
#[cfg(feature = "standalone")]
#[php_function]
#[allow(clippy::needless_pass_by_value)] // ext-php-rs requires owned types
pub fn folk_call(method: String, payload: Binary<u8>) -> PhpResult<Binary<u8>> {
    let request: Vec<u8> = payload.into();

    match call_method(&method, bytes::Bytes::from(request)) {
        Ok(response) => Ok(Binary::new(response.to_vec())),
        Err(e) => Err(PhpException::default(format!("folk_call({method}): {e}"))),
    }
}
191
/// PHP-visible wrapper around `bridge::do_ready`.
///
/// Returns the bridge's boolean result; a bridge error is raised as an
/// exception prefixed with `folk_worker_ready: `.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_worker_ready() -> PhpResult<bool> {
    match bridge::do_ready() {
        Ok(ready) => Ok(ready),
        Err(e) => Err(PhpException::default(format!("folk_worker_ready: {e}"))),
    }
}
197
/// PHP-visible wrapper around `bridge::do_recv`.
///
/// When the bridge yields a task, returns a two-element list
/// `[method, payload]` of binary strings; when it yields nothing, returns
/// `null`. A bridge error is raised as an exception prefixed with
/// `folk_worker_recv: `.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_worker_recv() -> PhpResult<Option<Vec<Binary<u8>>>> {
    let task = bridge::do_recv()
        .map_err(|e| PhpException::default(format!("folk_worker_recv: {e}")))?;

    Ok(task.map(|(method, payload)| {
        vec![Binary::new(method.into_bytes()), Binary::new(payload)]
    }))
}
210
/// PHP-visible wrapper around `bridge::do_send`: hands the binary `result`
/// to the bridge.
///
/// A bridge error is raised as an exception prefixed with
/// `folk_worker_send: `.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_worker_send(result: Binary<u8>) -> PhpResult<()> {
    let bytes: Vec<u8> = result.into();

    if let Err(e) = bridge::do_send(&bytes) {
        return Err(PhpException::default(format!("folk_worker_send: {e}")));
    }
    Ok(())
}
217
/// PHP-visible wrapper around `bridge::do_send_error`: reports `message`
/// to the bridge as an error result.
///
/// A bridge error is raised as an exception prefixed with
/// `folk_worker_send_error: `.
#[cfg(feature = "standalone")]
#[php_function]
#[allow(clippy::needless_pass_by_value)] // ext-php-rs requires owned types
pub fn folk_worker_send_error(message: String) -> PhpResult<()> {
    if let Err(e) = bridge::do_send_error(&message) {
        return Err(PhpException::default(format!("folk_worker_send_error: {e}")));
    }
    Ok(())
}
225
/// Reports whether the calling thread has bridge worker state installed
/// (the state the runtime sets up for ZTS worker threads).
///
/// Thin PHP-visible wrapper around `bridge::has_worker_state`.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_is_worker_thread() -> bool {
    crate::bridge::has_worker_state()
}
233
/// Returns the id of the request currently being handled, or `""` when the
/// bridge reports no current request.
///
/// The id is a UUID v7 that stays the same for the whole request, which makes
/// it suitable for correlating PHP application logs with Rust-side access logs.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_request_id() -> String {
    let rendered = bridge::current_request_id().map(|request_id| request_id.to_string());
    rendered.unwrap_or_default()
}
245
/// Run the zero-copy dispatch loop.
///
/// Delegates to `bridge::run_dispatch_loop`, which blocks until the channel
/// is closed (server shutdown) and invokes the named PHP function directly
/// for each request — no JSON encode/decode.
///
/// The PHP function must have signature:
/// `function(string $method, array $params): array`
///
/// A bridge error is raised as an exception prefixed with `folk_worker_run: `.
#[cfg(feature = "standalone")]
#[php_function]
#[allow(clippy::needless_pass_by_value)]
pub fn folk_worker_run(dispatch_fn: String) -> PhpResult<()> {
    match bridge::run_dispatch_loop(&dispatch_fn) {
        Ok(done) => Ok(done),
        Err(e) => Err(PhpException::default(format!("folk_worker_run: {e}"))),
    }
}
260
/// Module entry point: registers the `Server` class and every `folk_*`
/// function with the PHP module builder.
///
/// Registration order is: class first, then general functions, the worker
/// protocol functions, and finally the introspection / dispatch-loop helpers.
#[cfg(feature = "standalone")]
#[php_module]
pub fn get_module(module: ModuleBuilder) -> ModuleBuilder {
    let module = module.class::<Server>();

    // General-purpose entry points.
    let module = module
        .function(wrap_function!(folk_version))
        .function(wrap_function!(folk_call));

    // Manual worker protocol: ready / recv / send / send_error.
    let module = module
        .function(wrap_function!(folk_worker_ready))
        .function(wrap_function!(folk_worker_recv))
        .function(wrap_function!(folk_worker_send))
        .function(wrap_function!(folk_worker_send_error));

    // Introspection helpers and the dispatch loop.
    module
        .function(wrap_function!(folk_is_worker_thread))
        .function(wrap_function!(folk_request_id))
        .function(wrap_function!(folk_worker_run))
}