1use std::cell::RefCell;
7use std::sync::mpsc;
8
9use tracing::debug;
10
11pub struct TaskRequest {
13 pub request_id: u64,
15 pub method: String,
16 pub payload: serde_json::Value,
17 pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
20}
21
22struct WorkerState {
24 worker_id: u32,
25 task_rx: mpsc::Receiver<TaskRequest>,
26 ready_tx: Option<mpsc::SyncSender<()>>,
27 current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
28 current_request_id: u64,
31}
32
33thread_local! {
34 static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
35}
36
37pub fn init_worker_state(
39 worker_id: u32,
40 task_rx: mpsc::Receiver<TaskRequest>,
41 ready_tx: mpsc::SyncSender<()>,
42) {
43 WORKER.with(|w| {
44 *w.borrow_mut() = Some(WorkerState {
45 worker_id,
46 task_rx,
47 ready_tx: Some(ready_tx),
48 current_reply: None,
49 current_request_id: 0,
50 });
51 });
52}
53
54pub fn cleanup_worker_state() {
56 WORKER.with(|w| {
57 *w.borrow_mut() = None;
58 });
59}
60
61pub fn has_worker_state() -> bool {
63 WORKER.with(|w| w.borrow().is_some())
64}
65
66pub fn current_request_id() -> u64 {
71 WORKER.with(|w| {
72 w.borrow()
73 .as_ref()
74 .map_or(0, |state| state.current_request_id)
75 })
76}
77
78pub fn do_ready() -> Result<bool, &'static str> {
80 WORKER.with(|w| {
81 let mut state = w.borrow_mut();
82 let state = state.as_mut().ok_or("not in a worker thread")?;
83
84 if let Some(tx) = state.ready_tx.take() {
85 let _ = tx.send(());
86 debug!(worker_id = state.worker_id, "worker signaled ready");
87 Ok(true)
88 } else {
89 Ok(false)
90 }
91 })
92}
93
94pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
99 WORKER.with(|w| {
100 let mut state = w.borrow_mut();
101 let state = state.as_mut().ok_or("not in a worker thread")?;
102
103 if let Ok(req) = state.task_rx.recv() {
104 let method = req.method.clone();
105 let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
107 state.current_request_id = req.request_id;
108 state.current_reply = Some(req.reply);
109 Ok(Some((method, payload_bytes)))
110 } else {
111 debug!(worker_id = state.worker_id, "recv: channel closed");
112 Ok(None)
113 }
114 })
115}
116
117pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
122 WORKER.with(|w| {
123 let mut state = w.borrow_mut();
124 let state = state.as_mut().ok_or("not in a worker thread")?;
125
126 let reply = state.current_reply.take().ok_or("no pending request")?;
127 let value: serde_json::Value =
129 serde_json::from_slice(data).unwrap_or(serde_json::Value::Null);
130 let _ = reply.send(Ok(value));
131 Ok(())
132 })
133}
134
135pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
143 WORKER.with(|w| {
144 {
146 let mut state = w.borrow_mut();
147 let state = state.as_mut().ok_or("not in a worker thread")?;
148 if let Some(tx) = state.ready_tx.take() {
149 let _ = tx.send(());
150 debug!(
151 worker_id = state.worker_id,
152 "worker signaled ready (dispatch loop)"
153 );
154 }
155 }
156
157 if let Some(root) = crate::project_root() {
161 let _ = crate::zts::chdir(&root.to_string_lossy());
162 }
163
164 loop {
166 let req = {
167 let mut state = w.borrow_mut();
168 let state = state.as_mut().ok_or("not in a worker thread")?;
169 if let Ok(req) = state.task_rx.recv() {
170 state.current_request_id = req.request_id;
174 req
175 } else {
176 debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
177 if state.worker_id == 1 {
180 crate::join_zts_workers();
181 }
182 return Ok(());
183 }
184 };
185
186 let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
188
189 match result {
190 Ok(value) => {
191 let _ = req.reply.send(Ok(value));
192 },
193 Err(e) => {
194 let _ = req.reply.send(Err(e));
195 },
196 }
197
198 if let Ok(mut state) = w.try_borrow_mut() {
200 if let Some(state) = state.as_mut() {
201 state.current_request_id = 0;
202 }
203 }
204 }
205 })
206}
207
208pub fn do_send_error(message: &str) -> Result<(), &'static str> {
210 WORKER.with(|w| {
211 let mut state = w.borrow_mut();
212 let state = state.as_mut().ok_or("not in a worker thread")?;
213
214 let reply = state.current_reply.take().ok_or("no pending request")?;
215 let _ = reply.send(Err(anyhow::anyhow!("{message}")));
216 Ok(())
217 })
218}