Skip to main content

folk_ext/
bridge.rs

1//! PHP bridge: thread-local worker state for channel communication.
2//!
3//! Uses `std::sync` channels (not tokio) so worker threads don't need
4//! a tokio runtime. This also works correctly across `fork()`.
5
6use std::cell::RefCell;
7use std::sync::mpsc;
8
9use tracing::debug;
10
11/// A request sent from the server to a worker thread.
12pub struct TaskRequest {
13    /// Unique, monotonic per-request id (0 is reserved for "no active request").
14    pub request_id: u64,
15    pub method: String,
16    pub payload: serde_json::Value,
17    /// Reply channel. Uses `tokio::sync::oneshot` which does NOT require
18    /// a tokio runtime to send — it's a pure atomic operation.
19    pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
20}
21
22/// Thread-local state for the current worker.
23struct WorkerState {
24    worker_id: u32,
25    task_rx: mpsc::Receiver<TaskRequest>,
26    ready_tx: Option<mpsc::SyncSender<()>>,
27    current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
28    /// Id of the request currently being handled on this thread (0 if none).
29    /// Exposed to PHP via `folk_request_id()`.
30    current_request_id: u64,
31}
32
33thread_local! {
34    static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
35}
36
37/// Initialize thread-local worker state.
38pub fn init_worker_state(
39    worker_id: u32,
40    task_rx: mpsc::Receiver<TaskRequest>,
41    ready_tx: mpsc::SyncSender<()>,
42) {
43    WORKER.with(|w| {
44        *w.borrow_mut() = Some(WorkerState {
45            worker_id,
46            task_rx,
47            ready_tx: Some(ready_tx),
48            current_reply: None,
49            current_request_id: 0,
50        });
51    });
52}
53
54/// Clean up thread-local state.
55pub fn cleanup_worker_state() {
56    WORKER.with(|w| {
57        *w.borrow_mut() = None;
58    });
59}
60
61/// Returns true if this thread has worker bridge state initialized.
62pub fn has_worker_state() -> bool {
63    WORKER.with(|w| w.borrow().is_some())
64}
65
66/// Id of the request currently being handled on this thread.
67///
68/// Returns `0` when no request is in flight (or this isn't a worker thread).
69/// Exposed to PHP via the `folk_request_id()` native function.
70pub fn current_request_id() -> u64 {
71    WORKER.with(|w| {
72        w.borrow()
73            .as_ref()
74            .map_or(0, |state| state.current_request_id)
75    })
76}
77
78/// Signal ready. Returns Ok(true) if sent, Ok(false) if already called.
79pub fn do_ready() -> Result<bool, &'static str> {
80    WORKER.with(|w| {
81        let mut state = w.borrow_mut();
82        let state = state.as_mut().ok_or("not in a worker thread")?;
83
84        if let Some(tx) = state.ready_tx.take() {
85            let _ = tx.send(());
86            debug!(worker_id = state.worker_id, "worker signaled ready");
87            Ok(true)
88        } else {
89            Ok(false)
90        }
91    })
92}
93
94/// Block until a request arrives. Returns `(method, payload_json_bytes)` or `None` on shutdown.
95///
96/// Internally receives `serde_json::Value` from the channel and serializes to JSON bytes
97/// for PHP consumption. PHP calls `json_decode()` on these bytes.
98pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
99    WORKER.with(|w| {
100        let mut state = w.borrow_mut();
101        let state = state.as_mut().ok_or("not in a worker thread")?;
102
103        if let Ok(req) = state.task_rx.recv() {
104            let method = req.method.clone();
105            // Value → JSON bytes for PHP (only serialization on the hot path)
106            let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
107            state.current_request_id = req.request_id;
108            state.current_reply = Some(req.reply);
109            Ok(Some((method, payload_bytes)))
110        } else {
111            debug!(worker_id = state.worker_id, "recv: channel closed");
112            Ok(None)
113        }
114    })
115}
116
117/// Send a successful response (raw JSON bytes from PHP).
118///
119/// Internally deserializes JSON bytes from PHP into `serde_json::Value`
120/// for zero-copy return through the channel.
121pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
122    WORKER.with(|w| {
123        let mut state = w.borrow_mut();
124        let state = state.as_mut().ok_or("not in a worker thread")?;
125
126        let reply = state.current_reply.take().ok_or("no pending request")?;
127        // JSON bytes → Value (only deserialization on the hot path)
128        let value: serde_json::Value =
129            serde_json::from_slice(data).unwrap_or(serde_json::Value::Null);
130        let _ = reply.send(Ok(value));
131        Ok(())
132    })
133}
134
135/// Run the dispatch loop directly from Rust, calling PHP via `call_user_function`.
136///
137/// This is the zero-copy path: `serde_json::Value` → zval → PHP handler → zval → Value.
138/// No JSON encode/decode at all.
139///
140/// `dispatch_fn` is the name of a PHP function with signature:
141/// `function(string $method, array $params): array`
142pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
143    WORKER.with(|w| {
144        // Signal ready first.
145        {
146            let mut state = w.borrow_mut();
147            let state = state.as_mut().ok_or("not in a worker thread")?;
148            if let Some(tx) = state.ready_tx.take() {
149                let _ = tx.send(());
150                debug!(
151                    worker_id = state.worker_id,
152                    "worker signaled ready (dispatch loop)"
153                );
154            }
155        }
156
157        // Restore VCWD to project root after php_execute_script set it
158        // to dirname(script). Without this, Composer proxy scripts like
159        // vendor/bin/folk-server leave VCWD in vendor/bin/.
160        if let Some(root) = crate::project_root() {
161            let _ = crate::zts::chdir(&root.to_string_lossy());
162        }
163
164        // Main dispatch loop.
165        loop {
166            let req = {
167                let mut state = w.borrow_mut();
168                let state = state.as_mut().ok_or("not in a worker thread")?;
169                if let Ok(req) = state.task_rx.recv() {
170                    // Expose the id to PHP (folk_request_id()) for the duration
171                    // of this call. call_dispatch runs OUTSIDE this borrow, so a
172                    // reentrant folk_request_id() from PHP can borrow WORKER safely.
173                    state.current_request_id = req.request_id;
174                    req
175                } else {
176                    debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
177                    // Only the main thread (worker #1) should join ZTS workers.
178                    // ZTS workers must NOT join — they'd deadlock trying to join themselves.
179                    if state.worker_id == 1 {
180                        crate::join_zts_workers();
181                    }
182                    return Ok(());
183                }
184            };
185
186            // Call PHP handler directly: Value → zval → PHP → zval → Value.
187            let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
188
189            match result {
190                Ok(value) => {
191                    let _ = req.reply.send(Ok(value));
192                },
193                Err(e) => {
194                    let _ = req.reply.send(Err(e));
195                },
196            }
197
198            // Clear the active request id between requests.
199            if let Ok(mut state) = w.try_borrow_mut() {
200                if let Some(state) = state.as_mut() {
201                    state.current_request_id = 0;
202                }
203            }
204        }
205    })
206}
207
208/// Send an error response.
209pub fn do_send_error(message: &str) -> Result<(), &'static str> {
210    WORKER.with(|w| {
211        let mut state = w.borrow_mut();
212        let state = state.as_mut().ok_or("not in a worker thread")?;
213
214        let reply = state.current_reply.take().ok_or("no pending request")?;
215        let _ = reply.send(Err(anyhow::anyhow!("{message}")));
216        Ok(())
217    })
218}