1use std::cell::RefCell;
7use std::sync::Arc;
8use std::sync::mpsc;
9
10use tracing::debug;
11
/// A single unit of work handed to a worker thread over its task channel.
///
/// The worker either exposes the request piecewise via `do_recv` /
/// `do_send` / `do_send_error`, or handles it end-to-end inside
/// `run_dispatch_loop`.
pub struct TaskRequest {
    /// Identifier of the request; stored as the worker's current request id
    /// while the request is being processed.
    pub request_id: Arc<str>,
    /// Name of the method to invoke; passed through to the handler as-is.
    pub method: String,
    /// JSON arguments for the method.
    pub payload: serde_json::Value,
    /// One-shot channel on which the outcome (value or error) is delivered
    /// back to whoever submitted the request.
    pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
}
22
/// Per-thread bookkeeping for a worker, stored in the `WORKER` thread-local.
struct WorkerState {
    /// Numeric id of this worker; used in log output, and `run_dispatch_loop`
    /// gives worker `1` the extra job of joining the other workers on shutdown.
    worker_id: u32,
    /// Receiving end of the task channel for this worker.
    task_rx: mpsc::Receiver<TaskRequest>,
    /// Sender used to signal readiness exactly once; `take()`n (leaving `None`)
    /// by the first call to `do_ready` or `run_dispatch_loop`.
    ready_tx: Option<mpsc::SyncSender<()>>,
    /// Reply channel of the request handed out by `do_recv`, consumed by
    /// `do_send` / `do_send_error`. `None` when no request is pending.
    current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
    /// Id of the request currently being processed, if any.
    current_request_id: Option<Arc<str>>,
}
33
thread_local! {
    // State of the worker running on the current thread. Starts out as `None`;
    // populated by `init_worker_state` and reset by `cleanup_worker_state`.
    static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
}
37
38pub fn init_worker_state(
40 worker_id: u32,
41 task_rx: mpsc::Receiver<TaskRequest>,
42 ready_tx: mpsc::SyncSender<()>,
43) {
44 WORKER.with(|w| {
45 *w.borrow_mut() = Some(WorkerState {
46 worker_id,
47 task_rx,
48 ready_tx: Some(ready_tx),
49 current_reply: None,
50 current_request_id: None,
51 });
52 });
53}
54
55pub fn cleanup_worker_state() {
57 WORKER.with(|w| {
58 *w.borrow_mut() = None;
59 });
60}
61
62pub fn has_worker_state() -> bool {
64 WORKER.with(|w| w.borrow().is_some())
65}
66
67pub fn current_request_id() -> Option<Arc<str>> {
72 WORKER.with(|w| {
73 w.borrow()
74 .as_ref()
75 .and_then(|state| state.current_request_id.clone())
76 })
77}
78
79pub fn do_ready() -> Result<bool, &'static str> {
81 WORKER.with(|w| {
82 let mut state = w.borrow_mut();
83 let state = state.as_mut().ok_or("not in a worker thread")?;
84
85 if let Some(tx) = state.ready_tx.take() {
86 let _ = tx.send(());
87 debug!(worker_id = state.worker_id, "worker signaled ready");
88 Ok(true)
89 } else {
90 Ok(false)
91 }
92 })
93}
94
95pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
100 WORKER.with(|w| {
101 let mut state = w.borrow_mut();
102 let state = state.as_mut().ok_or("not in a worker thread")?;
103
104 if let Ok(req) = state.task_rx.recv() {
105 let method = req.method.clone();
106 let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
108 state.current_request_id = Some(req.request_id);
109 state.current_reply = Some(req.reply);
110 Ok(Some((method, payload_bytes)))
111 } else {
112 debug!(worker_id = state.worker_id, "recv: channel closed");
113 Ok(None)
114 }
115 })
116}
117
118pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
123 WORKER.with(|w| {
124 let mut state = w.borrow_mut();
125 let state = state.as_mut().ok_or("not in a worker thread")?;
126
127 let reply = state.current_reply.take().ok_or("no pending request")?;
128 state.current_request_id = None;
129 let result = serde_json::from_slice(data).map_err(|e| {
131 tracing::error!("PHP returned malformed JSON: {e}");
132 anyhow::anyhow!("PHP returned malformed JSON: {e}")
133 });
134 let _ = reply.send(result);
135 Ok(())
136 })
137}
138
139pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
147 WORKER.with(|w| {
148 {
150 let mut state = w.borrow_mut();
151 let state = state.as_mut().ok_or("not in a worker thread")?;
152 if let Some(tx) = state.ready_tx.take() {
153 let _ = tx.send(());
154 debug!(
155 worker_id = state.worker_id,
156 "worker signaled ready (dispatch loop)"
157 );
158 }
159 }
160
161 if let Some(root) = crate::project_root() {
165 let _ = crate::zts::chdir(&root.to_string_lossy());
166 }
167
168 loop {
170 let req = {
171 let mut state = w.borrow_mut();
172 let state = state.as_mut().ok_or("not in a worker thread")?;
173 if let Ok(req) = state.task_rx.recv() {
174 state.current_request_id = Some(req.request_id.clone());
178 req
179 } else {
180 debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
181 if state.worker_id == 1 {
184 crate::join_zts_workers();
185 }
186 return Ok(());
187 }
188 };
189
190 let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
192
193 match result {
194 Ok(value) => {
195 let _ = req.reply.send(Ok(value));
196 },
197 Err(e) => {
198 let _ = req.reply.send(Err(e));
199 },
200 }
201
202 if let Ok(mut state) = w.try_borrow_mut() {
204 if let Some(state) = state.as_mut() {
205 state.current_request_id = None;
206 }
207 }
208 }
209 })
210}
211
212pub fn do_send_error(message: &str) -> Result<(), &'static str> {
214 WORKER.with(|w| {
215 let mut state = w.borrow_mut();
216 let state = state.as_mut().ok_or("not in a worker thread")?;
217
218 let reply = state.current_reply.take().ok_or("no pending request")?;
219 state.current_request_id = None;
220 let _ = reply.send(Err(anyhow::anyhow!("{message}")));
221 Ok(())
222 })
223}