1use std::cell::RefCell;
7use std::sync::mpsc;
8
9use tracing::debug;
10
11pub struct TaskRequest {
13 pub method: String,
14 pub payload: serde_json::Value,
15 pub reply: tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>,
18}
19
20struct WorkerState {
22 worker_id: u32,
23 task_rx: mpsc::Receiver<TaskRequest>,
24 ready_tx: Option<mpsc::SyncSender<()>>,
25 current_reply: Option<tokio::sync::oneshot::Sender<anyhow::Result<serde_json::Value>>>,
26}
27
28thread_local! {
29 static WORKER: RefCell<Option<WorkerState>> = const { RefCell::new(None) };
30}
31
32pub fn init_worker_state(
34 worker_id: u32,
35 task_rx: mpsc::Receiver<TaskRequest>,
36 ready_tx: mpsc::SyncSender<()>,
37) {
38 WORKER.with(|w| {
39 *w.borrow_mut() = Some(WorkerState {
40 worker_id,
41 task_rx,
42 ready_tx: Some(ready_tx),
43 current_reply: None,
44 });
45 });
46}
47
48pub fn cleanup_worker_state() {
50 WORKER.with(|w| {
51 *w.borrow_mut() = None;
52 });
53}
54
55pub fn has_worker_state() -> bool {
57 WORKER.with(|w| w.borrow().is_some())
58}
59
60pub fn do_ready() -> Result<bool, &'static str> {
62 WORKER.with(|w| {
63 let mut state = w.borrow_mut();
64 let state = state.as_mut().ok_or("not in a worker thread")?;
65
66 if let Some(tx) = state.ready_tx.take() {
67 let _ = tx.send(());
68 debug!(worker_id = state.worker_id, "worker signaled ready");
69 Ok(true)
70 } else {
71 Ok(false)
72 }
73 })
74}
75
76pub fn do_recv() -> Result<Option<(String, Vec<u8>)>, &'static str> {
81 WORKER.with(|w| {
82 let mut state = w.borrow_mut();
83 let state = state.as_mut().ok_or("not in a worker thread")?;
84
85 if let Ok(req) = state.task_rx.recv() {
86 let method = req.method.clone();
87 let payload_bytes = serde_json::to_vec(&req.payload).unwrap_or_default();
89 state.current_reply = Some(req.reply);
90 Ok(Some((method, payload_bytes)))
91 } else {
92 debug!(worker_id = state.worker_id, "recv: channel closed");
93 Ok(None)
94 }
95 })
96}
97
98pub fn do_send(data: &[u8]) -> Result<(), &'static str> {
103 WORKER.with(|w| {
104 let mut state = w.borrow_mut();
105 let state = state.as_mut().ok_or("not in a worker thread")?;
106
107 let reply = state.current_reply.take().ok_or("no pending request")?;
108 let value: serde_json::Value =
110 serde_json::from_slice(data).unwrap_or(serde_json::Value::Null);
111 let _ = reply.send(Ok(value));
112 Ok(())
113 })
114}
115
116pub fn run_dispatch_loop(dispatch_fn: &str) -> Result<(), &'static str> {
124 WORKER.with(|w| {
125 {
127 let mut state = w.borrow_mut();
128 let state = state.as_mut().ok_or("not in a worker thread")?;
129 if let Some(tx) = state.ready_tx.take() {
130 let _ = tx.send(());
131 debug!(
132 worker_id = state.worker_id,
133 "worker signaled ready (dispatch loop)"
134 );
135 }
136 }
137
138 loop {
140 let req = {
141 let mut state = w.borrow_mut();
142 let state = state.as_mut().ok_or("not in a worker thread")?;
143 if let Ok(req) = state.task_rx.recv() {
144 req
145 } else {
146 debug!(worker_id = state.worker_id, "dispatch loop: channel closed");
147 if state.worker_id == 1 {
150 crate::join_zts_workers();
151 }
152 return Ok(());
153 }
154 };
155
156 let result = crate::zts::call_dispatch(dispatch_fn, &req.method, &req.payload);
158
159 match result {
160 Ok(value) => {
161 let _ = req.reply.send(Ok(value));
162 },
163 Err(e) => {
164 let _ = req.reply.send(Err(e));
165 },
166 }
167 }
168 })
169}
170
171pub fn do_send_error(message: &str) -> Result<(), &'static str> {
173 WORKER.with(|w| {
174 let mut state = w.borrow_mut();
175 let state = state.as_mut().ok_or("not in a worker thread")?;
176
177 let reply = state.current_reply.take().ok_or("no pending request")?;
178 let _ = reply.send(Err(anyhow::anyhow!("{message}")));
179 Ok(())
180 })
181}