Skip to main content

folk_ext/
lib.rs

1//! Folk PHP extension core — server lifecycle + worker bridge.
2//!
3//! Uses `std::sync` channels for worker communication (no tokio dependency
4//! on the worker thread side). Supports multi-worker via ZTS threads.
5
6pub mod bridge;
7pub mod registry;
8pub mod runtime;
9pub mod worker;
10pub mod zts;
11pub mod zval_convert;
12
13use std::sync::{Arc, Barrier, Mutex, OnceLock};
14use std::thread;
15
16use ext_php_rs::binary::Binary;
17use ext_php_rs::prelude::*;
18use folk_api::Plugin;
19use folk_core::config::FolkConfig;
20use tracing::info;
21
22use crate::registry::InProcessRegistry;
23use crate::runtime::{ExtensionRuntime, WorkerTxSide};
24
25pub use folk_core;
26
/// Process-wide RPC registry; set once by `start_server` and read by `call_method`.
static REGISTRY: OnceLock<Arc<InProcessRegistry>> = OnceLock::new();
/// Handle to the tokio runtime built on the `folk-tokio` thread; `call_method` blocks on it.
static TOKIO_HANDLE: OnceLock<tokio::runtime::Handle> = OnceLock::new();
/// Working directory captured by `start_server`; exposed through `project_root`.
static PROJECT_ROOT: OnceLock<std::path::PathBuf> = OnceLock::new();
30
31/// Returns the project root directory (CWD at server start).
32pub fn project_root() -> Option<&'static std::path::Path> {
33    PROJECT_ROOT.get().map(std::path::PathBuf::as_path)
34}
35
/// ZTS worker thread handles — joined on shutdown to prevent SIGSEGV.
///
/// Created lazily by `register_zts_worker`; emptied by `join_zts_workers`.
static ZTS_WORKERS: OnceLock<Mutex<Vec<thread::JoinHandle<()>>>> = OnceLock::new();
38
39/// Register a ZTS worker thread handle for graceful shutdown.
40pub fn register_zts_worker(handle: thread::JoinHandle<()>) {
41    let workers = ZTS_WORKERS.get_or_init(|| Mutex::new(Vec::new()));
42    workers.lock().unwrap().push(handle);
43}
44
45/// Join all ZTS worker threads. Called before main thread exits.
46pub fn join_zts_workers() {
47    if let Some(workers) = ZTS_WORKERS.get() {
48        let handles: Vec<_> = workers.lock().unwrap().drain(..).collect();
49        for handle in handles {
50            let _ = handle.join();
51        }
52        tracing::info!("all ZTS worker threads joined");
53    }
54}
55
56// --- Public Rust API ---
57
58pub fn version() -> String {
59    format!("folk-ext {}", env!("CARGO_PKG_VERSION"))
60}
61
62/// Start the server with plugins. Non-blocking.
63///
64/// Creates one channel pair for the main PHP thread (worker #1).
65/// Additional workers (count > 1) are spawned as ZTS threads by the runtime.
66pub fn start_server(config: FolkConfig, plugins: Vec<Box<dyn Plugin>>) -> anyhow::Result<()> {
67    // Save project root (CWD at server start) for ZTS worker threads.
68    let _ = PROJECT_ROOT.set(std::env::current_dir().unwrap_or_default());
69
70    let worker_count = config.workers.count;
71    let is_zts = zts::is_zts();
72
73    if worker_count > 1 && !is_zts {
74        tracing::warn!(
75            worker_count,
76            "multi-worker requested but PHP is NTS; only 1 worker will be used"
77        );
78    }
79
80    // Create one channel pair for the main thread worker.
81    let (task_tx, task_rx) = std::sync::mpsc::sync_channel::<bridge::TaskRequest>(8);
82    let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::<()>(1);
83
84    // Install main thread as worker #1.
85    bridge::init_worker_state(1, task_rx, ready_tx);
86
87    // Tx side goes to the runtime (for dispatching to main thread worker).
88    let tx_sides = vec![WorkerTxSide { task_tx, ready_rx }];
89
90    let registry = InProcessRegistry::new();
91    REGISTRY.set(registry.clone()).ok();
92
93    let workers_config = config.workers.clone();
94    let barrier = Arc::new(Barrier::new(2));
95    let barrier_inner = barrier.clone();
96
97    thread::Builder::new()
98        .name("folk-tokio".into())
99        .spawn(move || {
100            let rt = tokio::runtime::Builder::new_multi_thread()
101                .enable_all()
102                .build()
103                .expect("failed to create tokio runtime");
104
105            TOKIO_HANDLE.set(rt.handle().clone()).ok();
106            barrier_inner.wait();
107
108            rt.block_on(async move {
109                // Runtime gets the pre-connected channel for worker #1.
110                // Additional workers (ZTS) will be spawned on demand.
111                let ext_runtime = Arc::new(ExtensionRuntime::new(workers_config, tx_sides));
112
113                let mut server = folk_core::server::FolkServer::new(config, ext_runtime);
114                server.set_rpc_registrar(registry);
115
116                for plugin in plugins {
117                    server.register_plugin(plugin);
118                }
119
120                if let Err(e) = server.run().await {
121                    tracing::error!(error = ?e, "server error");
122                }
123            });
124        })?;
125
126    barrier.wait();
127    info!(
128        worker_count,
129        is_zts, "folk server started, main process is worker #1"
130    );
131    Ok(())
132}
133
134pub fn call_method(method: &str, payload: bytes::Bytes) -> anyhow::Result<bytes::Bytes> {
135    let registry = REGISTRY
136        .get()
137        .ok_or_else(|| anyhow::anyhow!("server not started"))?;
138    let handle = TOKIO_HANDLE
139        .get()
140        .ok_or_else(|| anyhow::anyhow!("runtime not available"))?;
141
142    handle.block_on(registry.call(method, payload))
143}
144
145// --- PHP wrappers (standalone mode only) ---
146
/// PHP class `Folk\Server`: holds the path of the configuration file that
/// is loaded when the server is started.
#[cfg(feature = "standalone")]
#[php_class]
#[php(name = "Folk\\Server")]
#[derive(Debug)]
pub struct Server {
    /// Path handed to `FolkConfig::load_from` by `start`.
    config_path: String,
}

