moduvex_runtime/executor/
worker.rs1use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, Mutex};
19use std::task::Context;
20
21use crate::platform::sys::{create_pipe, events_with_capacity, Interest};
22use crate::reactor::{with_reactor, with_reactor_mut};
23use crate::time::{next_timer_deadline, tick_timer_wheel};
24
25#[cfg(unix)]
26use crate::signal::{on_signal_readable, SIGNAL_TOKEN};
27
28use super::scheduler::{GlobalQueue, LocalQueue};
29use super::task::{Task, TaskHeader, STATE_CANCELLED, STATE_COMPLETED};
30use super::waker::{make_waker_with_notifier, WorkerNotifier};
31use super::work_stealing::{StealableQueue, WorkStealingPool};
32
33const WAKE_TOKEN: usize = usize::MAX - 1;
36
37pub(crate) struct WorkerThread {
44 pub worker_id: usize,
46 pub stealable: Arc<StealableQueue>,
48 pub local: LocalQueue,
50 pub global: Arc<GlobalQueue>,
52 pub steal_pool: Arc<WorkStealingPool>,
54 pub tasks: Arc<Mutex<HashMap<usize, Task>>>,
56 pub shutdown: Arc<AtomicBool>,
58 notifier: Arc<WorkerNotifier>,
60 wake_rx: i32,
62 wake_tx: i32,
64}
65
66impl WorkerThread {
67 pub(crate) fn new(
69 worker_id: usize,
70 global: Arc<GlobalQueue>,
71 steal_pool: Arc<WorkStealingPool>,
72 tasks: Arc<Mutex<HashMap<usize, Task>>>,
73 shutdown: Arc<AtomicBool>,
74 notifier: Arc<WorkerNotifier>,
75 ) -> std::io::Result<Self> {
76 let (wake_rx, wake_tx) = create_pipe()?;
77 with_reactor(|r| r.register(wake_rx, WAKE_TOKEN, Interest::READABLE))?;
78 let stealable = Arc::new(StealableQueue::new());
79 Ok(Self {
80 worker_id,
81 stealable,
82 local: LocalQueue::new(),
83 global,
84 steal_pool,
85 tasks,
86 shutdown,
87 notifier,
88 wake_rx,
89 wake_tx,
90 })
91 }
92
93 pub(crate) fn wake_tx(&self) -> i32 {
95 self.wake_tx
96 }
97
98 fn next_task(&mut self) -> Option<Arc<TaskHeader>> {
100 if let Some(h) = self.local.pop() {
102 return Some(h);
103 }
104 {
106 let mut sq = self.stealable.local_mut();
107 if !sq.is_empty() {
108 let mut batch = Vec::with_capacity(16);
109 sq.drain_front(&mut batch, 16);
110 drop(sq);
111 for h in batch {
112 if self.local.push(h).is_some() {
113 }
116 }
117 return self.local.pop();
118 }
119 }
120 if let Some(h) = self.global.pop() {
122 return Some(h);
123 }
124 let n = self
126 .steal_pool
127 .steal_one(self.worker_id, &mut self.local, &self.global);
128 if n > 0 {
129 return self.local.pop();
130 }
131 None
132 }
133
134 pub(crate) fn run(&mut self) {
136 loop {
137 if self.shutdown.load(Ordering::Acquire) {
139 self.drain_all_tasks();
141 break;
142 }
143
144 let expired = tick_timer_wheel(std::time::Instant::now());
146 for w in expired {
147 w.wake();
148 }
149
150 let mut did_work = false;
152 loop {
153 let Some(header) = self.next_task() else {
154 break;
155 };
156 did_work = true;
157 self.run_task(header);
158 }
159
160 if !did_work {
162 if self.shutdown.load(Ordering::Acquire) {
163 self.drain_all_tasks();
164 break;
165 }
166 self.park();
167 }
168 }
169 }
170
171 fn run_task(&mut self, header: Arc<TaskHeader>) {
173 let key = Arc::as_ptr(&header) as usize;
174 let state = header.state.load(Ordering::Acquire);
175
176 if state == STATE_CANCELLED {
177 let task = self.tasks.lock().unwrap().remove(&key);
178 if let Some(t) = task {
179 t.cancel();
180 }
181 return;
182 }
183 if state == STATE_COMPLETED {
184 self.tasks.lock().unwrap().remove(&key);
185 return;
186 }
187
188 let waker = make_waker_with_notifier(
190 Arc::clone(&header),
191 Arc::clone(&self.global),
192 Some(Arc::clone(&self.notifier)),
193 );
194 let mut cx = Context::from_waker(&waker);
195
196 let task = self.tasks.lock().unwrap().remove(&key);
198 if let Some(task) = task {
199 let completed = task.poll_task(&mut cx);
200 if !completed {
201 self.tasks.lock().unwrap().insert(key, task);
202 }
203 }
204 }
205
206 fn drain_all_tasks(&mut self) {
208 while let Some(h) = self.local.pop() {
210 let _ = h; }
212 {
214 let mut sq = self.stealable.local_mut();
215 while sq.pop().is_some() {}
216 }
217 }
218
219 fn park(&self) {
221 const MAX_PARK_MS: u64 = 10;
222
223 let timeout_ms = match next_timer_deadline() {
224 None => MAX_PARK_MS,
225 Some(deadline) => {
226 let now = std::time::Instant::now();
227 if deadline <= now {
228 0
229 } else {
230 let ms = deadline.duration_since(now).as_millis() as u64;
231 ms.min(MAX_PARK_MS)
232 }
233 }
234 };
235
236 let mut events = events_with_capacity(64);
237 let _ = with_reactor_mut(|r| r.poll(&mut events, Some(timeout_ms)));
238 self.drain_wake_pipe();
239
240 #[cfg(unix)]
241 {
242 let signal_fired = events.iter().any(|ev| ev.token == SIGNAL_TOKEN && ev.readable);
243 if signal_fired {
244 on_signal_readable();
245 }
246 }
247 }
248
249 #[cfg(unix)]
251 fn drain_wake_pipe(&self) {
252 let mut buf = [0u8; 64];
253 loop {
254 let n = unsafe { libc::read(self.wake_rx, buf.as_mut_ptr() as *mut _, buf.len()) };
256 if n <= 0 {
257 break;
258 }
259 }
260 }
261
262 #[cfg(not(unix))]
263 fn drain_wake_pipe(&self) {}
264}
265
266impl Drop for WorkerThread {
267 fn drop(&mut self) {
268 let _ = with_reactor(|r| r.deregister(self.wake_rx));
269 #[cfg(unix)]
271 unsafe {
272 libc::close(self.wake_rx);
273 libc::close(self.wake_tx);
274 }
275 }
276}
277
278thread_local! {
283 pub(crate) static CURRENT_WORKER_WAKE_TX: std::cell::Cell<i32> =
284 const { std::cell::Cell::new(-1) };
285}
286
287pub(crate) fn set_current_worker_wake_tx(fd: i32) {
289 CURRENT_WORKER_WAKE_TX.with(|c| c.set(fd));
290}
291
292pub(crate) fn clear_current_worker_wake_tx() {
294 CURRENT_WORKER_WAKE_TX.with(|c| c.set(-1));
295}