Skip to main content

folk_ext/
bridge.rs

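//! PHP bridge: thread-local worker state for channel communication.
//!
//! Requests are delivered to workers over `std::sync::mpsc` channels (not
//! tokio), so worker threads don't need a tokio runtime. This also works
//! correctly across `fork()`.
//!
//! Replies go back through a `tokio::sync::oneshot` sender, which can be
//! completed from a thread that has no tokio runtime.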
5
6use std::cell::RefCell;
7use std::sync::Arc;
8use std::sync::mpsc;
9
10use tracing::debug;
11
12/// A request sent from the server to a worker thread.
13pub struct TaskRequest {
14    /// Globally-unique per-request id (UUID v7). Empty `""` means "no request".
15    pub request_id: Arc<str>,
16    pub method: String,
17    pub payload: serde_json::Value,
18    /// Reply channel. Uses `tokio::sync::oneshot` which does NOT require
19    /// a tokio runtime to send — it's a pure atomic operation.
20    pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
21}
22
23/// Thread-local state for the current worker.
24struct WorkerState {
25    worker_id: u32,
26    task_rx: mpsc::Receiver<TaskRequest>,
27    ready_tx: Option<mpsc::SyncSender<()>>,
28    current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
29    /// Id of the request currently being handled on this thread (`None` if none).
30    /// Exposed to PHP via `folk_request_id()`.
31    current_request_id: Option<Arc<str>>,
32}
33
34thread_local! {
35    static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
36}
37
38/// Initialize thread-local worker state.
39pub fn init_worker_state(
40    worker_id: u32,
41    task_rx: mpsc::Receiver<TaskRequest>,
42    ready_tx: mpsc::SyncSender<()>,
43) {
44    WORKER.with(|w| {
45        *w.borrow_mut() = Some(WorkerState {
46            worker_id,
47            task_rx,
48            ready_tx: Some(ready_tx),
49            current_reply: None,
50            current_request_id: None,
51        });
52    });
53}
54
55/// Clean up thread-local state.
56pub fn cleanup_worker_state() {
57    WORKER.with(|w| {
58        *w.borrow_mut() = None;
59    });
60}
61
62/// Returns true if this thread has worker bridge state initialized.
63pub fn has_worker_state() -> bool {
64    WORKER.with(|w| w.borrow().is_some())
65}
66
67/// Id of the request currently being handled on this thread.
68///
69/// Returns `None` when no request is in flight (or this isn't a worker thread).
70/// Exposed to PHP via the `folk_request_id()` native function.
71pub fn current_request_id() -> Option<Arc<str>> {
72    WORKER.with(|w| {
73        w.borrow()
74            .as_ref()
75            .and_then(|state| state.current_request_id.clone())
76    })
77}
78
79/// Signal ready. Returns Ok(true) if sent, Ok(false) if already called.
80pub fn do_ready() -> Result<bool, &'static str> {
81    WORKER.with(|w| {
82        let mut state = w.borrow_mut();
83        let state = state.as_mut().ok_or("not in a worker thread")?;
84
85        if let Some(tx) = state.ready_tx.take() {
86            let _ = tx.send(());
87            debug!(worker_id = state.worker_id, "worker signaled ready");
88            Ok(true)
89        } else {
90            Ok(false)
91        }
92    })
93}
94
95/// Block until a request arrives. Returns `(method, payload_json_bytes)` or `None` on shutdown.
96///
97/// Internally receives `serde_json::Value` from the channel and serializes to JSON bytes
98/// for PHP consumption. PHP calls `json_decode()` on these bytes.
99pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
100    WORKER.with(|w| {
101        let mut state = w.borrow_mut();
102        let state = state.as_mut().ok_or("not in a worker thread")?;
103
104        if let Ok(req) = state.task_rx.recv() {
105            let method = req.method.clone();
106            // Value → JSON bytes for PHP (only serialization on the hot path)
107            let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
108            state.current_request_id = Some(req.request_id);
109            state.current_reply = Some(req.reply);
110            Ok(Some((method, payload_bytes)))
111        } else {
112            debug!(worker_id = state.worker_id, "recv: channel closed");
113            Ok(None)
114        }
115    })
116}
117
118/// Send a successful response (raw JSON bytes from PHP).
119///
120/// Internally deserializes JSON bytes from PHP into `serde_json::Value`
121/// for zero-copy return through the channel.
122pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
123    WORKER.with(|w| {
124        let mut state = w.borrow_mut();
125        let state = state.as_mut().ok_or("not in a worker thread")?;
126
127        let reply = state.current_reply.take().ok_or("no pending request")?;
128        state.current_request_id = None;
129        // JSON bytes → Value (only deserialization on the hot path)
130        let result = serde_json::from_slice(data).map_err(|e| {
131            tracing::error!("PHP returned malformed JSON: {e}");
132            anyhow::anyhow!("PHP returned malformed JSON: {e}")
133        });
134        let _ = reply.send(result);
135        Ok(())
136    })
137}
138
139/// Run the dispatch loop directly from Rust, calling PHP via `call_user_function`.
140///
141/// This is the zero-copy path: `serde_json::Value` → zval → PHP handler → zval → Value.
142/// No JSON encode/decode at all.
143///
144/// `dispatch_fn` is the name of a PHP function with signature:
145/// `function(string $method, array $params): array`
146pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
147    WORKER.with(|w| {
148        // Signal ready first.
149        {
150            let mut state = w.borrow_mut();
151            let state = state.as_mut().ok_or("not in a worker thread")?;
152            if let Some(tx) = state.ready_tx.take() {
153                let _ = tx.send(());
154                debug!(
155                    worker_id = state.worker_id,
156                    "worker signaled ready (dispatch loop)"
157                );
158            }
159        }
160
161        // Restore VCWD to project root after php_execute_script set it
162        // to dirname(script). Without this, Composer proxy scripts like
163        // vendor/bin/folk-server leave VCWD in vendor/bin/.
164        if let Some(root) = crate::project_root() {
165            let _ = crate::zts::chdir(&root.to_string_lossy());
166        }
167
168        // Main dispatch loop.
169        loop {
170            let req = {
171                let mut state = w.borrow_mut();
172                let state = state.as_mut().ok_or("not in a worker thread")?;
173                if let Ok(req) = state.task_rx.recv() {
174                    // Expose the id to PHP (folk_request_id()) for the duration
175                    // of this call. call_dispatch runs OUTSIDE this borrow, so a
176                    // reentrant folk_request_id() from PHP can borrow WORKER safely.
177                    state.current_request_id = Some(req.request_id.clone());
178                    req
179                } else {
180                    debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
181                    // Only the main thread (worker #1) should join ZTS workers.
182                    // ZTS workers must NOT join — they'd deadlock trying to join themselves.
183                    if state.worker_id == 1 {
184                        crate::join_zts_workers();
185                    }
186                    return Ok(());
187                }
188            };
189
190            // Call PHP handler directly: Value → zval → PHP → zval → Value.
191            let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
192
193            match result {
194                Ok(value) => {
195                    let _ = req.reply.send(Ok(value));
196                },
197                Err(e) => {
198                    let _ = req.reply.send(Err(e));
199                },
200            }
201
202            // Clear the active request id between requests.
203            if let Ok(mut state) = w.try_borrow_mut() {
204                if let Some(state) = state.as_mut() {
205                    state.current_request_id = None;
206                }
207            }
208        }
209    })
210}
211
212/// Send an error response.
213pub fn do_send_error(message: &str) -> Result<(), &'static str> {
214    WORKER.with(|w| {
215        let mut state = w.borrow_mut();
216        let state = state.as_mut().ok_or("not in a worker thread")?;
217
218        let reply = state.current_reply.take().ok_or("no pending request")?;
219        state.current_request_id = None;
220        let _ = reply.send(Err(anyhow::anyhow!("{message}")));
221        Ok(())
222    })
223}