Skip to main content

folk_ext/
bridge.rs

//! PHP bridge: thread-local worker state for channel communication.
//!
//! Uses `std::sync` channels (not tokio) so worker threads don't need
//! a tokio runtime. This also works correctly across `fork()`.
5
6use std::cell::RefCell;
7use std::sync::mpsc;
8
9use tracing::debug;
10
11/// A request sent from the server to a worker thread.
12pub struct TaskRequest {
13    pub method: String,
14    pub payload: serde_json::Value,
15    /// Reply channel. Uses `tokio::sync::oneshot` which does NOT require
16    /// a tokio runtime to send — it's a pure atomic operation.
17    pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
18}
19
20/// Thread-local state for the current worker.
21struct WorkerState {
22    worker_id: u32,
23    task_rx: mpsc::Receiver<TaskRequest>,
24    ready_tx: Option<mpsc::SyncSender<()>>,
25    current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
26}
27
28thread_local! {
29    static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
30}
31
32/// Initialize thread-local worker state.
33pub fn init_worker_state(
34    worker_id: u32,
35    task_rx: mpsc::Receiver<TaskRequest>,
36    ready_tx: mpsc::SyncSender<()>,
37) {
38    WORKER.with(|w| {
39        *w.borrow_mut() = Some(WorkerState {
40            worker_id,
41            task_rx,
42            ready_tx: Some(ready_tx),
43            current_reply: None,
44        });
45    });
46}
47
48/// Clean up thread-local state.
49pub fn cleanup_worker_state() {
50    WORKER.with(|w| {
51        *w.borrow_mut() = None;
52    });
53}
54
55/// Returns true if this thread has worker bridge state initialized.
56pub fn has_worker_state() -> bool {
57    WORKER.with(|w| w.borrow().is_some())
58}
59
60/// Signal ready. Returns Ok(true) if sent, Ok(false) if already called.
61pub fn do_ready() -> Result<bool, &'static str> {
62    WORKER.with(|w| {
63        let mut state = w.borrow_mut();
64        let state = state.as_mut().ok_or("not in a worker thread")?;
65
66        if let Some(tx) = state.ready_tx.take() {
67            let _ = tx.send(());
68            debug!(worker_id = state.worker_id, "worker signaled ready");
69            Ok(true)
70        } else {
71            Ok(false)
72        }
73    })
74}
75
76/// Block until a request arrives. Returns `(method, payload_json_bytes)` or `None` on shutdown.
77///
78/// Internally receives `serde_json::Value` from the channel and serializes to JSON bytes
79/// for PHP consumption. PHP calls `json_decode()` on these bytes.
80pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
81    WORKER.with(|w| {
82        let mut state = w.borrow_mut();
83        let state = state.as_mut().ok_or("not in a worker thread")?;
84
85        if let Ok(req) = state.task_rx.recv() {
86            let method = req.method.clone();
87            // Value → JSON bytes for PHP (only serialization on the hot path)
88            let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
89            state.current_reply = Some(req.reply);
90            Ok(Some((method, payload_bytes)))
91        } else {
92            debug!(worker_id = state.worker_id, "recv: channel closed");
93            Ok(None)
94        }
95    })
96}
97
98/// Send a successful response (raw JSON bytes from PHP).
99///
100/// Internally deserializes JSON bytes from PHP into `serde_json::Value`
101/// for zero-copy return through the channel.
102pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
103    WORKER.with(|w| {
104        let mut state = w.borrow_mut();
105        let state = state.as_mut().ok_or("not in a worker thread")?;
106
107        let reply = state.current_reply.take().ok_or("no pending request")?;
108        // JSON bytes → Value (only deserialization on the hot path)
109        let value: serde_json::Value =
110            serde_json::from_slice(data).unwrap_or(serde_json::Value::Null);
111        let _ = reply.send(Ok(value));
112        Ok(())
113    })
114}
115
116/// Run the dispatch loop directly from Rust, calling PHP via `call_user_function`.
117///
118/// This is the zero-copy path: `serde_json::Value` → zval → PHP handler → zval → Value.
119/// No JSON encode/decode at all.
120///
121/// `dispatch_fn` is the name of a PHP function with signature:
122/// `function(string $method, array $params): array`
123pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
124    WORKER.with(|w| {
125        // Signal ready first.
126        {
127            let mut state = w.borrow_mut();
128            let state = state.as_mut().ok_or("not in a worker thread")?;
129            if let Some(tx) = state.ready_tx.take() {
130                let _ = tx.send(());
131                debug!(
132                    worker_id = state.worker_id,
133                    "worker signaled ready (dispatch loop)"
134                );
135            }
136        }
137
138        // Main dispatch loop.
139        loop {
140            let req = {
141                let mut state = w.borrow_mut();
142                let state = state.as_mut().ok_or("not in a worker thread")?;
143                if let Ok(req) = state.task_rx.recv() {
144                    req
145                } else {
146                    debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
147                    // Only the main thread (worker #1) should join ZTS workers.
148                    // ZTS workers must NOT join — they'd deadlock trying to join themselves.
149                    if state.worker_id == 1 {
150                        crate::join_zts_workers();
151                    }
152                    return Ok(());
153                }
154            };
155
156            // Call PHP handler directly: Value → zval → PHP → zval → Value.
157            let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
158
159            match result {
160                Ok(value) => {
161                    let _ = req.reply.send(Ok(value));
162                },
163                Err(e) => {
164                    let _ = req.reply.send(Err(e));
165                },
166            }
167        }
168    })
169}
170
171/// Send an error response.
172pub fn do_send_error(message: &str) -> Result<(), &'static str> {
173    WORKER.with(|w| {
174        let mut state = w.borrow_mut();
175        let state = state.as_mut().ok_or("not in a worker thread")?;
176
177        let reply = state.current_reply.take().ok_or("no pending request")?;
178        let _ = reply.send(Err(anyhow::anyhow!("{message}")));
179        Ok(())
180    })
181}