Skip to main content

folk_ext/
bridge.rs

//! PHP bridge: thread-local worker state for channel communication.
//!
//! Tasks are delivered over `std::sync::mpsc` channels (not tokio) so worker
//! threads don't need a tokio runtime. This also works correctly across `fork()`.
5
6use std::cell::RefCell;
7use std::sync::Arc;
8use std::sync::mpsc;
9
10use tracing::debug;
11
12/// A request sent from the server to a worker thread.
13pub struct TaskRequest {
14    /// Globally-unique per-request id (UUID v7). Empty `""` means "no request".
15    pub request_id: Arc<str>,
16    pub method: String,
17    pub payload: serde_json::Value,
18    /// Reply channel. Uses `tokio::sync::oneshot` which does NOT require
19    /// a tokio runtime to send — it's a pure atomic operation.
20    pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
21}
22
23/// Thread-local state for the current worker.
24struct WorkerState {
25    worker_id: u32,
26    task_rx: mpsc::Receiver<TaskRequest>,
27    ready_tx: Option<mpsc::SyncSender<()>>,
28    current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
29    /// Id of the request currently being handled on this thread (`None` if none).
30    /// Exposed to PHP via `folk_request_id()`.
31    current_request_id: Option<Arc<str>>,
32}
33
34thread_local! {
35    static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
36}
37
38/// Initialize thread-local worker state.
39pub fn init_worker_state(
40    worker_id: u32,
41    task_rx: mpsc::Receiver<TaskRequest>,
42    ready_tx: mpsc::SyncSender<()>,
43) {
44    WORKER.with(|w| {
45        *w.borrow_mut() = Some(WorkerState {
46            worker_id,
47            task_rx,
48            ready_tx: Some(ready_tx),
49            current_reply: None,
50            current_request_id: None,
51        });
52    });
53}
54
55/// Clean up thread-local state.
56pub fn cleanup_worker_state() {
57    WORKER.with(|w| {
58        *w.borrow_mut() = None;
59    });
60}
61
62/// Returns true if this thread has worker bridge state initialized.
63pub fn has_worker_state() -> bool {
64    WORKER.with(|w| w.borrow().is_some())
65}
66
67/// Id of the request currently being handled on this thread.
68///
69/// Returns `None` when no request is in flight (or this isn't a worker thread).
70/// Exposed to PHP via the `folk_request_id()` native function.
71pub fn current_request_id() -> Option<Arc<str>> {
72    WORKER.with(|w| {
73        w.borrow()
74            .as_ref()
75            .and_then(|state| state.current_request_id.clone())
76    })
77}
78
79/// Signal ready. Returns Ok(true) if sent, Ok(false) if already called.
80pub fn do_ready() -> Result<bool, &'static str> {
81    WORKER.with(|w| {
82        let mut state = w.borrow_mut();
83        let state = state.as_mut().ok_or("not in a worker thread")?;
84
85        if let Some(tx) = state.ready_tx.take() {
86            let _ = tx.send(());
87            debug!(worker_id = state.worker_id, "worker signaled ready");
88            Ok(true)
89        } else {
90            Ok(false)
91        }
92    })
93}
94
95/// Block until a request arrives. Returns `(method, payload_json_bytes)` or `None` on shutdown.
96///
97/// Internally receives `serde_json::Value` from the channel and serializes to JSON bytes
98/// for PHP consumption. PHP calls `json_decode()` on these bytes.
99pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
100    WORKER.with(|w| {
101        let mut state = w.borrow_mut();
102        let state = state.as_mut().ok_or("not in a worker thread")?;
103
104        if let Ok(req) = state.task_rx.recv() {
105            let method = req.method.clone();
106            // Value → JSON bytes for PHP (only serialization on the hot path)
107            let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
108            state.current_request_id = Some(req.request_id);
109            state.current_reply = Some(req.reply);
110            Ok(Some((method, payload_bytes)))
111        } else {
112            debug!(worker_id = state.worker_id, "recv: channel closed");
113            Ok(None)
114        }
115    })
116}
117
118/// Send a successful response (raw JSON bytes from PHP).
119///
120/// Internally deserializes JSON bytes from PHP into `serde_json::Value`
121/// for zero-copy return through the channel.
122pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
123    WORKER.with(|w| {
124        let mut state = w.borrow_mut();
125        let state = state.as_mut().ok_or("not in a worker thread")?;
126
127        let reply = state.current_reply.take().ok_or("no pending request")?;
128        // JSON bytes → Value (only deserialization on the hot path)
129        let value: serde_json::Value =
130            serde_json::from_slice(data).unwrap_or(serde_json::Value::Null);
131        let _ = reply.send(Ok(value));
132        Ok(())
133    })
134}
135
136/// Run the dispatch loop directly from Rust, calling PHP via `call_user_function`.
137///
138/// This is the zero-copy path: `serde_json::Value` → zval → PHP handler → zval → Value.
139/// No JSON encode/decode at all.
140///
141/// `dispatch_fn` is the name of a PHP function with signature:
142/// `function(string $method, array $params): array`
143pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
144    WORKER.with(|w| {
145        // Signal ready first.
146        {
147            let mut state = w.borrow_mut();
148            let state = state.as_mut().ok_or("not in a worker thread")?;
149            if let Some(tx) = state.ready_tx.take() {
150                let _ = tx.send(());
151                debug!(
152                    worker_id = state.worker_id,
153                    "worker signaled ready (dispatch loop)"
154                );
155            }
156        }
157
158        // Restore VCWD to project root after php_execute_script set it
159        // to dirname(script). Without this, Composer proxy scripts like
160        // vendor/bin/folk-server leave VCWD in vendor/bin/.
161        if let Some(root) = crate::project_root() {
162            let _ = crate::zts::chdir(&root.to_string_lossy());
163        }
164
165        // Main dispatch loop.
166        loop {
167            let req = {
168                let mut state = w.borrow_mut();
169                let state = state.as_mut().ok_or("not in a worker thread")?;
170                if let Ok(req) = state.task_rx.recv() {
171                    // Expose the id to PHP (folk_request_id()) for the duration
172                    // of this call. call_dispatch runs OUTSIDE this borrow, so a
173                    // reentrant folk_request_id() from PHP can borrow WORKER safely.
174                    state.current_request_id = Some(req.request_id.clone());
175                    req
176                } else {
177                    debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
178                    // Only the main thread (worker #1) should join ZTS workers.
179                    // ZTS workers must NOT join — they'd deadlock trying to join themselves.
180                    if state.worker_id == 1 {
181                        crate::join_zts_workers();
182                    }
183                    return Ok(());
184                }
185            };
186
187            // Call PHP handler directly: Value → zval → PHP → zval → Value.
188            let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
189
190            match result {
191                Ok(value) => {
192                    let _ = req.reply.send(Ok(value));
193                },
194                Err(e) => {
195                    let _ = req.reply.send(Err(e));
196                },
197            }
198
199            // Clear the active request id between requests.
200            if let Ok(mut state) = w.try_borrow_mut() {
201                if let Some(state) = state.as_mut() {
202                    state.current_request_id = None;
203                }
204            }
205        }
206    })
207}
208
209/// Send an error response.
210pub fn do_send_error(message: &str) -> Result<(), &'static str> {
211    WORKER.with(|w| {
212        let mut state = w.borrow_mut();
213        let state = state.as_mut().ok_or("not in a worker thread")?;
214
215        let reply = state.current_reply.take().ok_or("no pending request")?;
216        let _ = reply.send(Err(anyhow::anyhow!("{message}")));
217        Ok(())
218    })
219}