Skip to main content

folk_ext/
bridge.rs

1//! PHP bridge: thread-local worker state for channel communication.
2//!
3//! Uses `std::sync` channels (not tokio) so worker threads don't need
4//! a tokio runtime. This also works correctly across `fork()`.
5
6use std::cell::RefCell;
7use std::sync::Arc;
8use std::sync::mpsc;
9
10use tracing::debug;
11
12/// A request sent from the server to a worker thread.
13pub struct TaskRequest {
14    /// Globally-unique per-request id (UUID v7). Empty `""` means "no request".
15    pub request_id: Arc<str>,
16    pub method: String,
17    pub payload: serde_json::Value,
18    /// Reply channel. Uses `tokio::sync::oneshot` which does NOT require
19    /// a tokio runtime to send — it's a pure atomic operation.
20    pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
21}
22
23/// Thread-local state for the current worker.
24struct WorkerState {
25    worker_id: u32,
26    task_rx: mpsc::Receiver<TaskRequest>,
27    ready_tx: Option<mpsc::SyncSender<()>>,
28    current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
29    /// Id of the request currently being handled on this thread (`None` if none).
30    /// Exposed to PHP via `folk_request_id()`.
31    current_request_id: Option<Arc<str>>,
32}
33
34thread_local! {
35    static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
36}
37
38/// Initialize thread-local worker state.
39pub fn init_worker_state(
40    worker_id: u32,
41    task_rx: mpsc::Receiver<TaskRequest>,
42    ready_tx: mpsc::SyncSender<()>,
43) {
44    WORKER.with(|w| {
45        *w.borrow_mut() = Some(WorkerState {
46            worker_id,
47            task_rx,
48            ready_tx: Some(ready_tx),
49            current_reply: None,
50            current_request_id: None,
51        });
52    });
53}
54
55/// Clean up thread-local state.
56pub fn cleanup_worker_state() {
57    WORKER.with(|w| {
58        *w.borrow_mut() = None;
59    });
60}
61
62/// Returns true if this thread has worker bridge state initialized.
63pub fn has_worker_state() -> bool {
64    WORKER.with(|w| w.borrow().is_some())
65}
66
67/// Id of the request currently being handled on this thread.
68///
69/// Returns `None` when no request is in flight (or this isn't a worker thread).
70/// Exposed to PHP via the `folk_request_id()` native function.
71pub fn current_request_id() -> Option<Arc<str>> {
72    WORKER.with(|w| {
73        w.borrow()
74            .as_ref()
75            .and_then(|state| state.current_request_id.clone())
76    })
77}
78
79/// Signal ready. Returns Ok(true) if sent, Ok(false) if already called.
80pub fn do_ready() -> Result<bool, &'static str> {
81    WORKER.with(|w| {
82        let mut state = w.borrow_mut();
83        let state = state.as_mut().ok_or("not in a worker thread")?;
84
85        if let Some(tx) = state.ready_tx.take() {
86            let _ = tx.send(());
87            debug!(worker_id = state.worker_id, "worker signaled ready");
88            Ok(true)
89        } else {
90            Ok(false)
91        }
92    })
93}
94
95/// Block until a request arrives. Returns `(method, payload_json_bytes)` or `None` on shutdown.
96///
97/// Internally receives `serde_json::Value` from the channel and serializes to JSON bytes
98/// for PHP consumption. PHP calls `json_decode()` on these bytes.
99pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
100    WORKER.with(|w| {
101        let mut state = w.borrow_mut();
102        let state = state.as_mut().ok_or("not in a worker thread")?;
103
104        if let Ok(req) = state.task_rx.recv() {
105            let method = req.method.clone();
106            // Value → JSON bytes for PHP (only serialization on the hot path)
107            let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
108            state.current_request_id = Some(req.request_id);
109            state.current_reply = Some(req.reply);
110            Ok(Some((method, payload_bytes)))
111        } else {
112            debug!(worker_id = state.worker_id, "recv: channel closed");
113            Ok(None)
114        }
115    })
116}
117
118/// Send a successful response (raw JSON bytes from PHP).
119///
120/// Internally deserializes JSON bytes from PHP into `serde_json::Value`
121/// for zero-copy return through the channel.
122pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
123    WORKER.with(|w| {
124        let mut state = w.borrow_mut();
125        let state = state.as_mut().ok_or("not in a worker thread")?;
126
127        let reply = state.current_reply.take().ok_or("no pending request")?;
128        state.current_request_id = None;
129        // JSON bytes → Value (only deserialization on the hot path)
130        let value: serde_json::Value =
131            serde_json::from_slice(data).unwrap_or(serde_json::Value::Null);
132        let _ = reply.send(Ok(value));
133        Ok(())
134    })
135}
136
137/// Run the dispatch loop directly from Rust, calling PHP via `call_user_function`.
138///
139/// This is the zero-copy path: `serde_json::Value` → zval → PHP handler → zval → Value.
140/// No JSON encode/decode at all.
141///
142/// `dispatch_fn` is the name of a PHP function with signature:
143/// `function(string $method, array $params): array`
144pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
145    WORKER.with(|w| {
146        // Signal ready first.
147        {
148            let mut state = w.borrow_mut();
149            let state = state.as_mut().ok_or("not in a worker thread")?;
150            if let Some(tx) = state.ready_tx.take() {
151                let _ = tx.send(());
152                debug!(
153                    worker_id = state.worker_id,
154                    "worker signaled ready (dispatch loop)"
155                );
156            }
157        }
158
159        // Restore VCWD to project root after php_execute_script set it
160        // to dirname(script). Without this, Composer proxy scripts like
161        // vendor/bin/folk-server leave VCWD in vendor/bin/.
162        if let Some(root) = crate::project_root() {
163            let _ = crate::zts::chdir(&root.to_string_lossy());
164        }
165
166        // Main dispatch loop.
167        loop {
168            let req = {
169                let mut state = w.borrow_mut();
170                let state = state.as_mut().ok_or("not in a worker thread")?;
171                if let Ok(req) = state.task_rx.recv() {
172                    // Expose the id to PHP (folk_request_id()) for the duration
173                    // of this call. call_dispatch runs OUTSIDE this borrow, so a
174                    // reentrant folk_request_id() from PHP can borrow WORKER safely.
175                    state.current_request_id = Some(req.request_id.clone());
176                    req
177                } else {
178                    debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
179                    // Only the main thread (worker #1) should join ZTS workers.
180                    // ZTS workers must NOT join — they'd deadlock trying to join themselves.
181                    if state.worker_id == 1 {
182                        crate::join_zts_workers();
183                    }
184                    return Ok(());
185                }
186            };
187
188            // Call PHP handler directly: Value → zval → PHP → zval → Value.
189            let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
190
191            match result {
192                Ok(value) => {
193                    let _ = req.reply.send(Ok(value));
194                },
195                Err(e) => {
196                    let _ = req.reply.send(Err(e));
197                },
198            }
199
200            // Clear the active request id between requests.
201            if let Ok(mut state) = w.try_borrow_mut() {
202                if let Some(state) = state.as_mut() {
203                    state.current_request_id = None;
204                }
205            }
206        }
207    })
208}
209
210/// Send an error response.
211pub fn do_send_error(message: &str) -> Result<(), &'static str> {
212    WORKER.with(|w| {
213        let mut state = w.borrow_mut();
214        let state = state.as_mut().ok_or("not in a worker thread")?;
215
216        let reply = state.current_reply.take().ok_or("no pending request")?;
217        state.current_request_id = None;
218        let _ = reply.send(Err(anyhow::anyhow!("{message}")));
219        Ok(())
220    })
221}