// moduvex_runtime/executor/worker.rs
//! Per-worker state for the multi-threaded executor.
//!
//! Each worker thread owns a `StealableQueue` and runs a `run_worker` loop:
//!
//! ```text
//! run_worker(id, shared_state)
//!   ├─ check own StealableQueue (local, fast path)
//!   ├─ check GlobalQueue (cross-thread injection)
//!   ├─ steal from random victim in WorkStealingPool
//!   └─ park on reactor (I/O + timer deadline)
//! ```
//!
//! Worker 0 is always the main thread (drives the root future).
//! Workers 1..N are spawned background threads.
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::Context;

use crate::platform::sys::{create_pipe, events_with_capacity, Interest};
use crate::reactor::{with_reactor, with_reactor_mut};
use crate::time::{next_timer_deadline, tick_timer_wheel};

#[cfg(unix)]
use crate::signal::{on_signal_readable, SIGNAL_TOKEN};

use super::scheduler::{GlobalQueue, LocalQueue};
use super::task::{Task, TaskHeader, STATE_CANCELLED, STATE_COMPLETED};
use super::waker::{make_waker_with_notifier, WorkerNotifier};
use super::work_stealing::{StealableQueue, WorkStealingPool};

/// Sentinel reactor token identifying the self-pipe read end.
/// User tokens are allocated from 0 upward, so this value cannot collide.
const WAKE_TOKEN: usize = usize::MAX - 1;

// ── WorkerThread ──────────────────────────────────────────────────────────────

/// Per-worker thread state.
///
/// Each worker owns its own stealable queue and self-pipe for reactor wakeup.
/// Workers share `GlobalQueue`, `WorkStealingPool`, the task map, and the
/// shutdown flag.
pub(crate) struct WorkerThread {
    /// Unique worker index (0 = main thread).
    pub worker_id: usize,
    /// This worker's stealable queue, also registered with the pool so
    /// peer workers can steal from it.
    pub stealable: Arc<StealableQueue>,
    /// Local buffer for dequeued tasks (avoids repeated lock contention).
    pub local: LocalQueue,
    /// Shared cross-thread injection queue.
    pub global: Arc<GlobalQueue>,
    /// Pool of all worker queues, used as steal victims.
    pub steal_pool: Arc<WorkStealingPool>,
    /// Shared task ownership map (key = `TaskHeader` pointer address).
    pub tasks: Arc<Mutex<HashMap<usize, Task>>>,
    /// Shared shutdown signal.
    pub shutdown: Arc<AtomicBool>,
    /// Notifier for unparking workers when tasks are enqueued.
    notifier: Arc<WorkerNotifier>,
    /// Read end of the self-pipe (registered with the reactor under `WAKE_TOKEN`).
    wake_rx: i32,
    /// Write end of the self-pipe (written by wakers to unpark this worker).
    wake_tx: i32,
}