#[cfg(feature = "standalone")]
#[php_impl]
impl Server {
    /// PHP constructor; only stores the configuration path.
    pub fn __construct(config_path: String) -> Self {
        Self { config_path }
    }

    /// Load the configuration and start the server without any plugins.
    ///
    /// Failures are surfaced as PHP exceptions whose message is prefixed
    /// with `Config error:` (loading) or `Start error:` (startup).
    pub fn start(&self) -> PhpResult<()> {
        let loaded = FolkConfig::load_from(&self.config_path);
        let config =
            loaded.map_err(|err| PhpException::default(format!("Config error: {err}")))?;

        start_server(config, Vec::new())
            .map_err(|err| PhpException::default(format!("Start error: {err}")))
    }
}
172
/// PHP function `folk_version()`: returns the string produced by `version`.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_version() -> String {
    version()
}
178
/// PHP function `folk_call()`: forwards a binary payload to `call_method`
/// and returns the response bytes.
///
/// Errors become PHP exceptions with the message `folk_call(<method>): <error>`.
#[cfg(feature = "standalone")]
#[php_function]
#[allow(clippy::needless_pass_by_value)] // ext-php-rs requires owned types
pub fn folk_call(method: String, payload: Binary<u8>) -> PhpResult<Binary<u8>> {
    let request: Vec<u8> = payload.into();

    match call_method(&method, bytes::Bytes::from(request)) {
        Ok(response) => Ok(Binary::new(response.to_vec())),
        Err(e) => Err(PhpException::default(format!("folk_call({method}): {e}"))),
    }
}
189
/// PHP function `folk_worker_ready()`: delegates to `bridge::do_ready` and
/// returns the boolean it reports.
///
/// Bridge errors become PHP exceptions prefixed with `folk_worker_ready:`.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_worker_ready() -> PhpResult<bool> {
    let outcome = bridge::do_ready();
    outcome.map_err(|err| PhpException::default(format!("folk_worker_ready: {err}")))
}
195
/// PHP function `folk_worker_recv()`: fetches the next item from
/// `bridge::do_recv`.
///
/// When the bridge yields a `(method, payload)` pair, it is returned as a
/// two-element list of binary strings — method name first, payload second.
/// When the bridge yields nothing, `None` is returned. Bridge errors become
/// PHP exceptions prefixed with `folk_worker_recv:`.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_worker_recv() -> PhpResult<Option<Vec<Binary<u8>>>> {
    let received = bridge::do_recv()
        .map_err(|err| PhpException::default(format!("folk_worker_recv: {err}")))?;

    Ok(received.map(|(method, payload)| {
        vec![Binary::new(method.into_bytes()), Binary::new(payload)]
    }))
}
208
/// PHP function `folk_worker_send()`: passes the raw result bytes to
/// `bridge::do_send`.
///
/// Bridge errors become PHP exceptions prefixed with `folk_worker_send:`.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_worker_send(result: Binary<u8>) -> PhpResult<()> {
    let encoded: Vec<u8> = result.into();
    let outcome = bridge::do_send(&encoded);
    outcome.map_err(|err| PhpException::default(format!("folk_worker_send: {err}")))
}
215
/// PHP function `folk_worker_send_error()`: passes an error message to
/// `bridge::do_send_error`.
///
/// Bridge errors become PHP exceptions prefixed with `folk_worker_send_error:`.
#[cfg(feature = "standalone")]
#[php_function]
#[allow(clippy::needless_pass_by_value)] // ext-php-rs requires owned types
pub fn folk_worker_send_error(message: String) -> PhpResult<()> {
    let outcome = bridge::do_send_error(&message);
    outcome.map_err(|err| PhpException::default(format!("folk_worker_send_error: {err}")))
}
223
/// Returns true if the current thread is a ZTS worker thread
/// (has bridge state initialized by the runtime).
///
/// The answer is whatever `bridge::has_worker_state` reports.
/// NOTE(review): `start_server` also calls `bridge::init_worker_state` on the
/// thread that starts the server, so this presumably returns true there as
/// well — confirm against the bridge implementation.
#[cfg(feature = "standalone")]
#[php_function]
pub fn folk_is_worker_thread() -> bool {
    bridge::has_worker_state()
}
231
/// Returns the id of the request currently being handled (0 if none).
///
/// The id does not change while a single request is being processed, which
/// makes it suitable for correlating PHP application logs with Rust-side
/// access logs.
#[cfg(feature = "standalone")]
#[php_function]
#[allow(clippy::cast_possible_wrap)] // request ids never approach i64::MAX in practice
pub fn folk_request_id() -> i64 {
    let id = bridge::current_request_id();
    id as i64
}
242
/// Run the zero-copy dispatch loop.
///
/// Blocks until the channel is closed (server shutdown). The named PHP
/// function is called directly for every request — no JSON encode/decode.
///
/// The PHP function must have the signature:
/// `function(string $method, array $params): array`
///
/// Errors from the dispatch loop become PHP exceptions prefixed with
/// `folk_worker_run:`.
#[cfg(feature = "standalone")]
#[php_function]
#[allow(clippy::needless_pass_by_value)] // ext-php-rs requires owned types
pub fn folk_worker_run(dispatch_fn: String) -> PhpResult<()> {
    let outcome = bridge::run_dispatch_loop(&dispatch_fn);
    outcome.map_err(|err| PhpException::default(format!("folk_worker_run: {err}")))
}
257
/// Builds the PHP module definition for standalone mode.
///
/// Registers the `Server` class and every `folk_*` function declared in this
/// file on the supplied builder and returns the builder.
#[cfg(feature = "standalone")]
#[php_module]
pub fn get_module(module: ModuleBuilder) -> ModuleBuilder {
    module
        .class::<Server>()
        .function(wrap_function!(folk_version))
        .function(wrap_function!(folk_call))
        .function(wrap_function!(folk_worker_ready))
        .function(wrap_function!(folk_worker_recv))
        .function(wrap_function!(folk_worker_send))
        .function(wrap_function!(folk_worker_send_error))
        .function(wrap_function!(folk_is_worker_thread))
        .function(wrap_function!(folk_request_id))
        .function(wrap_function!(folk_worker_run))
}
272}