1use std::cell::RefCell;
7use std::sync::Arc;
8use std::sync::mpsc;
9
10use tracing::debug;
11
12pub struct TaskRequest {
14 pub request_id: Arc<str>,
16 pub method: String,
17 pub payload: serde_json::Value,
18 pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
21}
22
23struct WorkerState {
25 worker_id: u32,
26 task_rx: mpsc::Receiver<TaskRequest>,
27 ready_tx: Option<mpsc::SyncSender<()>>,
28 current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
29 current_request_id: Option<Arc<str>>,
32}
33
34thread_local! {
35 static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
36}
37
38pub fn init_worker_state(
40 worker_id: u32,
41 task_rx: mpsc::Receiver<TaskRequest>,
42 ready_tx: mpsc::SyncSender<()>,
43) {
44 WORKER.with(|w| {
45 *w.borrow_mut() = Some(WorkerState {
46 worker_id,
47 task_rx,
48 ready_tx: Some(ready_tx),
49 current_reply: None,
50 current_request_id: None,
51 });
52 });
53}
54
55pub fn cleanup_worker_state() {
57 WORKER.with(|w| {
58 *w.borrow_mut() = None;
59 });
60}
61
62pub fn has_worker_state() -> bool {
64 WORKER.with(|w| w.borrow().is_some())
65}
66
67pub fn current_request_id() -> Option<Arc<str>> {
72 WORKER.with(|w| {
73 w.borrow()
74 .as_ref()
75 .and_then(|state| state.current_request_id.clone())
76 })
77}
78
79pub fn do_ready() -> Result<bool, &'static str> {
81 WORKER.with(|w| {
82 let mut state = w.borrow_mut();
83 let state = state.as_mut().ok_or("not in a worker thread")?;
84
85 if let Some(tx) = state.ready_tx.take() {
86 let _ = tx.send(());
87 debug!(worker_id = state.worker_id, "worker signaled ready");
88 Ok(true)
89 } else {
90 Ok(false)
91 }
92 })
93}
94
95pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
100 WORKER.with(|w| {
101 let mut state = w.borrow_mut();
102 let state = state.as_mut().ok_or("not in a worker thread")?;
103
104 if let Ok(req) = state.task_rx.recv() {
105 let method = req.method.clone();
106 let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
108 state.current_request_id = Some(req.request_id);
109 state.current_reply = Some(req.reply);
110 Ok(Some((method, payload_bytes)))
111 } else {
112 debug!(worker_id = state.worker_id, "recv: channel closed");
113 Ok(None)
114 }
115 })
116}
117
118pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
123 WORKER.with(|w| {
124 let mut state = w.borrow_mut();
125 let state = state.as_mut().ok_or("not in a worker thread")?;
126
127 let reply = state.current_reply.take().ok_or("no pending request")?;
128 let value: serde_json::Value =
130 serde_json::from_slice(data).unwrap_or(serde_json::Value::Null);
131 let _ = reply.send(Ok(value));
132 Ok(())
133 })
134}
135
136pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
144 WORKER.with(|w| {
145 {
147 let mut state = w.borrow_mut();
148 let state = state.as_mut().ok_or("not in a worker thread")?;
149 if let Some(tx) = state.ready_tx.take() {
150 let _ = tx.send(());
151 debug!(
152 worker_id = state.worker_id,
153 "worker signaled ready (dispatch loop)"
154 );
155 }
156 }
157
158 if let Some(root) = crate::project_root() {
162 let _ = crate::zts::chdir(&root.to_string_lossy());
163 }
164
165 loop {
167 let req = {
168 let mut state = w.borrow_mut();
169 let state = state.as_mut().ok_or("not in a worker thread")?;
170 if let Ok(req) = state.task_rx.recv() {
171 state.current_request_id = Some(req.request_id.clone());
175 req
176 } else {
177 debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
178 if state.worker_id == 1 {
181 crate::join_zts_workers();
182 }
183 return Ok(());
184 }
185 };
186
187 let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
189
190 match result {
191 Ok(value) => {
192 let _ = req.reply.send(Ok(value));
193 },
194 Err(e) => {
195 let _ = req.reply.send(Err(e));
196 },
197 }
198
199 if let Ok(mut state) = w.try_borrow_mut() {
201 if let Some(state) = state.as_mut() {
202 state.current_request_id = None;
203 }
204 }
205 }
206 })
207}
208
209pub fn do_send_error(message: &str) -> Result<(), &'static str> {
211 WORKER.with(|w| {
212 let mut state = w.borrow_mut();
213 let state = state.as_mut().ok_or("not in a worker thread")?;
214
215 let reply = state.current_reply.take().ok_or("no pending request")?;
216 let _ = reply.send(Err(anyhow::anyhow!("{message}")));
217 Ok(())
218 })
219}