66impl WorkerThread {
67    /// Construct a worker, registering its self-pipe with the thread's reactor.
68    pub(crate) fn new(
69        worker_id: usize,
70        global: Arc<GlobalQueue>,
71        steal_pool: Arc<WorkStealingPool>,
72        tasks: Arc<Mutex<HashMap<usize, Task>>>,
73        shutdown: Arc<AtomicBool>,
74        notifier: Arc<WorkerNotifier>,
75    ) -> std::io::Result<Self> {
76        let (wake_rx, wake_tx) = create_pipe()?;
77        with_reactor(|r| r.register(wake_rx, WAKE_TOKEN, Interest::READABLE))?;
78        let stealable = Arc::new(StealableQueue::new());
79        Ok(Self {
80            worker_id,
81            stealable,
82            local: LocalQueue::new(),
83            global,
84            steal_pool,
85            tasks,
86            shutdown,
87            notifier,
88            wake_rx,
89            wake_tx,
90        })
91    }
92
93    /// The write end of the self-pipe — used to wake this worker from the reactor.
94    pub(crate) fn wake_tx(&self) -> i32 {
95        self.wake_tx
96    }
97
98    /// Pop the next task header to run: local → global → steal.
99    fn next_task(&mut self) -> Option<Arc<TaskHeader>> {
100        // 1. Local queue first (fast path, no locking).
101        if let Some(h) = self.local.pop() {
102            return Some(h);
103        }
104        // 2. Drain from stealable queue into local.
105        {
106            let mut sq = self.stealable.local_mut();
107            if !sq.is_empty() {
108                let mut batch = Vec::with_capacity(16);
109                sq.drain_front(&mut batch, 16);
110                drop(sq);
111                for h in batch {
112                    if self.local.push(h).is_some() {
113                        // overflow: push back to global (should rarely happen)
114                        // we can't re-push to stealable here without recursion
115                    }
116                }
117                return self.local.pop();
118            }
119        }
120        // 3. Steal from global queue.
121        if let Some(h) = self.global.pop() {
122            return Some(h);
123        }
124        // 4. Work steal from peer workers.
125        let n = self
126            .steal_pool
127            .steal_one(self.worker_id, &mut self.local, &self.global);
128        if n > 0 {
129            return self.local.pop();
130        }
131        None
132    }
133
134    /// Run the worker loop. Returns when `shutdown` is set and all work drained.
135    pub(crate) fn run(&mut self) {
136        loop {
137            // Check shutdown first.
138            if self.shutdown.load(Ordering::Acquire) {
139                // Drain remaining tasks before stopping.
140                self.drain_all_tasks();
141                break;
142            }
143
144            // Tick expired timers.
145            let expired = tick_timer_wheel(std::time::Instant::now());
146            for w in expired {
147                w.wake();
148            }
149
150            // Drain task queues.
151            let mut did_work = false;
152            loop {
153                let Some(header) = self.next_task() else {
154                    break;
155                };
156                did_work = true;
157                self.run_task(header);
158            }
159
160            // Park on reactor when no work found.
161            if !did_work {
162                if self.shutdown.load(Ordering::Acquire) {
163                    self.drain_all_tasks();
164                    break;
165                }
166                self.park();
167            }
168        }
169    }
170
171    /// Run a single task identified by its header.
172    fn run_task(&mut self, header: Arc<TaskHeader>) {
173        let key = Arc::as_ptr(&header) as usize;
174        let state = header.state.load(Ordering::Acquire);
175
176        if state == STATE_CANCELLED {
177            let task = self.tasks.lock().unwrap().remove(&key);
178            if let Some(t) = task {
179                t.cancel();
180            }
181            return;
182        }
183        if state == STATE_COMPLETED {
184            self.tasks.lock().unwrap().remove(&key);
185            return;
186        }
187
188        // Build a waker that re-enqueues on wake and notifies a worker.
189        let waker = make_waker_with_notifier(
190            Arc::clone(&header),
191            Arc::clone(&self.global),
192            Some(Arc::clone(&self.notifier)),
193        );
194        let mut cx = Context::from_waker(&waker);
195
196        // Extract task atomically (single lock) to avoid TOCTOU race.
197        let task = self.tasks.lock().unwrap().remove(&key);
198        if let Some(task) = task {
199            let completed = task.poll_task(&mut cx);
200            if !completed {
201                self.tasks.lock().unwrap().insert(key, task);
202            }
203        }
204    }
205
206    /// Drain all remaining tasks (called on shutdown).
207    fn drain_all_tasks(&mut self) {
208        // Drain local queues.
209        while let Some(h) = self.local.pop() {
210            let _ = h; // let headers drop
211        }
212        // Stealable queue.
213        {
214            let mut sq = self.stealable.local_mut();
215            while sq.pop().is_some() {}
216        }
217    }
218
219    /// Park on the reactor until I/O event or timer fires.
220    fn park(&self) {
221        const MAX_PARK_MS: u64 = 10;
222
223        let timeout_ms = match next_timer_deadline() {
224            None => MAX_PARK_MS,
225            Some(deadline) => {
226                let now = std::time::Instant::now();
227                if deadline <= now {
228                    0
229                } else {
230                    let ms = deadline.duration_since(now).as_millis() as u64;
231                    ms.min(MAX_PARK_MS)
232                }
233            }
234        };
235
236        let mut events = events_with_capacity(64);
237        let _ = with_reactor_mut(|r| r.poll(&mut events, Some(timeout_ms)));
238        self.drain_wake_pipe();
239
240        #[cfg(unix)]
241        {
242            let signal_fired = events.iter().any(|ev| ev.token == SIGNAL_TOKEN && ev.readable);
243            if signal_fired {
244                on_signal_readable();
245            }
246        }
247    }
248
249    /// Drain all pending bytes from the self-pipe read end (non-blocking).
250    #[cfg(unix)]
251    fn drain_wake_pipe(&self) {
252        let mut buf = [0u8; 64];
253        loop {
254            // SAFETY: `wake_rx` is a valid O_NONBLOCK fd we own.
255            let n = unsafe { libc::read(self.wake_rx, buf.as_mut_ptr() as *mut _, buf.len()) };
256            if n <= 0 {
257                break;
258            }
259        }
260    }
261
262    #[cfg(not(unix))]
263    fn drain_wake_pipe(&self) {}
264}
265
impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Deregistration failure is deliberately ignored (`let _`): there is
        // nothing useful to do about it inside Drop.
        let _ = with_reactor(|r| r.deregister(self.wake_rx));
        // SAFETY: we own wake_rx and wake_tx exclusively; no other code reads
        // or writes these fds once the worker is being dropped.
        #[cfg(unix)]
        unsafe {
            libc::close(self.wake_rx);
            libc::close(self.wake_tx);
        }
        // NOTE(review): on non-unix targets the pipe ends are not closed
        // here — confirm `create_pipe`'s non-unix handles are cleaned up
        // elsewhere.
    }
}

// ── Thread-local current worker ───────────────────────────────────────────────

thread_local! {
    /// Write end of the current worker's self-pipe (`-1` = no worker set).
    ///
    /// Root wakers running on a worker thread use this fd to unpark that
    /// worker's reactor park.
    pub(crate) static CURRENT_WORKER_WAKE_TX: std::cell::Cell<i32> =
        const { std::cell::Cell::new(-1) };
}

/// Record `fd` as the current worker's wake pipe in the thread-local.
pub(crate) fn set_current_worker_wake_tx(fd: i32) {
    CURRENT_WORKER_WAKE_TX.with(|cell| cell.set(fd));
}

/// Reset the thread-local back to the "no worker" sentinel (`-1`).
pub(crate) fn clear_current_worker_wake_tx() {
    set_current_worker_wake_tx(-1);
}
295}