// maniac_runtime/runtime/worker.rs
1use super::deque::{Stealer, Worker as YieldWorker};
2use super::summary::Summary;
3use super::task::{FutureAllocator, GeneratorRunMode, Task, TaskArena, TaskHandle, TaskSlot};
4use super::timer::{Timer, TimerHandle};
5use super::timer_wheel::TimerWheel;
6use super::waker::WorkerWaker;
7use crate::PopError;
8use crate::runtime::mpsc;
9use crate::runtime::ticker::{TickHandler, TickHandlerRegistration};
10use crate::{PushError, utils};
11use std::cell::{Cell, UnsafeCell};
12use std::ptr::{self, NonNull};
13use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
16use std::thread;
17use std::time::{Duration, Instant};
18
19use crate::utils::bits;
20use rand::RngCore;
21
22#[cfg(unix)]
23use libc;
24
// Default segment-size shift (log2) for the worker mpsc message queues
// (the `P` const parameter on `WorkerService`).
const DEFAULT_QUEUE_SEG_SHIFT: usize = 8;
// Default shift (log2) for the number of mpsc queue segments
// (the `NUM_SEGS_P2` const parameter).
const DEFAULT_QUEUE_NUM_SEGS_SHIFT: usize = 8;
// Hard cap on worker slots a service can ever allocate.
const MAX_WORKER_LIMIT: usize = 512;

// Default wake-burst limit installed on each Worker (`wake_burst_limit`).
const DEFAULT_WAKE_BURST: usize = 4;
// Cadence mask for full summary scans — not referenced in this chunk;
// presumably tested as `counter & mask == 0` in the worker loop (confirm).
const FULL_SUMMARY_SCAN_CADENCE_MASK: u64 = 1024 * 8 - 1;
const DEFAULT_TICK_DURATION_NS: u64 = 1 << 20; // ~1.05ms, power of two as required by TimerWheel
// Slots per timer-wheel rotation (passed to TimerWheel::new).
const TIMER_TICKS_PER_WHEEL: usize = 1024 * 1;
// Budget of timers expired per advance — not referenced in this chunk.
const TIMER_EXPIRE_BUDGET: usize = 4096;
// Capacity of the per-worker scratch buffers (timer output and drained
// control messages) pre-allocated in spawn_worker_internal.
const MESSAGE_BATCH_SIZE: usize = 4096;
35
36// Worker status is now managed via WorkerWaker:
37// - WorkerWaker.summary: mpsc queue signals (bits 0-63)
38// - WorkerWaker.status bit 63: yield queue has items
39// - WorkerWaker.status bit 62: partition cache has work
40
/// Trait for cross-worker operations that don't depend on const parameters.
///
/// Stored type-erased in the `CROSS_WORKER_OPS` thread-local so task code can
/// reach the service without knowing its `P`/`NUM_SEGS_P2` parameters.
trait CrossWorkerOps: Send + Sync {
    /// Post a cancel message for timer `timer_id` to `to_worker_id`.
    /// Returns `true` on success — the implementation is outside this view;
    /// confirm the exact failure semantics there.
    fn post_cancel_message(&self, from_worker_id: u32, to_worker_id: u32, timer_id: u64) -> bool;
}
45
thread_local! {
    // Pointer to this worker thread's timer wheel; null off worker threads.
    static CURRENT_TIMER_WHEEL: Cell<*mut TimerWheel<TimerHandle>> = Cell::new(ptr::null_mut());
    // Task currently being polled on this thread; null when no task is mid-poll.
    static CURRENT_TASK: Cell<*mut Task> = Cell::new(ptr::null_mut());
    // Type-erased handle to the owning service for cross-worker calls
    // (e.g. remote timer cancellation).
    static CROSS_WORKER_OPS: Cell<Option<&'static dyn CrossWorkerOps>> = const { Cell::new(None) };
    // Generator pinning: set to true when task wants to pin the current generator
    static GENERATOR_PIN_REQUESTED: Cell<bool> = const { Cell::new(false) };
    // When pinning is requested, store which task should receive the generator
    static GENERATOR_PIN_TASK: Cell<*mut Task> = const { Cell::new(ptr::null_mut()) };
}
55
56pub struct WorkerTLS {}
57
58// pub fn current_task<'a>() -> Option<&'a mut Task> {
59//     let worker = CURRENT_WORKER.with(|cell| cell.get());
60//     if worker.is_null() {
61//         return None;
62//     }
63//     let worker = unsafe { &*worker };
64//     let task = worker.current_task;
65//     if task.is_null() {
66//         return None;
67//     }
68//     unsafe { Some(&mut *task) }
69// }
70
71#[inline]
72pub fn is_worker_thread() -> bool {
73    current_task().is_some()
74}
75
76#[inline]
77pub fn current_task<'a>() -> Option<&'a mut Task> {
78    let task = CURRENT_TASK.with(|cell| cell.get());
79    if task.is_null() {
80        return None;
81    } else {
82        unsafe { Some(&mut *task) }
83    }
84}
85
86pub fn current_timer_wheel<'a>() -> Option<&'a mut TimerWheel<TimerHandle>> {
87    let timer_wheel = CURRENT_TIMER_WHEEL.with(|cell| cell.get());
88    if timer_wheel.is_null() {
89        None
90    } else {
91        unsafe { Some(&mut *timer_wheel) }
92    }
93}
94
95/// Request that the current generator be pinned to the current task.
96///
97/// This MUST be called from within a task that is executing inside a Worker's generator.
98/// When called, it signals the Worker to transfer ownership of the current generator
99/// to the task, allowing the task to have its own stackful coroutine execution context.
100///
101/// # Returns
102/// `true` if the pin request was successfully registered, `false` if not in a task context.
103///
104/// # Example
105/// ```ignore
106/// use maniac_runtime::runtime::worker::pin_current_generator;
107///
108/// // Inside an async task running on a worker:
109/// async {
110///     // Request to pin the current generator
111///     if pin_current_generator() {
112///         // Generator will be pinned after this returns
113///         // Task will continue execution with its own generator
114///     }
115/// }
116/// ```
117pub fn pin_current_generator() -> bool {
118    let task_ptr = CURRENT_TASK.with(|cell| cell.get());
119    if task_ptr.is_null() {
120        return false;
121    }
122
123    // Set the flag so the generator will yield with pin signal
124    GENERATOR_PIN_REQUESTED.with(|cell| cell.set(true));
125    true
126}
127
/// Remote scheduling request for a timer.
#[derive(Clone, Copy, Debug)]
pub struct TimerSchedule {
    // Handle identifying the timer to arm on the destination worker.
    handle: TimerHandle,
    // Absolute deadline in nanoseconds (runtime clock domain — see clock_ns).
    deadline_ns: u64,
}
134
impl TimerSchedule {
    /// Pair a timer handle with its absolute deadline (nanoseconds).
    pub fn new(handle: TimerHandle, deadline_ns: u64) -> Self {
        Self {
            handle,
            deadline_ns,
        }
    }

    /// The timer handle this request refers to.
    #[inline(always)]
    pub fn handle(&self) -> TimerHandle {
        self.handle
    }

    /// The absolute deadline in nanoseconds.
    #[inline(always)]
    pub fn deadline_ns(&self) -> u64 {
        self.deadline_ns
    }

    /// Decompose into `(handle, deadline_ns)`.
    #[inline(always)]
    pub fn into_parts(self) -> (TimerHandle, u64) {
        (self.handle, self.deadline_ns)
    }
}
158
// SAFETY(review): these impls assert that TimerSchedule — which embeds a
// TimerHandle — may be moved and shared across threads. TimerHandle's
// definition is outside this view; confirm it carries no thread-affine state
// (it is built from a raw NonNull pointer in spawn_worker_internal).
unsafe impl Send for TimerSchedule {}
unsafe impl Sync for TimerSchedule {}
161
/// Batch of scheduling requests.
#[derive(Debug, Clone)]
pub struct TimerBatch {
    // Immutable, fixed-size set of schedules; Box<[_]> drops Vec's capacity word.
    entries: Box<[TimerSchedule]>,
}
167
168impl TimerBatch {
169    pub fn new(entries: Vec<TimerSchedule>) -> Self {
170        Self {
171            entries: entries.into_boxed_slice(),
172        }
173    }
174
175    pub fn empty() -> Self {
176        Self {
177            entries: Box::new([]),
178        }
179    }
180
181    pub fn len(&self) -> usize {
182        self.entries.len()
183    }
184
185    pub fn is_empty(&self) -> bool {
186        self.entries.is_empty()
187    }
188
189    pub fn iter(&self) -> impl Iterator<Item = &TimerSchedule> {
190        self.entries.iter()
191    }
192
193    pub fn into_vec(self) -> Vec<TimerSchedule> {
194        self.entries.into_vec()
195    }
196}
197
/// Worker control-plane messages.
#[derive(Clone, Debug)]
pub enum WorkerMessage {
    /// Arm a single timer on the receiving worker's wheel.
    ScheduleTimer {
        timer: TimerSchedule,
    },
    /// Arm a batch of timers at once.
    ScheduleBatch {
        timers: TimerBatch,
    },
    /// Cancel timer `timer_id` belonging to `worker_id`.
    CancelTimer {
        worker_id: u32,
        timer_id: u64,
    },
    /// Notify the worker that the active worker count changed.
    WorkerCountChanged {
        new_worker_count: u16,
    },

    /// Rebalance task partitions (reassign which leaves this worker processes)
    RebalancePartitions {
        partition_start: usize,
        partition_end: usize,
    },

    /// Migrate specific tasks to another worker
    MigrateTasks {
        task_handles: Vec<TaskHandle>,
    },

    /// Request worker to report its current health metrics
    ReportHealth,

    /// Graceful shutdown: finish current task then exit
    GracefulShutdown,

    /// Immediate shutdown
    Shutdown,

    /// No-op placeholder; used to pre-fill the per-worker message batch buffer.
    Noop,
}
237
// SAFETY(review): asserts that every variant payload (TimerSchedule,
// TimerBatch, Vec<TaskHandle>) is safe to move/share across threads.
// TaskHandle is defined outside this view — confirm it has no thread-affine
// state before relying on these impls.
unsafe impl Send for WorkerMessage {}
unsafe impl Sync for WorkerMessage {}
240
/// Tuning knobs for a [`WorkerService`].
#[derive(Clone, Copy, Debug)]
pub struct WorkerServiceConfig {
    // Duration of one scheduler tick (normalized internally by start()).
    pub tick_duration: Duration,
    // Workers spawned at startup; clamped to at least 1.
    pub min_workers: usize,
    // Upper bound on worker slots; clamped to [min_workers, MAX_WORKER_LIMIT].
    pub max_workers: usize,
}
247
248impl Default for WorkerServiceConfig {
249    fn default() -> Self {
250        Self {
251            tick_duration: Duration::from_nanos(DEFAULT_TICK_DURATION_NS),
252            min_workers: utils::num_cpus(),
253            max_workers: utils::num_cpus(),
254        }
255    }
256}
257
/// Owns the task arena, summary tree, and all per-worker state; coordinates
/// a pool of worker threads. `P`/`NUM_SEGS_P2` size the worker mpsc queues.
pub struct WorkerService<
    const P: usize = DEFAULT_QUEUE_SEG_SHIFT,
    const NUM_SEGS_P2: usize = DEFAULT_QUEUE_NUM_SEGS_SHIFT,
> {
    // Core ownership - WorkerService owns the arena and coordinates work via SummaryTree
    arena: TaskArena,
    summary_tree: Summary,

    config: WorkerServiceConfig,
    // Tick duration as configured; tick_duration_ns is the normalized value.
    tick_duration: Duration,
    tick_duration_ns: u64,
    // Shared runtime clock in nanoseconds (advanced outside this chunk).
    clock_ns: Arc<AtomicU64>,

    // Per-worker WorkerWakers - each tracks all three work types:
    // - summary: mpsc queue signals (bits 0-63 for different signal words)
    // - status bit 63: yield queue has items
    // - status bit 62: task partition has work
    wakers: Box<[Arc<WorkerWaker>]>,

    // 0 = slot free; nonzero = slot claimed/active (see spawn_worker_internal).
    worker_actives: Box<[AtomicU64]>,
    // Per-worker heartbeat timestamps, read by supervisor_health_check.
    worker_now_ns: Box<[AtomicU64]>,
    // Per-worker shutdown flags, borrowed by each Worker.
    worker_shutdowns: Box<[AtomicBool]>,
    worker_threads: Box<[Mutex<Option<thread::JoinHandle<()>>>]>,
    worker_thread_handles: Box<[Mutex<Option<crate::runtime::preemption::WorkerThreadHandle>>]>,
    worker_stats: Box<[WorkerStats]>,
    // Single source of truth for the live worker count (shared with Summary).
    worker_count: Arc<AtomicUsize>,
    // Highest assigned worker id + 1; used as an exclusive scan bound.
    worker_max_id: AtomicUsize,
    // Each worker has exclusive access to its own receiver (see Sync impl).
    receivers: Box<[UnsafeCell<mpsc::Receiver<WorkerMessage, P, NUM_SEGS_P2>>]>,
    // Laid out as `receiver_id * max_workers + producer_id` (see start()).
    senders: Box<[mpsc::Sender<WorkerMessage, P, NUM_SEGS_P2>]>,
    tick_senders: Box<[mpsc::Sender<WorkerMessage, P, NUM_SEGS_P2>]>,
    yield_queues: Box<[YieldWorker<TaskHandle>]>,
    yield_stealers: Box<[Stealer<TaskHandle>]>,
    // Each worker has exclusive access to its own timer wheel.
    timers: Box<[UnsafeCell<TimerWheel<TimerHandle>>]>,
    shutdown: AtomicBool,
    register_mutex: Mutex<()>,
    // RAII guard for tick service registration - automatically unregisters on drop
    tick_registration: Mutex<Option<TickHandlerRegistration>>,
    // Flag to request immediate partition rebalancing on next tick (when workers spawn/exit)
    rebalance_requested: AtomicBool,
    // Tick-related counters for on_tick logic
    tick_health_check_interval: u64,
    tick_partition_rebalance_interval: u64,
    tick_scaling_check_interval: u64,
}
302
// SAFETY(review): asserts all fields may move across threads; the UnsafeCell
// fields are partitioned per worker (see the Sync note below). Confirm
// TaskArena and Summary are themselves thread-safe — their definitions are
// outside this view.
unsafe impl<const P: usize, const NUM_SEGS_P2: usize> Send for WorkerService<P, NUM_SEGS_P2> {}

// SAFETY: WorkerService contains UnsafeCells for receivers and timers, but each worker
// has exclusive access to its own index. The Arc-wrapped WorkerService is shared between
// threads, but the UnsafeCell data is partitioned by worker_id.
unsafe impl<const P: usize, const NUM_SEGS_P2: usize> Sync for WorkerService<P, NUM_SEGS_P2> {}
309
310impl<const P: usize, const NUM_SEGS_P2: usize> WorkerService<P, NUM_SEGS_P2> {
311    pub fn start(
312        arena: TaskArena,
313        config: WorkerServiceConfig,
314        tick_service: &Arc<super::ticker::TickService>,
315    ) -> Arc<Self> {
316        let tick_duration = config.tick_duration;
317        let tick_duration_ns = normalize_tick_duration_ns(config.tick_duration);
318        let min_workers = config.min_workers.max(1);
319        let max_workers = config.max_workers.max(min_workers).min(MAX_WORKER_LIMIT);
320
321        // Create per-worker WorkerWakers
322        let mut wakers = Vec::with_capacity(max_workers);
323        for _ in 0..max_workers {
324            wakers.push(Arc::new(WorkerWaker::new()));
325        }
326        let wakers = wakers.into_boxed_slice();
327
328        // Create worker_count early so we can pass it to SummaryTree (single source of truth)
329        let worker_count = Arc::new(AtomicUsize::new(0));
330
331        // Create SummaryTree with reference to wakers and worker_count
332        let summary_tree = Summary::new(
333            arena.config().leaf_count,
334            arena.layout().signals_per_leaf,
335            &wakers,
336            &worker_count,
337        );
338
339        let mut worker_actives = Vec::with_capacity(max_workers);
340        for _ in 0..max_workers {
341            worker_actives.push(AtomicU64::new(0));
342        }
343        let mut worker_now_ns = Vec::with_capacity(max_workers);
344        for _ in 0..max_workers {
345            worker_now_ns.push(AtomicU64::new(0));
346        }
347        let mut worker_shutdowns = Vec::with_capacity(max_workers);
348        for _ in 0..max_workers {
349            worker_shutdowns.push(AtomicBool::new(false));
350        }
351        let mut worker_threads = Vec::with_capacity(max_workers);
352        for _ in 0..max_workers {
353            worker_threads.push(Mutex::new(None));
354        }
355        let mut worker_thread_handles = Vec::with_capacity(max_workers);
356        for _ in 0..max_workers {
357            worker_thread_handles.push(Mutex::new(None));
358        }
359        let mut worker_stats = Vec::with_capacity(max_workers);
360        for _ in 0..max_workers {
361            worker_stats.push(WorkerStats::default());
362        }
363
364        let mut receivers = Vec::with_capacity(max_workers);
365        let mut senders = Vec::<mpsc::Sender<WorkerMessage, P, NUM_SEGS_P2>>::with_capacity(
366            max_workers * max_workers,
367        );
368        for worker_id in 0..max_workers {
369            // Use the worker's WorkerWaker for mpsc queue
370            let rx = mpsc::new_with_waker(Arc::clone(&wakers[worker_id]));
371            receivers.push(UnsafeCell::new(rx));
372        }
373        for worker_id in 0..max_workers {
374            for other_worker_id in 0..max_workers {
375                senders.push(
376                    unsafe { &*receivers[worker_id].get() }
377                        .create_sender_with_config(0)
378                        .expect("ran out of producer slots"),
379                )
380            }
381        }
382
383        // Create tick_senders for on_tick communication
384        let mut tick_senders = Vec::with_capacity(max_workers);
385        for worker_id in 0..max_workers {
386            tick_senders.push(
387                unsafe { &*receivers[worker_id].get() }
388                    .create_sender_with_config(0)
389                    .expect("ran out of producer slots"),
390            );
391        }
392
393        let mut yield_queues = Vec::with_capacity(max_workers);
394        for _ in 0..max_workers {
395            yield_queues.push(YieldWorker::new_fifo());
396        }
397        let mut yield_stealers = Vec::with_capacity(max_workers);
398        for worker_id in 0..max_workers {
399            yield_stealers.push(yield_queues[worker_id].stealer());
400        }
401        let mut timers = Vec::with_capacity(max_workers);
402        for worker_id in 0..max_workers {
403            timers.push(UnsafeCell::new(TimerWheel::new(
404                tick_duration,
405                TIMER_TICKS_PER_WHEEL,
406                worker_id as u32,
407            )));
408        }
409
410        let service = Arc::new(Self {
411            arena,
412            summary_tree,
413            config,
414            tick_duration: tick_duration,
415            tick_duration_ns,
416            clock_ns: Arc::new(AtomicU64::new(0)),
417            wakers,
418            worker_actives: worker_actives.into_boxed_slice(),
419            worker_now_ns: worker_now_ns.into_boxed_slice(),
420            worker_shutdowns: worker_shutdowns.into_boxed_slice(),
421            worker_threads: worker_threads.into_boxed_slice(),
422            worker_thread_handles: worker_thread_handles.into_boxed_slice(),
423            worker_stats: worker_stats.into_boxed_slice(),
424            worker_count, // Use the worker_count we created earlier and passed to SummaryTree
425            worker_max_id: AtomicUsize::new(0),
426            receivers: receivers.into_boxed_slice(),
427            senders: senders.into_boxed_slice(),
428            tick_senders: tick_senders.into_boxed_slice(),
429            yield_queues: yield_queues.into_boxed_slice(),
430            yield_stealers: yield_stealers.into_boxed_slice(),
431            timers: timers.into_boxed_slice(),
432            shutdown: AtomicBool::new(false),
433            register_mutex: Mutex::new(()),
434            tick_registration: Mutex::new(None),
435            rebalance_requested: AtomicBool::new(false),
436            tick_health_check_interval: 100,
437            tick_partition_rebalance_interval: 1000,
438            tick_scaling_check_interval: 500,
439        });
440
441        // Spawn min_workers on startup with pre-set count to avoid rebalancing
442        // Each worker recalculates partitions when worker_count changes, so we
443        // pre-set it to the final value before spawning any workers
444        service.worker_count.store(min_workers, Ordering::SeqCst);
445        // SummaryTree now references worker_count directly - no need to set it separately
446
447        let mut spawned = 0;
448        for _ in 0..min_workers {
449            if service.spawn_worker_internal(&service, false).is_err() {
450                break;
451            }
452            spawned += 1;
453        }
454
455        // Adjust worker_count if we failed to spawn all min_workers
456        if spawned < min_workers {
457            service.worker_count.store(spawned, Ordering::SeqCst);
458            // SummaryTree now references worker_count directly - no need to set it separately
459        }
460
461        // Register this service with the TickService and get RAII guard
462        let registration = {
463            let handler: Arc<dyn TickHandler> = Arc::clone(&service) as Arc<dyn TickHandler>;
464            tick_service
465                .register(handler)
466                .expect("Failed to register with TickService")
467        };
468
469        // Store the registration in the service
470        // SAFETY: We use raw pointer manipulation because we can't get a mutable reference to an Arc.
471        // This is safe because:
472        // 1. We're still in the constructor before returning the Arc
473        // 2. No other thread has access to this service yet
474        // 3. We're only writing to _tick_registration once during construction
475        unsafe {
476            let service_ptr = Arc::as_ptr(&service) as *mut WorkerService<P, NUM_SEGS_P2>;
477            *(*service_ptr).tick_registration.lock().unwrap() = Some(registration);
478        }
479
480        service
481    }
482
    /// Get a reference to the TaskArena owned by this service.
    #[inline]
    pub fn arena(&self) -> &TaskArena {
        &self.arena
    }

    /// Get a reference to the SummaryTree owned by this service.
    #[inline]
    pub fn summary(&self) -> &Summary {
        &self.summary_tree
    }
494
    /// Reserve a task slot using the SummaryTree.
    ///
    /// Returns `None` when the arena is closed, no free slot exists, or the
    /// reserved (leaf, signal, bit) location cannot be mapped to a handle —
    /// in the last case the reservation is rolled back so no slot leaks.
    #[inline]
    pub fn reserve_task(&self) -> Option<TaskHandle> {
        if self.arena.is_closed() {
            return None;
        }
        // Claim a free coordinate in the summary tree.
        let (leaf_idx, signal_idx, bit) = self.summary_tree.reserve_task()?;
        let handle = match self.arena.handle_for_location(leaf_idx, signal_idx, bit) {
            Some(handle) => handle,
            None => {
                // Arena refused the location: undo the tree reservation.
                self.summary_tree
                    .release_task_in_leaf(leaf_idx, signal_idx, bit as usize);
                return None;
            }
        };
        self.arena.increment_total_tasks();
        Some(handle)
    }
513
    /// Release a task slot back to the SummaryTree.
    ///
    /// Clears the task's (leaf, signal word, bit) reservation and decrements
    /// the arena's live-task counter; intended to pair one-to-one with
    /// [`Self::reserve_task`].
    #[inline]
    pub fn release_task(&self, handle: TaskHandle) {
        let task = handle.task();
        self.summary_tree.release_task_in_leaf(
            task.leaf_idx() as usize,
            task.signal_idx() as usize,
            task.signal_bit() as usize,
        );
        self.arena.decrement_total_tasks();
    }
525
526    pub fn spawn_worker(&self) -> Result<(), PushError<()>>
527    where
528        Self: Sized,
529    {
530        // This method requires an Arc<Self> context to spawn threads safely
531        // In the context of TickHandler, this will be called via Arc<WorkerService>
532        // We need to use unsafe here to get the Arc from &self
533        let service_arc = unsafe {
534            // SAFETY: WorkerService is always stored in an Arc when spawn_worker is called
535            // This is guaranteed by the API design where start() returns Arc<Self>
536            // and TickHandler is only implemented for Arc-wrapped services
537            Arc::from_raw(self as *const Self)
538        };
539        let result = self.spawn_worker_internal(&service_arc, true);
540        // Prevent Arc from being dropped (we only borrowed it)
541        std::mem::forget(service_arc);
542        result
543    }
544
    /// Claim a free worker slot and spawn its OS thread.
    ///
    /// `increment_count` is `false` during startup (start() pre-sets
    /// `worker_count` before spawning) and `true` for dynamically added
    /// workers. Returns `PushError::Full` when no slot could be claimed.
    fn spawn_worker_internal(
        &self,
        service: &Arc<Self>,
        increment_count: bool,
    ) -> Result<(), PushError<()>> {
        let _lock = self.register_mutex.lock().expect("register_mutex poisoned");
        // NOTE(review): `Instant::now().elapsed()` measures time since the
        // Instant was just created, so this is effectively always ~0ns. The
        // CAS below then swaps the free marker 0 for a claim marker that can
        // itself be 0 — presumably a real timestamp (or any nonzero sentinel)
        // was intended; confirm.
        let now_ns = Instant::now().elapsed().as_nanos() as u64;

        // Find first empty slot.
        let mut worker_id: Option<usize> = None;
        for id in 0..self.worker_actives.len() {
            if self.worker_actives[id]
                .compare_exchange(0, now_ns, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                worker_id = Some(id);
                break;
            }
        }
        if worker_id.is_none() {
            return Err(PushError::Full(()));
        }
        let worker_id = worker_id.unwrap();
        // Update worker_max_id to be the highest worker ID + 1 (used as range upper bound)
        if worker_id >= self.worker_max_id.load(Ordering::SeqCst) {
            self.worker_max_id.store(worker_id + 1, Ordering::SeqCst);
        }

        if increment_count {
            self.worker_count.fetch_add(1, Ordering::SeqCst);
            // SummaryTree references worker_count directly - no need to set it separately.
            // Request immediate partition rebalancing on next tick
            self.rebalance_requested.store(true, Ordering::Release);
        }

        // Mark worker as active
        self.worker_actives[worker_id].store(1, Ordering::SeqCst);

        let service_clone = Arc::clone(service);
        // NOTE(review): this Arc is never read again — the worker observes
        // shutdown via `worker_shutdowns[worker_id]`. Looks like dead code;
        // confirm before removing.
        let shutdown = Arc::new(AtomicBool::new(false));
        let timer_resolution_ns = self.tick_duration().as_nanos().max(1) as u64;
        let join = std::thread::spawn(move || {
            // Pre-fill scratch buffers so the hot loop avoids allocation:
            // timer_output receives expired timers, message_batch receives
            // drained control messages. The placeholder TimerHandles point at
            // a dummy TaskSlot on this thread's stack.
            let task_slot = TaskSlot::new(std::ptr::null_mut());
            let task_slot_ptr = &task_slot as *const _ as *mut TaskSlot;
            let mut timer_output = Vec::with_capacity(MESSAGE_BATCH_SIZE);
            for _ in 0..MESSAGE_BATCH_SIZE {
                timer_output.push((
                    0u64,
                    0u64,
                    TimerHandle::new(unsafe { NonNull::new_unchecked(task_slot_ptr) }, 0, 0, 0),
                ));
            }
            let mut message_batch = Vec::with_capacity(MESSAGE_BATCH_SIZE);
            for _ in 0..MESSAGE_BATCH_SIZE {
                message_batch.push(WorkerMessage::Noop);
            }

            let mut w = Worker {
                service: Arc::clone(&service_clone),
                wait_strategy: WaitStrategy::default(),
                shutdown: &service_clone.worker_shutdowns[worker_id],
                receiver: unsafe {
                    // SAFETY: Worker thread has exclusive access to its own receiver.
                    // The service Arc is kept alive for the lifetime of the worker.
                    &mut *service_clone.receivers[worker_id].get()
                },
                yield_queue: &service_clone.yield_queues[worker_id],
                timer_wheel: unsafe {
                    // SAFETY: Worker thread has exclusive access to its own timer wheel.
                    // The service Arc is kept alive for the lifetime of the worker.
                    &mut *service_clone.timers[worker_id].get()
                },
                wake_stats: WakeStats::default(),
                stats: WorkerStats::default(),
                partition_start: 0,
                partition_end: 0,
                partition_len: 0,
                cached_worker_count: 0,
                wake_burst_limit: DEFAULT_WAKE_BURST,
                worker_id: worker_id as u32,
                timer_resolution_ns,
                timer_output,
                message_batch: message_batch.into_boxed_slice(),
                rng: crate::utils::Random::new(),
                current_task: std::ptr::null_mut(),
                preemption_requested: AtomicBool::new(false),
                #[cfg(windows)]
                generator_scope_atomic: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
            };
            // Publish this worker's timer wheel and cross-worker ops in TLS.
            CURRENT_TIMER_WHEEL.set(w.timer_wheel as *mut TimerWheel<TimerHandle>);
            // SAFETY: The service Arc is kept alive for the lifetime of the worker thread
            let ops_ref: &'static dyn CrossWorkerOps =
                unsafe { &*(&*service_clone as *const dyn CrossWorkerOps) };
            CROSS_WORKER_OPS.set(Some(ops_ref));

            // Initialize preemption support for this worker thread
            let _preemption_handle = crate::runtime::preemption::init_worker_preemption().ok(); // Ignore errors - preemption is optional

            // Create a handle to this worker thread so it can be interrupted
            // On Windows, we pass the preemption flag so APC can access it
            #[cfg(unix)]
            let thread_handle_result = crate::runtime::preemption::WorkerThreadHandle::current();

            #[cfg(windows)]
            let thread_handle_result =
                crate::runtime::preemption::WorkerThreadHandle::current(&w.preemption_requested);

            #[cfg(not(any(unix, windows)))]
            let thread_handle_result =
                crate::runtime::preemption::WorkerThreadHandle::current(&w.preemption_requested);

            if let Ok(thread_handle) = thread_handle_result {
                *service_clone.worker_thread_handles[worker_id]
                    .lock()
                    .expect("worker_thread_handles lock poisoned") = Some(thread_handle);
            }

            // Catch panics to prevent thread termination from propagating
            // SAFETY: Worker cleanup code (service counter updates) will still execute after a panic
            // We use a raw pointer wrapped in AssertUnwindSafe because Worker contains mutable
            // references that don't implement UnwindSafe, but the cleanup code will still execute.
            let w_ptr = &mut w as *mut Worker<'_, P, NUM_SEGS_P2>;
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                unsafe { (*w_ptr).run() };
            }));
            if let Err(e) = result {
                eprintln!("[Worker {}] Panicked: {:?}", worker_id, e);
            }

            // Worker exited - clean up counters
            service_clone.worker_count.fetch_sub(1, Ordering::SeqCst);
            service_clone.worker_actives[worker_id].store(0, Ordering::SeqCst);
            // Request immediate partition rebalancing on next tick
            service_clone
                .rebalance_requested
                .store(true, Ordering::Release);
        });

        // Store the join handle
        *self.worker_threads[worker_id]
            .lock()
            .expect("worker_threads lock poisoned") = Some(join);

        Ok(())
    }
692
693    pub(crate) fn post_message(
694        &self,
695        from_worker_id: u32,
696        to_worker_id: u32,
697        message: WorkerMessage,
698    ) -> Result<(), PushError<WorkerMessage>> {
699        // mpsc will automatically set bits in the target worker's WorkerWaker.summary
700        unsafe {
701            self.senders
702                [((from_worker_id as usize) * self.config.max_workers) + (to_worker_id as usize)]
703                .unsafe_try_push(message)
704        }
705    }
706
707    pub(crate) fn try_yield_steal(
708        &self,
709        from_worker_id: usize,
710        limit: usize,
711        next_rand: usize,
712    ) -> (u64, bool) {
713        let queue = &self.yield_queues[from_worker_id];
714        let stealer = &self.yield_stealers[from_worker_id];
715        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
716        let mut index = next_rand;
717        let mut attempts = 0;
718        for _ in 0..max_workers {
719            let worker_id = index % max_workers;
720            if worker_id == from_worker_id {
721                continue;
722            }
723            let steal = stealer.steal_batch_with_limit(queue, limit);
724            if steal.is_success() {
725                return (attempts, true);
726            }
727            index += 1;
728            attempts += 1;
729        }
730        (attempts, false)
731    }
732
    /// The configured tick duration (as supplied, not the normalized value).
    pub fn tick_duration(&self) -> Duration {
        self.tick_duration
    }

    /// Current shared runtime clock in nanoseconds (advanced elsewhere).
    pub fn clock_ns(&self) -> u64 {
        self.clock_ns.load(Ordering::Acquire)
    }
740
741    pub fn shutdown(&self) {
742        *self.tick_registration.lock().expect("lock poisoned") = None;
743        if self.shutdown.swap(true, Ordering::Release) {
744            return;
745        }
746
747        #[cfg(debug_assertions)]
748        {
749            eprintln!("[WorkerService] Shutdown initiated, sending shutdown messages...");
750        }
751
752        // Send shutdown messages to all active workers
753        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
754        for worker_id in 0..=max_workers {
755            if worker_id < self.worker_actives.len()
756                && self.worker_actives[worker_id].load(Ordering::Relaxed) != 0
757            {
758                // SAFETY: unsafe_try_push only requires a shared reference
759                let _ = unsafe {
760                    self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::Shutdown)
761                };
762                // Mark work available to wake up the worker
763                self.wakers[worker_id].mark_tasks();
764            }
765        }
766
767        #[cfg(debug_assertions)]
768        {
769            eprintln!("[WorkerService] Joining worker threads...");
770        }
771
772        // Join all worker threads
773        for (idx, worker_thread) in self.worker_threads.iter().enumerate() {
774            if let Some(handle) = worker_thread.lock().unwrap().take() {
775                #[cfg(debug_assertions)]
776                {
777                    eprintln!("[WorkerService] Joining worker thread {}...", idx);
778                }
779                let _ = handle.join();
780                #[cfg(debug_assertions)]
781                {
782                    eprintln!("[WorkerService] Worker thread {} joined", idx);
783                }
784            }
785        }
786
787        #[cfg(debug_assertions)]
788        {
789            eprintln!("[WorkerService] Shutdown complete");
790        }
791    }
792
    /// Returns the current worker count
    /// (a relaxed read of the shared counter; may be momentarily stale).
    #[inline]
    pub fn worker_count(&self) -> usize {
        self.worker_count.load(Ordering::Relaxed)
    }
798
799    /// Checks if a specific worker has any work to do
800    #[inline]
801    pub fn worker_has_work(&self, worker_id: usize) -> bool {
802        if worker_id >= self.wakers.len() {
803            return false;
804        }
805        let waker = &self.wakers[worker_id];
806        let summary = waker.snapshot_summary();
807        let status = waker.status();
808        summary != 0 || status != 0
809    }
810
    /// Supervisor: Health monitoring - check for stuck workers and collect metrics.
    ///
    /// Invoked from the tick path with the current clock (`now_ns`). A worker
    /// is flagged as stuck when its `worker_now_ns` heartbeat has not moved
    /// for ten ticks; every active worker is also asked for a health report.
    fn supervisor_health_check(&self, now_ns: u64) {
        // `worker_max_id` is an exclusive bound (highest assigned id + 1).
        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
        let tick_duration_ns = self.tick_duration_ns;
        let stuck_threshold_ns = tick_duration_ns * 10; // Worker is stuck if no progress for 10 ticks

        for worker_id in 0..max_workers {
            let is_active = self.worker_actives[worker_id].load(Ordering::Relaxed);
            if is_active == 0 {
                continue; // Worker slot not active
            }

            let last_update = self.worker_now_ns[worker_id].load(Ordering::Relaxed);
            let time_since_update = now_ns.saturating_sub(last_update);

            // Detect stuck workers (report only; no corrective action here)
            if time_since_update > stuck_threshold_ns {
                #[cfg(debug_assertions)]
                {
                    eprintln!(
                        "[Supervisor] WARNING: Worker {} appears stuck (no update for {}ns)",
                        worker_id, time_since_update
                    );
                }
            }

            // Request health report from active workers
            if worker_id < self.tick_senders.len() {
                // SAFETY: try_push only requires a shared reference, not a mutable one
                let _ = unsafe {
                    self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::ReportHealth)
                };
            }
        }
    }
846
847    /// Supervisor: Rebalance task partitions across workers
848    fn supervisor_rebalance_partitions(&self) {
849        let active_workers = self.worker_count.load(Ordering::Relaxed);
850        if active_workers == 0 {
851            return;
852        }
853
854        let total_leaves = self.arena.leaf_count();
855        let leaves_per_worker = (total_leaves + active_workers - 1) / active_workers;
856
857        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
858        let mut active_worker_ids = Vec::with_capacity(max_workers);
859
860        // Collect active worker IDs
861        for worker_id in 0..max_workers {
862            if self.worker_actives[worker_id].load(Ordering::Relaxed) != 0 {
863                active_worker_ids.push(worker_id);
864            }
865        }
866
867        // Assign partitions to active workers
868        for (idx, &worker_id) in active_worker_ids.iter().enumerate() {
869            let partition_start = idx * leaves_per_worker;
870            let partition_end = ((idx + 1) * leaves_per_worker).min(total_leaves);
871
872            // Assign contiguous range of leaves [partition_start, partition_end)
873            // SAFETY: unsafe_try_push only requires a shared reference
874            let _ = unsafe {
875                self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::RebalancePartitions {
876                    partition_start,
877                    partition_end,
878                })
879            };
880        }
881
882        #[cfg(debug_assertions)]
883        {
884            eprintln!(
885                "[Supervisor] Rebalanced {} leaves across {} active workers",
886                total_leaves, active_workers
887            );
888        }
889    }
890
891    /// Supervisor: Graceful shutdown coordination
892    fn supervisor_graceful_shutdown(&self) {
893        let _ = self.tick_registration.lock().unwrap().take();
894        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
895
896        #[cfg(debug_assertions)]
897        {
898            eprintln!("[Supervisor] Shutting down, max_workers={}", max_workers);
899        }
900
901        // Send graceful shutdown to all active workers
902        for worker_id in 0..=max_workers {
903            #[cfg(debug_assertions)]
904            {
905                let is_active = worker_id < self.worker_actives.len()
906                    && self.worker_actives[worker_id].load(Ordering::Relaxed) != 0;
907                eprintln!("[Supervisor] Worker {} active={}", worker_id, is_active);
908            }
909            if worker_id < self.worker_actives.len()
910                && self.worker_actives[worker_id].load(Ordering::Relaxed) != 0
911            {
912                #[cfg(debug_assertions)]
913                {
914                    eprintln!("[Supervisor] Sending shutdown to worker {}", worker_id);
915                }
916                // SAFETY: unsafe_try_push only requires a shared reference
917                let _ = unsafe {
918                    self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::GracefulShutdown)
919                };
920                // Mark work available to wake up the worker
921                self.wakers[worker_id].mark_tasks();
922            }
923        }
924
925        #[cfg(debug_assertions)]
926        {
927            eprintln!("[Supervisor] Sent graceful shutdown to all workers");
928        }
929
930        // Wait for workers to finish (with timeout)
931        let shutdown_timeout = std::time::Duration::from_secs(5);
932        let shutdown_start = std::time::Instant::now();
933
934        loop {
935            let active_count = self.worker_count.load(Ordering::Relaxed);
936            if active_count == 0 {
937                break;
938            }
939
940            if shutdown_start.elapsed() > shutdown_timeout {
941                #[cfg(debug_assertions)]
942                {
943                    eprintln!(
944                        "[Supervisor] Graceful shutdown timeout, {} workers still active",
945                        active_count
946                    );
947                }
948                // Force shutdown remaining workers
949                for worker_id in 0..max_workers {
950                    if self.worker_actives[worker_id].load(Ordering::Relaxed) != 0 {
951                        // SAFETY: unsafe_try_push only requires a shared reference
952                        let _ = unsafe {
953                            self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::Shutdown)
954                        };
955                    }
956                }
957                break;
958            }
959
960            std::thread::sleep(std::time::Duration::from_millis(10));
961        }
962    }
963
    /// Supervisor: Dynamic worker scaling based on load.
    ///
    /// Load is estimated from each active worker's waker: the number of set
    /// bits in its summary (cross-worker signals) plus its status bits
    /// (local work flags). Scales up by one worker when the average signal
    /// count per worker exceeds a threshold, and scales down by one worker
    /// (highest id first) when no signals are visible at all.
    fn supervisor_dynamic_scaling(&self) {
        let active_workers = self.worker_count.load(Ordering::Relaxed);
        let min_workers = self.config.min_workers;
        let max_workers_config = self.config.max_workers;

        // Calculate total work pressure by checking WorkerWakers
        let mut total_work_signals = 0u64;
        let max_worker_id = self.worker_max_id.load(Ordering::Relaxed);

        for worker_id in 0..max_worker_id {
            if self.worker_actives[worker_id].load(Ordering::Relaxed) == 0 {
                continue;
            }

            let waker = &self.wakers[worker_id];
            let summary = waker.snapshot_summary();
            let status = waker.status();

            // Count work signals: summary bits + status bits
            total_work_signals += summary.count_ones() as u64 + status.count_ones() as u64;
        }

        // Scale up: if average work per worker > threshold, add workers
        if active_workers > 0 {
            let avg_work_per_worker = total_work_signals / active_workers as u64;
            let scale_up_threshold = 4; // If avg work signals > 4 per worker, scale up

            if avg_work_per_worker > scale_up_threshold && active_workers < max_workers_config {
                // Try to spawn a new worker
                match self.spawn_worker() {
                    Ok(()) => {
                        // Re-read the count: spawn_worker has already bumped it.
                        let new_count = self.worker_count.load(Ordering::Relaxed);

                        #[cfg(debug_assertions)]
                        {
                            eprintln!(
                                "[Supervisor] Scaled UP: {} -> {} workers (avg_work={})",
                                active_workers, new_count, avg_work_per_worker
                            );
                        }

                        // Notify all workers of count change.
                        // NOTE(review): this broadcasts to every sender slot,
                        // including inactive ones — confirm inactive queues
                        // tolerate (or drain) the message.
                        for sender in self.tick_senders.iter() {
                            // SAFETY: unsafe_try_push only requires a shared reference
                            let _ = unsafe {
                                sender.unsafe_try_push(WorkerMessage::WorkerCountChanged {
                                    new_worker_count: new_count as u16,
                                })
                            };
                        }
                    }
                    Err(_) => {
                        #[cfg(debug_assertions)]
                        {
                            eprintln!("[Supervisor] Failed to scale up: no available worker slots");
                        }
                    }
                }
            }
        }

        // Scale down: if work pressure is very low and we have more than min_workers
        if active_workers > min_workers && total_work_signals == 0 {
            // Find a worker to shut down (prefer higher worker IDs)
            for worker_id in (0..max_worker_id).rev() {
                if self.worker_actives[worker_id].load(Ordering::Relaxed) != 0 {
                    // Send graceful shutdown to this worker
                    // SAFETY: unsafe_try_push only requires a shared reference
                    let _ = unsafe {
                        self.tick_senders[worker_id]
                            .unsafe_try_push(WorkerMessage::GracefulShutdown)
                    };

                    #[cfg(debug_assertions)]
                    {
                        eprintln!(
                            "[Supervisor] Scaled DOWN: removed worker {} (no work detected)",
                            worker_id
                        );
                    }
                    break; // Only remove one worker at a time
                }
            }
        }
    }
1050
1051    /// Supervisor: Task migration for load balancing
1052    /// Migrates tasks from overloaded workers to underloaded workers
1053    #[allow(dead_code)]
1054    fn supervisor_task_migration(&self) {
1055        let max_worker_id = self.worker_max_id.load(Ordering::Relaxed);
1056
1057        // Collect load information for each worker
1058        let mut worker_loads: Vec<(usize, u64)> = Vec::new();
1059
1060        for worker_id in 0..max_worker_id {
1061            if self.worker_actives[worker_id].load(Ordering::Relaxed) == 0 {
1062                continue;
1063            }
1064
1065            let waker = &self.wakers[worker_id];
1066            let summary = waker.snapshot_summary();
1067            let status = waker.status();
1068            let load = summary.count_ones() as u64 + status.count_ones() as u64;
1069
1070            worker_loads.push((worker_id, load));
1071        }
1072
1073        if worker_loads.len() < 2 {
1074            return; // Need at least 2 workers for migration
1075        }
1076
1077        // Sort by load
1078        worker_loads.sort_by_key(|&(_, load)| load);
1079
1080        // Find most loaded and least loaded workers
1081        let (most_loaded_id, most_loaded_load) = worker_loads[worker_loads.len() - 1];
1082        let (least_loaded_id, least_loaded_load) = worker_loads[0];
1083
1084        // Migration threshold: only migrate if imbalance is significant
1085        let imbalance = most_loaded_load.saturating_sub(least_loaded_load);
1086        let migration_threshold = 3;
1087
1088        if imbalance >= migration_threshold {
1089            // In a real implementation, we would:
1090            // 1. Steal tasks from the overloaded worker's yield queue
1091            // 2. Send MigrateTasks message to the underloaded worker
1092            //
1093            // For now, we just log the decision
1094            // TODO: Implement actual task stealing from yield queues
1095
1096            #[cfg(debug_assertions)]
1097            {
1098                eprintln!(
1099                    "[Supervisor] Task migration opportunity: worker {} (load={}) -> worker {} (load={})",
1100                    most_loaded_id, most_loaded_load, least_loaded_id, least_loaded_load
1101                );
1102            }
1103
1104            // We can't easily steal from another worker's yield queue from the supervisor
1105            // This would require exposing the yield_stealers or implementing a different mechanism
1106            // For now, leave this as a placeholder for future implementation
1107            let _ = (most_loaded_id, least_loaded_id); // Suppress unused warnings
1108        }
1109    }
1110
1111    /// Interrupt a specific worker thread to trigger preemptive generator switching.
1112    ///
1113    /// **The worker thread is NOT terminated!** It is only briefly interrupted to:
1114    /// 1. Pin its current generator to the running task (save stack state)
1115    /// 2. Create a new generator for itself
1116    /// 3. Continue executing with the new generator
1117    ///
1118    /// The interrupted task can later be resumed with its pinned generator.
1119    /// This allows true preemptive multitasking at the task level.
1120    ///
1121    /// Can be called from any thread (e.g., a timer thread).
1122    ///
1123    /// # Arguments
1124    /// * `worker_id` - The ID of the worker to interrupt (0-based index)
1125    ///
1126    /// # Returns
1127    /// * `Ok(true)` - Worker was successfully interrupted
1128    /// * `Ok(false)` - Worker doesn't exist (invalid ID)
1129    /// * `Err` - Interrupt operation failed (platform error)
1130    pub fn interrupt_worker(
1131        &self,
1132        worker_id: usize,
1133    ) -> Result<bool, crate::runtime::preemption::PreemptionError> {
1134        if worker_id >= self.worker_thread_handles.len() {
1135            return Ok(false);
1136        }
1137
1138        let handle_guard = self.worker_thread_handles[worker_id]
1139            .lock()
1140            .expect("worker_thread_handles lock poisoned");
1141
1142        if let Some(ref handle) = *handle_guard {
1143            handle.interrupt()?;
1144            Ok(true)
1145        } else {
1146            Ok(false)
1147        }
1148    }
1149}
1150
1151impl<const P: usize, const NUM_SEGS_P2: usize> CrossWorkerOps for WorkerService<P, NUM_SEGS_P2> {
1152    fn post_cancel_message(&self, from_worker_id: u32, to_worker_id: u32, timer_id: u64) -> bool {
1153        self.post_message(
1154            from_worker_id,
1155            to_worker_id,
1156            WorkerMessage::CancelTimer {
1157                worker_id: to_worker_id,
1158                timer_id,
1159            },
1160        )
1161        .is_ok()
1162    }
1163}
1164
impl<const P: usize, const NUM_SEGS_P2: usize> TickHandler for WorkerService<P, NUM_SEGS_P2> {
    /// Interval at which `on_tick` is invoked by the ticker.
    fn tick_duration(&self) -> Duration {
        self.tick_duration
    }

    /// Periodic supervisor heartbeat: publishes the clock to all workers and
    /// runs the health-check / scaling / rebalance duties on their cadences.
    fn on_tick(&self, tick_count: u64, now_ns: u64) {
        // Update clock for all workers
        self.clock_ns.store(now_ns, Ordering::Release);
        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
        // Inclusive range, guarded by the length check below.
        for i in 0..=max_workers {
            if i >= self.worker_now_ns.len() {
                break;
            }
            self.worker_now_ns[i].store(now_ns, Ordering::Release);
            // Update TimerWheel's now_ns
            unsafe {
                // SAFETY: Each worker has exclusive access to its own TimerWheel.
                // The tick thread is the only one updating now_ns on TimerWheels.
                let timer_wheel = &mut *self.timers[i].get();
                timer_wheel.set_now_ns(now_ns);
            }
        }

        // Perform initial partition rebalance on first tick
        if tick_count == 0 {
            self.supervisor_rebalance_partitions();
        }

        // Immediate rebalancing if requested (worker spawned/exited).
        // swap(false) consumes the request exactly once.
        if self.rebalance_requested.swap(false, Ordering::AcqRel) {
            self.supervisor_rebalance_partitions();
        }

        // Periodic health monitoring
        if tick_count % self.tick_health_check_interval == 0 {
            self.supervisor_health_check(now_ns);
        }

        // Periodic dynamic worker scaling
        if tick_count % self.tick_scaling_check_interval == 0 {
            self.supervisor_dynamic_scaling();
        }

        // Periodic partition rebalancing (backup in case flag mechanism fails)
        if tick_count % self.tick_partition_rebalance_interval == 0 {
            self.supervisor_rebalance_partitions();
        }
    }

    /// Called by the ticker when it stops; triggers coordinated worker shutdown.
    fn on_shutdown(&self) {
        // Graceful shutdown: notify all workers
        self.supervisor_graceful_shutdown();
    }
}
1219
1220impl<const P: usize, const NUM_SEGS_P2: usize> Drop for WorkerService<P, NUM_SEGS_P2> {
1221    fn drop(&mut self) {
1222        *self.tick_registration.lock().expect("lock poisoned") = None;
1223        if !self.shutdown.load(Ordering::Acquire) {
1224            self.shutdown();
1225        }
1226    }
1227}
1228
/// Normalizes a tick duration to a non-zero power-of-two nanosecond count,
/// as required by `TimerWheel` (see `DEFAULT_TICK_DURATION_NS`).
///
/// The nanosecond count is clamped to `[1, 2^63]` before rounding up.
/// Clamping to `2^63` (the largest `u64` power of two) is essential: the
/// previous clamp to `u64::MAX` made `next_power_of_two` overflow for any
/// duration above 2^63 ns (including `Duration::MAX`), panicking in debug
/// builds and returning 0 in release builds.
fn normalize_tick_duration_ns(duration: Duration) -> u64 {
    // Largest power of two representable in u64.
    const MAX_POW2_NS: u64 = 1 << 63;
    let nanos = duration.as_nanos().clamp(1, u128::from(MAX_POW2_NS)) as u64;
    nanos.next_power_of_two()
}
1233
/// Comprehensive statistics for fast worker.
#[derive(Debug, Default, Clone)]
pub struct WorkerStats {
    /// Number of tasks polled.
    pub tasks_polled: u64,

    /// Number of tasks that completed.
    pub completed_count: u64,

    /// Number of tasks that yielded cooperatively.
    pub yielded_count: u64,

    /// Number of tasks that are waiting (Poll::Pending, not yielded).
    pub waiting_count: u64,

    /// Number of CAS failures (contention or already executing).
    pub cas_failures: u64,

    /// Number of empty scans (no tasks available).
    pub empty_scans: u64,

    /// Number of polls from yield queue (hot path).
    pub yield_queue_polls: u64,

    /// Number of polls from signals (cold path).
    pub signal_polls: u64,

    /// Number of work stealing attempts from other workers.
    pub steal_attempts: u64,

    /// Number of successful work steals.
    pub steal_successes: u64,

    /// Number of leaf summary checks.
    pub leaf_summary_checks: u64,

    /// Number of leaf summary hits (summary != 0).
    pub leaf_summary_hits: u64,

    /// Number of attempts to steal from other workers' leaf partitions.
    pub leaf_steal_attempts: u64,

    /// Number of successful steals from other workers' leaf partitions.
    pub leaf_steal_successes: u64,

    /// Number of timer expirations fired.
    pub timer_fires: u64,
}
1281
/// Health snapshot of a worker at a point in time.
/// Used by the supervisor for health monitoring and load balancing decisions.
#[derive(Debug, Clone)]
pub struct WorkerHealthSnapshot {
    /// ID of the worker this snapshot describes.
    pub worker_id: u32,
    /// Time (nanoseconds) at which the snapshot was taken.
    pub timestamp_ns: u64,
    /// Cumulative scheduling statistics at snapshot time.
    pub stats: WorkerStats,
    /// Length of the worker's local yield queue.
    pub yield_queue_len: usize,
    /// Length of the worker's incoming mpsc message queue.
    pub mpsc_queue_len: usize,
    /// Leaf partition indices the worker considers active.
    pub active_leaf_partitions: Vec<usize>,
    /// Whether the worker reported outstanding work at snapshot time.
    pub has_work: bool,
}
1294
/// Counters describing wake/permit-release activity for a worker.
// NOTE(review): field semantics inferred from names — the update sites are
// not visible in this chunk; confirm against where WakeStats is mutated.
#[derive(Clone, Copy, Debug, Default)]
pub struct WakeStats {
    /// Times a permit release was attempted.
    pub release_calls: u64,
    /// Total number of permits actually released.
    pub released_permits: u64,
    /// Backlog observed at the most recent release.
    pub last_backlog: usize,
    /// Largest backlog observed so far.
    pub max_backlog: usize,
    /// Release calls attributed to queue activity.
    pub queue_release_calls: u64,
}
1303
/// Controls how a worker waits when it runs out of work: how many spin
/// iterations to burn before sleeping, and whether/how long to park.
#[derive(Clone, Copy, Debug)]
pub struct WaitStrategy {
    pub spin_before_sleep: usize,
    pub park_timeout: Option<Duration>,
}

impl WaitStrategy {
    /// Builds a strategy from a spin count and an optional park timeout
    /// (`None` parks without a deadline).
    pub fn new(spin_before_sleep: usize, park_timeout: Option<Duration>) -> Self {
        WaitStrategy {
            spin_before_sleep,
            park_timeout,
        }
    }

    /// Never blocks: no spinning and a zero-length park timeout.
    pub fn non_blocking() -> Self {
        WaitStrategy {
            spin_before_sleep: 0,
            park_timeout: Some(Duration::from_secs(0)),
        }
    }

    /// Parks right away (no spinning) with no timeout.
    pub fn park_immediately() -> Self {
        WaitStrategy {
            spin_before_sleep: 0,
            park_timeout: None,
        }
    }
}

impl Default for WaitStrategy {
    /// No spinning; park with a 2-second timeout.
    fn default() -> Self {
        WaitStrategy::new(0, Some(Duration::from_millis(2000)))
    }
}
1332
/// Per-thread worker that executes tasks on behalf of a `WorkerService`.
///
/// Borrows its queues and timer wheel (lifetime `'a`) from storage owned by
/// the service; `P`/`NUM_SEGS_P2` size the mpsc queue segments.
pub struct Worker<
    'a,
    const P: usize = DEFAULT_QUEUE_SEG_SHIFT,
    const NUM_SEGS_P2: usize = DEFAULT_QUEUE_NUM_SEGS_SHIFT,
> {
    // Shared service owning cross-worker state (wakers, arena, senders).
    service: Arc<WorkerService<P, NUM_SEGS_P2>>,
    // Spin/park behavior when idle.
    wait_strategy: WaitStrategy,
    // Global shutdown flag, checked each loop iteration.
    shutdown: &'a AtomicBool,
    // Incoming cross-worker message queue for this worker.
    receiver: &'a mut mpsc::Receiver<WorkerMessage, P, NUM_SEGS_P2>,
    // Local queue of tasks that yielded cooperatively.
    yield_queue: &'a YieldWorker<TaskHandle>,
    // This worker's timer wheel (now_ns is advanced by the tick thread).
    timer_wheel: &'a mut TimerWheel<TimerHandle>,
    // Wake/permit accounting counters.
    wake_stats: WakeStats,
    // Cumulative scheduling statistics, exposed via stats().
    stats: WorkerStats,
    // Assigned leaf partition range [partition_start, partition_end)
    // (updated via RebalancePartitions messages).
    partition_start: usize,
    partition_end: usize,
    partition_len: usize,
    // Cached worker count (refreshed via WorkerCountChanged messages).
    cached_worker_count: usize,
    wake_burst_limit: usize,
    worker_id: u32,
    timer_resolution_ns: u64,
    // Scratch buffer for expired timers; exact tuple layout not visible in
    // this chunk — presumably (deadline_ns, timer_id, handle). TODO confirm.
    timer_output: Vec<(u64, u64, TimerHandle)>,
    // Reusable batch buffer for draining the mpsc receiver.
    message_batch: Box<[WorkerMessage]>,
    rng: crate::utils::Random,
    // Raw pointer to the task currently being polled (null when none).
    pub(crate) current_task: *mut Task,
    /// Preemption flag for this worker - set by signal handler (Unix) or timer thread (Windows)
    preemption_requested: AtomicBool,
    /// Generator scope pointer (Windows only) - for APC callback to force yield
    #[cfg(windows)]
    generator_scope_atomic: std::sync::atomic::AtomicPtr<()>,
}
1363
1364impl<'a, const P: usize, const NUM_SEGS_P2: usize> Worker<'a, P, NUM_SEGS_P2> {
    /// Returns a read-only view of this worker's cumulative statistics.
    #[inline]
    pub fn stats(&self) -> &WorkerStats {
        &self.stats
    }
1369
1370    /// Checks if this worker has any work to do (tasks, yields, or messages).
1371    /// Returns true if the worker should continue running, false if it can park.
1372    #[inline]
1373    fn has_work(&self) -> bool {
1374        let waker = &self.service.wakers[self.worker_id as usize];
1375
1376        // Check status bits (fast path):
1377        // - bit 63: yield queue has items
1378        // - bit 62: partition cache reports tasks (synced from SummaryTree)
1379        let status = waker.status();
1380        if status != 0 {
1381            return true;
1382        }
1383
1384        // Check summary (cross-worker signals like messages, timers)
1385        let summary = waker.snapshot_summary();
1386        if summary != 0 {
1387            return true;
1388        }
1389
1390        // Check if permits are available (missed wake scenario)
1391        if waker.permits() > 0 {
1392            return true;
1393        }
1394
1395        false
1396    }
1397
    /// Replaces this worker's wait strategy (idle spin/park behavior).
    pub fn set_wait_strategy(&mut self, strategy: WaitStrategy) {
        self.wait_strategy = strategy;
    }
1401
1402    #[inline(always)]
1403    pub fn run_once(&mut self) -> bool {
1404        let mut did_work = false;
1405
1406        if self.poll_timers() {
1407            did_work = true;
1408        }
1409
1410        // Process messages first (including shutdown signals)
1411        if self.process_messages() {
1412            did_work = true;
1413        }
1414
1415        // Poll all yielded tasks first - they're ready to run
1416        let yielded = self.poll_yield(self.yield_queue.len());
1417        if yielded > 0 {
1418            did_work = true;
1419        }
1420
1421        // Partition assignment is handled by WorkerService via RebalancePartitions message
1422        if self.try_partition_random() {
1423            did_work = true;
1424        }
1425
1426        // if self.try_partition_random(leaf_count) {
1427        //     did_work = true;
1428        // } else if self.try_partition_linear(leaf_count) {
1429        //     did_work = true;
1430        // }
1431
1432        let rand = self.next_u64();
1433
1434        if !did_work && rand & FULL_SUMMARY_SCAN_CADENCE_MASK == 0 {
1435            let leaf_count = self.service.arena().leaf_count();
1436            if self.try_any_partition_random(leaf_count) {
1437                did_work = true;
1438            }
1439            // else if self.try_any_partition_linear(leaf_count) {
1440            // did_work = true;
1441            // }
1442        }
1443
1444        if !did_work {
1445            if self.poll_yield(self.yield_queue.len() as usize) > 0 {
1446                did_work = true;
1447            }
1448
1449            // if self.poll_yield_steal(1) > 0 {
1450            //     did_work = true;
1451            // }
1452        }
1453
1454        did_work
1455    }
1456
1457    #[inline(always)]
1458    pub fn run_once_exhaustive(&mut self) -> bool {
1459        let mut did_work = false;
1460
1461        if self.poll_timers() {
1462            did_work = true;
1463        }
1464
1465        // Process messages first (including shutdown signals)
1466        if self.process_messages() {
1467            did_work = true;
1468        }
1469
1470        // Poll all yielded tasks first - they're ready to run
1471        let yielded = self.poll_yield(self.yield_queue.len());
1472        if yielded > 0 {
1473            did_work = true;
1474        }
1475
1476        // Partition assignment is handled by WorkerService via RebalancePartitions message
1477        if self.try_partition_random() {
1478            did_work = true;
1479        }
1480
1481        if self.try_partition_random() {
1482            did_work = true;
1483        } else if self.try_partition_linear() {
1484            did_work = true;
1485        }
1486
1487        let rand = self.next_u64();
1488
1489        if !did_work {
1490            let leaf_count = self.service.arena().leaf_count();
1491            if self.try_any_partition_random(leaf_count) {
1492                did_work = true;
1493            } else if self.try_any_partition_linear(leaf_count) {
1494                did_work = true;
1495            }
1496        }
1497
1498        if !did_work {
1499            if self.poll_yield(self.yield_queue.len() as usize) > 0 {
1500                did_work = true;
1501            }
1502            if self.poll_yield_steal(1) > 0 {
1503                did_work = true;
1504            }
1505        }
1506
1507        did_work
1508    }
1509
    /// Hook invoked just before the worker parks; returning `true` would
    /// indicate late-found work. Currently a no-op stub that always reports
    /// no extra work.
    #[inline]
    fn run_last_before_park(&mut self) -> bool {
        false
    }
1514
1515    pub(crate) fn run(&mut self) {
1516        const STACK_SIZE: usize = 2 * 1024 * 1024; // 2MB stack per generator
1517
1518        // Initialize thread-local preemption for this worker
1519        // On Unix: Signal handler will access this via thread-local
1520        // On Windows: Timer thread will access via WorkerThreadHandle
1521        crate::runtime::preemption::init_worker_thread_preemption(&self.preemption_requested);
1522
1523        // Outer loop: manages generator lifecycle
1524        // Creates new generators when current one gets pinned
1525        while !self.shutdown.load(Ordering::Relaxed) {
1526            // Use raw pointer for generator closure
1527            let worker_ptr = self as *mut Self;
1528
1529            // EXTREMELY UNSAFE: Convert pointer to usize to completely erase type and make it Send
1530            // This is safe because Worker is single-threaded and we never actually send across threads
1531            let worker_addr = worker_ptr as usize;
1532
1533            let mut generator =
1534                crate::generator::Gn::<()>::new_scoped_opt(STACK_SIZE, move |mut scope| -> usize {
1535                    // SAFETY: worker_addr is the address of a valid Worker that lives for the generator's lifetime
1536                    let worker = unsafe { &mut *(worker_addr as *mut Worker<P, NUM_SEGS_P2>) };
1537                    let mut spin_count = 0;
1538                    let waker_id = worker.worker_id as usize;
1539
1540                    // Register this generator's scope for non-cooperative preemption
1541                    let scope_ptr = &mut scope as *mut _ as *mut ();
1542
1543                    // Unix: Store in thread-local for signal handler
1544                    #[cfg(unix)]
1545                    crate::runtime::preemption::set_generator_scope(scope_ptr);
1546
1547                    // Windows: Store in atomic for APC callback
1548                    #[cfg(windows)]
1549                    worker
1550                        .generator_scope_atomic
1551                        .store(scope_ptr, std::sync::atomic::Ordering::Release);
1552
1553                    // Inner loop: runs inside generator context
1554                    loop {
1555                        // Check for shutdown
1556                        if worker.shutdown.load(Ordering::Relaxed) {
1557                            #[cfg(unix)]
1558                            crate::runtime::preemption::clear_generator_scope();
1559                            #[cfg(windows)]
1560                            worker
1561                                .generator_scope_atomic
1562                                .store(std::ptr::null_mut(), std::sync::atomic::Ordering::Release);
1563                            return crate::generator::done();
1564                        }
1565
1566                        // Process one iteration of work
1567                        let mut progress = worker.run_once();
1568
1569                        // Check if a task requested to pin this generator (manual pinning)
1570                        let manual_pin_requested = GENERATOR_PIN_REQUESTED.with(|cell| {
1571                            let requested = cell.get();
1572                            if requested {
1573                                cell.set(false); // Clear flag
1574                                true
1575                            } else {
1576                                false
1577                            }
1578                        });
1579
1580                        // Check if preemption was requested (timer-based / signal-based)
1581                        let preemption_requested =
1582                            crate::runtime::preemption::check_and_clear_preemption(
1583                                &worker.preemption_requested,
1584                            );
1585
1586                        if manual_pin_requested || preemption_requested {
1587                            // Capture the current task
1588                            let task_to_pin = CURRENT_TASK.with(|cell| cell.get());
1589
1590                            if !task_to_pin.is_null() {
1591                                let task = unsafe { &*task_to_pin };
1592
1593                                // Check if task already has a pinned generator
1594                                if task.has_pinned_generator() {
1595                                    // Scenario 2: Task has pinned generator (likely Poll mode)
1596                                    // Change its mode to Switch (interrupted poll on stack)
1597                                    // Don't pin worker generator - just mark task as needing mode change
1598                                    unsafe {
1599                                        task.set_generator_run_mode(GeneratorRunMode::Switch);
1600                                    }
1601                                    // Continue worker loop normally with worker generator
1602                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
1603                                    continue;
1604                                } else {
1605                                    // Scenario 1: Task has no pinned generator
1606                                    // Promote worker generator to task with Switch mode
1607                                    GENERATOR_PIN_TASK.with(|cell| cell.set(task_to_pin));
1608
1609                                    // Generator needs to be pinned - yield and we'll break out after
1610                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
1611
1612                                    // The signal mask is automatically restored by the kernel when
1613                                    // the signal handler returns, and since we are not in the signal
1614                                    // handler context here (we are in the generator context),
1615                                    // we don't need to manually unblock SIGVTALRM.
1616
1617                                    #[cfg(unix)]
1618                                    crate::runtime::preemption::clear_generator_scope();
1619                                    #[cfg(windows)]
1620                                    worker.generator_scope_atomic.store(
1621                                        std::ptr::null_mut(),
1622                                        std::sync::atomic::Ordering::Release,
1623                                    );
1624                                    return crate::generator::done();
1625                                }
1626                            }
1627                            // If no current task, just continue (shouldn't happen during preemption)
1628                        }
1629
1630                        if !progress {
1631                            spin_count += 1;
1632
1633                            if spin_count >= worker.wait_strategy.spin_before_sleep {
1634                                core::hint::spin_loop();
1635                                progress = worker.run_once_exhaustive();
1636
1637                                if progress {
1638                                    spin_count = 0;
1639                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
1640                                    continue;
1641                                }
1642                            } else if spin_count < worker.wait_strategy.spin_before_sleep {
1643                                core::hint::spin_loop();
1644                                if spin_count % 100 == 0 {
1645                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
1646                                }
1647                                continue;
1648                            }
1649
1650                            // Calculate park duration considering timer deadlines
1651                            let park_duration = worker.calculate_park_duration();
1652
1653                            // Sync partition summary from SummaryTree before parking
1654                            worker.service.wakers[waker_id].sync_partition_summary(
1655                                worker.partition_start,
1656                                worker.partition_end,
1657                                &worker.service.summary().leaf_words,
1658                            );
1659
1660                            // Park on WorkerWaker with timer-aware timeout
1661                            // On Windows, also use alertable wait for APC-based preemption
1662                            match park_duration {
1663                                Some(duration) if duration.is_zero() => {}
1664                                Some(duration) => {
1665                                    // On Windows, WorkerWaker uses alertable waits to allow APC execution
1666                                    worker.service.wakers[waker_id].acquire_timeout(duration);
1667                                }
1668                                None => {
1669                                    // On Windows, WorkerWaker uses alertable waits to allow APC execution
1670                                    worker.service.wakers[waker_id]
1671                                        .acquire_timeout(Duration::from_millis(250));
1672                                }
1673                            }
1674                            spin_count = 0;
1675                        } else {
1676                            spin_count = 0;
1677                            scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
1678                        }
1679                    }
1680                });
1681
1682            // Drive the generator until completion or until a task needs to be pinned
1683            // The generator will yield and set GENERATOR_PIN_TASK if pinning is needed
1684            for _ in &mut generator {
1685                // Just consume yields - the generator handles pin detection internally
1686            }
1687
1688            // Check if a task captured the generator for pinning
1689            // This is set inside the generator loop when preemption/manual pin is detected
1690            let task_ptr = GENERATOR_PIN_TASK.with(|cell| {
1691                let ptr = cell.get();
1692                if !ptr.is_null() {
1693                    cell.set(ptr::null_mut()); // Clear for next time
1694                }
1695                ptr
1696            });
1697
1698            if !task_ptr.is_null() {
1699                // A task requested to pin this generator
1700                // Transfer generator ownership to the task
1701                // SAFETY: task_ptr is valid - it was captured during task execution
1702                let task = unsafe { &*task_ptr };
1703
1704                // Convert generator to a boxed iterator
1705                // SAFETY: The generator's lifetime is tied to the Worker which outlives the task
1706                let boxed_iter: Box<dyn Iterator<Item = usize> + 'static> = unsafe {
1707                    std::mem::transmute(Box::new(generator) as Box<dyn Iterator<Item = usize>>)
1708                };
1709
1710                // Pin with Switch mode - this generator has interrupted poll on stack
1711                unsafe {
1712                    task.pin_generator(boxed_iter, GeneratorRunMode::Switch);
1713                }
1714
1715                // Re-schedule the preempted task in the yield queue for work-stealing
1716                // Preempted tasks are "hot" (CPU-bound) and should be treated like yields
1717                if let Some(slot) = task.slot() {
1718                    let handle = TaskHandle::from_task(task);
1719
1720                    // Mark as yielded so it stays in EXECUTING state
1721                    task.mark_yielded();
1722                    task.record_yield();
1723
1724                    // Enqueue to yield queue - enables work stealing!
1725                    self.enqueue_yield(handle);
1726
1727                    // Update stats
1728                    self.stats.yielded_count += 1;
1729                }
1730
1731                // Continue outer loop to create a new generator
1732            } else {
1733                // Generator completed normally (shutdown)
1734                break;
1735            }
1736        }
1737    }
1738
1739    /// Calculate how long the worker can park, considering both the wait strategy
1740    /// and the next timer deadline.
1741    #[inline]
1742    fn calculate_park_duration(&mut self) -> Option<Duration> {
1743        // Check if we have pending timers
1744        if let Some(next_deadline_ns) = self.timer_wheel.next_deadline() {
1745            let now_ns = self.timer_wheel.now_ns();
1746
1747            if next_deadline_ns > now_ns {
1748                let timer_duration_ns = next_deadline_ns - now_ns;
1749                let timer_duration = Duration::from_nanos(timer_duration_ns);
1750
1751                // Use the minimum of wait_strategy timeout and timer deadline
1752                let duration = match self.wait_strategy.park_timeout {
1753                    Some(strategy_timeout) => Some(strategy_timeout.min(timer_duration)),
1754                    None => Some(timer_duration),
1755                };
1756                return duration;
1757            } else {
1758                // Timer already expired, don't sleep at all
1759                return Some(Duration::ZERO);
1760            }
1761        }
1762
1763        // No timers scheduled - cap at 250ms to check for new timers periodically
1764        const MAX_PARK_DURATION: Duration = Duration::from_millis(250);
1765        match self.wait_strategy.park_timeout {
1766            Some(timeout) => Some(timeout.min(MAX_PARK_DURATION)),
1767            None => Some(MAX_PARK_DURATION),
1768        }
1769    }
1770
1771    fn process_messages(&mut self) -> bool {
1772        let mut progress = false;
1773        loop {
1774            match self.receiver.try_pop() {
1775                Ok(message) => {
1776                    if self.handle_message(message) {
1777                        progress = true;
1778                    }
1779                }
1780                Err(PopError::Empty) | Err(PopError::Timeout) => {
1781                    // Queue is empty - mpsc signal bits will be cleared automatically
1782                    break;
1783                }
1784                Err(PopError::Closed) => break,
1785            }
1786        }
1787        progress
1788    }
1789
    /// Dispatch one control-plane message.
    ///
    /// Returns `true` when the message counted as progress: it scheduled
    /// work, changed worker state, or was an explicit wake-up (`Noop`).
    #[inline(always)]
    fn handle_message(&mut self, message: WorkerMessage) -> bool {
        match message {
            WorkerMessage::ScheduleTimer { timer } => self.handle_timer_schedule(timer),
            WorkerMessage::ScheduleBatch { timers } => {
                // A batch counts as progress if at least one timer landed.
                let mut scheduled = false;
                for timer in timers.into_vec() {
                    scheduled |= self.handle_timer_schedule(timer);
                }
                scheduled
            }
            WorkerMessage::CancelTimer {
                worker_id,
                timer_id,
            } => self.cancel_remote_timer(worker_id, timer_id),
            WorkerMessage::WorkerCountChanged { new_worker_count } => {
                // Cached locally so hot paths avoid re-reading shared state.
                self.cached_worker_count = new_worker_count as usize;
                true
            }
            WorkerMessage::RebalancePartitions {
                partition_start,
                partition_end,
            } => self.handle_rebalance_partitions(partition_start, partition_end),
            WorkerMessage::MigrateTasks { task_handles } => self.handle_migrate_tasks(task_handles),
            WorkerMessage::ReportHealth => {
                self.handle_report_health();
                true
            }
            WorkerMessage::GracefulShutdown => {
                self.handle_graceful_shutdown();
                true
            }
            WorkerMessage::Shutdown => {
                // Immediate shutdown: raise the flag; the worker loop exits.
                self.shutdown.store(true, Ordering::Release);
                true
            }
            WorkerMessage::Noop => true,
        }
    }
1829
1830    fn handle_timer_schedule(&mut self, timer: TimerSchedule) -> bool {
1831        let (handle, deadline_ns) = timer.into_parts();
1832        if handle.worker_id() != self.worker_id {
1833            return false;
1834        }
1835        self.enqueue_timer_entry(deadline_ns, handle).is_some()
1836    }
1837
1838    fn handle_rebalance_partitions(
1839        &mut self,
1840        partition_start: usize,
1841        partition_end: usize,
1842    ) -> bool {
1843        self.partition_start = partition_start;
1844        self.partition_end = partition_end;
1845        self.partition_len = partition_end.saturating_sub(partition_start);
1846
1847        // Sync partition summary to reflect new partition boundaries.
1848        // This will also update the tasks-available bit based on actual partition state.
1849        let waker_id = self.worker_id as usize;
1850        self.service.wakers[waker_id].sync_partition_summary(
1851            partition_start,
1852            partition_end,
1853            &self.service.summary().leaf_words,
1854        );
1855
1856        true
1857    }
1858
1859    fn handle_migrate_tasks(&mut self, task_handles: Vec<TaskHandle>) -> bool {
1860        // Receive migrated tasks from another worker
1861        // Add them to our yield queue for processing
1862        if task_handles.is_empty() {
1863            return false;
1864        }
1865
1866        let count = task_handles.len();
1867        for handle in task_handles {
1868            self.enqueue_yield(handle);
1869        }
1870
1871        #[cfg(debug_assertions)]
1872        {
1873            eprintln!(
1874                "[Worker {}] Received {} migrated tasks",
1875                self.worker_id, count
1876            );
1877        }
1878
1879        true
1880    }
1881
1882    fn handle_report_health(&mut self) {
1883        // Capture current state snapshot
1884        let snapshot = WorkerHealthSnapshot {
1885            worker_id: self.worker_id,
1886            timestamp_ns: self.timer_wheel.now_ns(),
1887            stats: self.stats.clone(),
1888            yield_queue_len: self.yield_queue.len(),
1889            mpsc_queue_len: 0, // TODO: Add len() method to mpsc::Receiver
1890            active_leaf_partitions: (self.partition_start..self.partition_end).collect(),
1891            has_work: self.has_work(),
1892        };
1893
1894        // For now, just log the health snapshot
1895        // TODO: Send this back to supervisor via a response channel
1896        #[cfg(debug_assertions)]
1897        {
1898            eprintln!(
1899                "[Worker {}] Health: tasks_polled={}, yield_queue={}, mpsc_queue={}, partitions={:?}, has_work={}",
1900                snapshot.worker_id,
1901                snapshot.stats.tasks_polled,
1902                snapshot.yield_queue_len,
1903                snapshot.mpsc_queue_len,
1904                snapshot.active_leaf_partitions,
1905                snapshot.has_work
1906            );
1907        }
1908        let _ = snapshot; // Suppress unused warning in release builds
1909    }
1910
1911    fn handle_graceful_shutdown(&mut self) {
1912        // Graceful shutdown: just process a few more iterations then shutdown
1913        // The worker loop will naturally drain queues during normal operation
1914
1915        #[cfg(debug_assertions)]
1916        {
1917            eprintln!(
1918                "[Worker {}] Received graceful shutdown signal",
1919                self.worker_id
1920            );
1921        }
1922
1923        // Process any remaining messages
1924        loop {
1925            match self.receiver.try_pop() {
1926                Ok(message) => match message {
1927                    WorkerMessage::Shutdown | WorkerMessage::GracefulShutdown => break,
1928                    _ => {
1929                        self.handle_message(message);
1930                    }
1931                },
1932                Err(_) => break,
1933            }
1934        }
1935
1936        // Set shutdown flag - worker loop will finish current work before exiting
1937        self.shutdown.store(true, Ordering::Release);
1938
1939        #[cfg(debug_assertions)]
1940        {
1941            eprintln!("[Worker {}] Set shutdown flag", self.worker_id);
1942        }
1943    }
1944
1945    fn enqueue_timer_entry(&mut self, deadline_ns: u64, handle: TimerHandle) -> Option<u64> {
1946        match self.timer_wheel.schedule_timer(deadline_ns, handle) {
1947            Ok(timer_id) => Some(timer_id),
1948            Err(_) => None, // Log error in debug mode
1949        }
1950    }
1951
1952    fn poll_timers(&mut self) -> bool {
1953        let now_ns = self.timer_wheel.now_ns();
1954        let expired = self
1955            .timer_wheel
1956            .poll(now_ns, TIMER_EXPIRE_BUDGET, &mut self.timer_output);
1957        if expired == 0 {
1958            return false;
1959        }
1960
1961        let mut progress = false;
1962        for idx in 0..expired {
1963            let (timer_id, deadline_ns, handle) = self.timer_output[idx];
1964            if self.process_timer_entry(timer_id, deadline_ns, handle) {
1965                progress = true;
1966            }
1967        }
1968        progress
1969    }
1970
1971    fn process_timer_entry(
1972        &mut self,
1973        timer_id: u64,
1974        _deadline_ns: u64,
1975        handle: TimerHandle,
1976    ) -> bool {
1977        if handle.worker_id() != self.worker_id {
1978            return false;
1979        }
1980
1981        let slot = unsafe { handle.task_slot().as_ref() };
1982        let task_ptr = slot.task_ptr();
1983        if task_ptr.is_null() {
1984            return false;
1985        }
1986
1987        let task = unsafe { &*task_ptr };
1988        if task.global_id() != handle.task_id() {
1989            return false;
1990        }
1991
1992        task.schedule();
1993        // let state = task.state().load(Ordering::Acquire);
1994        //
1995        // if state == TASK_IDLE {
1996        //     if task
1997        //         .state()
1998        //         .compare_exchange(
1999        //             TASK_IDLE,
2000        //             TASK_EXECUTING,
2001        //             Ordering::AcqRel,
2002        //             Ordering::Acquire,
2003        //         )
2004        //         .is_ok()
2005        //     {
2006        //         task.schedule();
2007        //         // self.poll_task(TaskHandle::from_task(task), task);
2008        //     } else {
2009        //         task.schedule();
2010        //     }
2011        // } else if state == TASK_EXECUTING && !task.is_yielded() {
2012        //     task.schedule();
2013        // } else {
2014        //     task.schedule();
2015        // }
2016        self.stats.timer_fires = self.stats.timer_fires.saturating_add(1);
2017        true
2018    }
2019
2020    fn cancel_timer(&mut self, timer: &Timer) -> bool {
2021        if !timer.is_scheduled() {
2022            return false;
2023        }
2024
2025        let Some(worker_id) = timer.worker_id() else {
2026            return false;
2027        };
2028
2029        let timer_id = timer.timer_id();
2030        if timer_id == 0 {
2031            return false;
2032        }
2033
2034        if worker_id == self.worker_id {
2035            if self
2036                .timer_wheel
2037                .cancel_timer(timer_id)
2038                .unwrap_or(None)
2039                .is_some()
2040            {
2041                timer.mark_cancelled(timer_id);
2042                return true;
2043            }
2044            return false;
2045        }
2046
2047        self.service
2048            .post_cancel_message(self.worker_id, worker_id, timer_id)
2049    }
2050
2051    #[inline]
2052    fn cancel_remote_timer(&mut self, worker_id: u32, timer_id: u64) -> bool {
2053        if worker_id != self.worker_id {
2054            return false;
2055        }
2056
2057        self.service
2058            .post_cancel_message(self.worker_id, worker_id, timer_id)
2059    }
2060
    /// Next value from this worker's local PRNG.
    ///
    /// Used for randomized leaf/steal probing (see `try_partition_random`
    /// and `try_steal_yielded`); not intended for anything
    /// security-sensitive.
    #[inline(always)]
    pub fn next_u64(&mut self) -> u64 {
        self.rng.next()
    }
2065
2066    #[inline(always)]
2067    pub fn poll_yield(&mut self, max: usize) -> usize {
2068        let mut count = 0;
2069        while let Some(handle) = self.try_acquire_local_yield() {
2070            self.stats.yield_queue_polls += 1;
2071            self.poll_handle(handle);
2072            count += 1;
2073            if count >= max {
2074                break;
2075            }
2076        }
2077        count
2078    }
2079
2080    #[inline(always)]
2081    pub fn poll_yield_steal(&mut self, max: usize) -> usize {
2082        let mut count = 0;
2083        while let Some(handle) = self.try_steal_yielded() {
2084            self.stats.yield_queue_polls += 1;
2085            self.poll_handle(handle);
2086            count += 1;
2087            if count >= max {
2088                break;
2089            }
2090        }
2091        count
2092    }
2093
2094    #[inline(always)]
2095    pub fn run_until_idle(&mut self) -> usize {
2096        let mut processed = 0;
2097        while self.run_once() {
2098            processed += 1;
2099        }
2100        processed += self.poll_yield_steal(32);
2101        while self.run_once() {
2102            processed += 1;
2103        }
2104        processed
2105    }
2106
    /// Run one unit of work, spinning then parking when idle.
    ///
    /// Returns `true` as soon as `run_once` made progress; returns `false`
    /// when the worker woke from a park without having run anything (e.g.
    /// a timeout fired, possibly for a timer deadline), letting the caller
    /// re-drive the loop. The partition-summary sync *before* parking is
    /// what prevents lost wakeups: it refreshes the waker's view of
    /// runnable tasks so a concurrent enqueue is observed either by the
    /// sync or by the subsequent acquire.
    #[inline(always)]
    pub fn poll_blocking(&mut self, strategy: &WaitStrategy) -> bool {
        let mut spins = 0usize;
        loop {
            if self.run_once() {
                return true;
            }

            // Fast path: check if we have any work before parking
            if !self.has_work() {
                if strategy.spin_before_sleep > 0 && spins < strategy.spin_before_sleep {
                    spins += 1;
                    core::hint::spin_loop();
                    continue;
                }

                spins = 0;

                // Calculate park duration considering both strategy and timer deadlines
                let park_duration = self.calculate_park_duration();

                // Sync partition summary from SummaryTree before parking
                let waker_id = self.worker_id as usize;
                self.service.wakers[waker_id].sync_partition_summary(
                    self.partition_start,
                    self.partition_end,
                    &self.service.summary().leaf_words,
                );

                match park_duration {
                    Some(timeout) if timeout.is_zero() => {
                        // Timer ready or zero timeout - don't park
                        return false;
                    }
                    Some(timeout) => {
                        // Park with timeout on WorkerWaker
                        // On Windows, WorkerWaker uses alertable waits to allow APC execution
                        let timed_out = !self.service.wakers[waker_id].acquire_timeout(timeout);

                        if timed_out {
                            // Timed out - possibly for timer deadline
                            return false;
                        }
                    }
                    None => {
                        // No timeout - park indefinitely on WorkerWaker
                        // On Windows, WorkerWaker uses alertable waits to allow APC execution
                        self.service.wakers[waker_id].acquire();
                    }
                }
            }
        }
    }
2160
    /// Try to claim one runnable task from the given leaf via its signal
    /// bitmaps.
    ///
    /// Two-level randomized scan: the leaf's summary word says which of its
    /// signal words have set bits; a random signal word is chosen, then a
    /// random set bit within it is claimed with a CAS (`try_acquire`).
    /// Randomization spreads contention between workers scanning the same
    /// leaf. Every load is `Acquire` so a claimed bit happens-after the
    /// enqueue that set it. Returns `None` when the leaf is (or becomes)
    /// empty or the CAS budget (`attempts`) runs out.
    #[inline(always)]
    fn try_acquire_task(&mut self, leaf_idx: usize) -> Option<TaskHandle> {
        let signals_per_leaf = self.service.arena().signals_per_leaf();
        if signals_per_leaf == 0 {
            return None;
        }

        // Mask of summary bits that can legally be set for this leaf.
        let mask = if signals_per_leaf >= 64 {
            u64::MAX
        } else {
            (1u64 << signals_per_leaf) - 1
        };

        self.stats.leaf_summary_checks = self.stats.leaf_summary_checks.saturating_add(1);
        let mut available =
            self.service.summary().leaf_words[leaf_idx].load(Ordering::Acquire) & mask;
        if available == 0 {
            self.stats.empty_scans = self.stats.empty_scans.saturating_add(1);
            return None;
        }
        self.stats.leaf_summary_hits = self.stats.leaf_summary_hits.saturating_add(1);

        let signals = self.service.arena().active_signals(leaf_idx);
        let mut attempts = signals_per_leaf;

        while available != 0 && attempts > 0 {
            // Random starting signal index to de-correlate concurrent scanners.
            let start = (self.next_u64() as usize) % signals_per_leaf;

            let (signal_idx, signal) = loop {
                let candidate = bits::find_nearest(available, start as u64);
                if candidate >= 64 {
                    // Local view exhausted: reload the summary word and retry.
                    self.stats.leaf_summary_checks =
                        self.stats.leaf_summary_checks.saturating_add(1);
                    available =
                        self.service.summary().leaf_words[leaf_idx].load(Ordering::Acquire) & mask;
                    if available == 0 {
                        self.stats.empty_scans = self.stats.empty_scans.saturating_add(1);
                        return None;
                    }
                    self.stats.leaf_summary_hits = self.stats.leaf_summary_hits.saturating_add(1);
                    continue;
                }
                let bit_index = candidate as usize;
                let sig = unsafe { &*self.service.arena().task_signal_ptr(leaf_idx, bit_index) };
                let bits = sig.load(Ordering::Acquire);
                if bits == 0 {
                    // Stale summary bit: the signal word is actually empty.
                    // Clear it from our view and from the shared summary.
                    available &= !(1u64 << bit_index);
                    self.service
                        .summary()
                        .mark_signal_inactive(leaf_idx, bit_index);
                    attempts -= 1;
                    continue;
                }
                break (bit_index, sig);
            };

            // Pick a random set bit of the chosen signal word to claim.
            let bits = signal.load(Ordering::Acquire);
            let bit_seed = (self.next_u64() & 63) as u64;
            let bit_candidate = bits::find_nearest(bits, bit_seed);
            let bit_idx = if bit_candidate < 64 {
                bit_candidate as u64
            } else {
                // Raced to empty between the two loads; drop this signal word.
                available &= !(1u64 << signal_idx);
                attempts -= 1;
                continue;
            };

            let (remaining, acquired) = signal.try_acquire(bit_idx);
            if !acquired {
                // Another worker claimed the bit first; try elsewhere.
                self.stats.cas_failures = self.stats.cas_failures.saturating_add(1);
                available &= !(1u64 << signal_idx);
                attempts -= 1;
                continue;
            }

            // We took the last bit of the word: retire its summary bit.
            let remaining_mask = remaining;
            if remaining_mask == 0 {
                self.service
                    .summary()
                    .mark_signal_inactive(leaf_idx, signal_idx);
            }

            self.stats.signal_polls = self.stats.signal_polls.saturating_add(1);
            // Slot index is (signal word index, bit) flattened over 64 bits.
            let slot_idx = signal_idx * 64 + bit_idx as usize;
            let task = unsafe { self.service.arena().task(leaf_idx, slot_idx) };
            return Some(TaskHandle::from_task(task));
        }

        self.stats.empty_scans = self.stats.empty_scans.saturating_add(1);
        None
    }
2252
2253    #[inline(always)]
2254    fn enqueue_yield(&mut self, handle: TaskHandle) {
2255        let was_empty = self.yield_queue.push_with_status(handle);
2256        if was_empty {
2257            // Set yield_bit in WorkerWaker status
2258            self.service.wakers[self.worker_id as usize].mark_yield();
2259        }
2260    }
2261
2262    #[inline(always)]
2263    fn try_acquire_local_yield(&mut self) -> Option<TaskHandle> {
2264        let (item, was_last) = self.yield_queue.pop_with_status();
2265        if let Some(handle) = item {
2266            if was_last {
2267                // Clear yield_bit in WorkerWaker status
2268                self.service.wakers[self.worker_id as usize].try_unmark_yield();
2269            }
2270            return Some(handle);
2271        }
2272        None
2273    }
2274
2275    #[inline(always)]
2276    fn try_steal_yielded(&mut self) -> Option<TaskHandle> {
2277        let next_rand = self.next_u64();
2278        let (attempts, success) =
2279            self.service
2280                .try_yield_steal(self.worker_id as usize, 1, next_rand as usize);
2281        if success {
2282            self.stats.steal_attempts += attempts;
2283            self.stats.steal_successes += 1;
2284            self.try_acquire_local_yield()
2285        } else {
2286            None
2287        }
2288    }
2289
2290    #[inline(always)]
2291    fn process_leaf(&mut self, leaf_idx: usize) -> bool {
2292        if let Some(handle) = self.try_acquire_task(leaf_idx) {
2293            self.poll_handle(handle);
2294            return true;
2295        }
2296        false
2297    }
2298
2299    #[inline(always)]
2300    fn try_partition_random(&mut self) -> bool {
2301        let partition_len = self.partition_len;
2302
2303        if partition_len.is_power_of_two() {
2304            let mask = partition_len - 1;
2305            for _ in 0..partition_len {
2306                let start_offset = self.next_u64() as usize & mask;
2307                let leaf_idx = self.partition_start + start_offset;
2308                if self.process_leaf(leaf_idx) {
2309                    return true;
2310                }
2311            }
2312        } else {
2313            for _ in 0..partition_len {
2314                let start_offset = self.next_u64() as usize % partition_len;
2315                let leaf_idx = self.partition_start + start_offset;
2316                if self.process_leaf(leaf_idx) {
2317                    return true;
2318                }
2319            }
2320        }
2321
2322        false
2323    }
2324
2325    #[inline(always)]
2326    fn try_partition_linear(&mut self) -> bool {
2327        let partition_start = self.partition_start;
2328        let partition_len = self.partition_len;
2329
2330        for offset in 0..partition_len {
2331            let leaf_idx = partition_start + offset;
2332            if self.process_leaf(leaf_idx) {
2333                return true;
2334            }
2335        }
2336
2337        false
2338    }
2339
2340    #[inline(always)]
2341    fn try_any_partition_random(&mut self, leaf_count: usize) -> bool {
2342        if leaf_count.is_power_of_two() {
2343            let leaf_mask = leaf_count - 1;
2344            for _ in 0..leaf_count {
2345                let leaf_idx = self.next_u64() as usize & leaf_mask;
2346                self.stats.leaf_steal_attempts += 1;
2347                if self.process_leaf(leaf_idx) {
2348                    self.stats.leaf_steal_successes += 1;
2349                    return true;
2350                }
2351            }
2352        } else {
2353            for _ in 0..leaf_count {
2354                let leaf_idx = self.next_u64() as usize % leaf_count;
2355                self.stats.leaf_steal_attempts += 1;
2356                if self.process_leaf(leaf_idx) {
2357                    self.stats.leaf_steal_successes += 1;
2358                    return true;
2359                }
2360            }
2361        }
2362        false
2363    }
2364
2365    #[inline(always)]
2366    fn try_any_partition_linear(&mut self, leaf_count: usize) -> bool {
2367        for leaf_idx in 0..leaf_count {
2368            self.stats.leaf_steal_attempts += 1;
2369            if self.process_leaf(leaf_idx) {
2370                self.stats.leaf_steal_successes += 1;
2371                return true;
2372            }
2373        }
2374        false
2375    }
2376
2377    #[inline(always)]
2378    fn poll_handle(&mut self, handle: TaskHandle) {
2379        let task = handle.task();
2380
2381        // Check if this task has a pinned generator
2382        if task.has_pinned_generator() {
2383            // Poll the task by resuming its pinned generator
2384            self.poll_task_with_pinned_generator(handle, task);
2385            return;
2386        }
2387
2388        struct ActiveTaskGuard;
2389        impl Drop for ActiveTaskGuard {
2390            fn drop(&mut self) {
2391                CURRENT_TASK.set(std::ptr::null_mut());
2392            }
2393        }
2394        CURRENT_TASK.set(task as *const Task as *mut Task);
2395        let _active_guard = ActiveTaskGuard;
2396
2397        self.stats.tasks_polled += 1;
2398
2399        if !task.is_yielded() {
2400            task.begin();
2401        } else {
2402            task.record_yield();
2403            task.clear_yielded();
2404        }
2405
2406        self.poll_task(handle, task);
2407    }
2408
2409    /// Poll a task that has a pinned generator
2410    fn poll_task_with_pinned_generator(&mut self, handle: TaskHandle, task: &Task) {
2411        struct ActiveTaskGuard;
2412        impl Drop for ActiveTaskGuard {
2413            fn drop(&mut self) {
2414                CURRENT_TASK.set(std::ptr::null_mut());
2415            }
2416        }
2417        CURRENT_TASK.set(task as *const Task as *mut Task);
2418        let _active_guard = ActiveTaskGuard;
2419
2420        self.stats.tasks_polled += 1;
2421
2422        let mode = task.generator_run_mode();
2423
2424        match mode {
2425            GeneratorRunMode::Switch => {
2426                // Switch mode: Generator has interrupted poll on stack
2427                // Resume once to complete that poll, then transition to Poll mode or finish
2428                self.poll_task_switch_mode(handle, task);
2429            }
2430            GeneratorRunMode::Poll => {
2431                // Poll mode: Generator contains poll loop
2432                // Resume generator, it will call poll once and yield
2433                self.poll_task_poll_mode(handle, task);
2434            }
2435            GeneratorRunMode::None => {
2436                // No generator, shouldn't happen but treat as error
2437                task.finish();
2438                self.stats.completed_count = self.stats.completed_count.saturating_add(1);
2439            }
2440        }
2441    }
2442
    /// Handle Switch mode: resume generator once to complete interrupted poll,
    /// then transition to Poll mode or finish task
    fn poll_task_switch_mode(&mut self, handle: TaskHandle, task: &Task) {
        let generator = unsafe { task.get_pinned_generator() };
        if let Some(task_gen_iter) = generator {
            // Resume generator once - this completes the interrupted poll
            match task_gen_iter.next() {
                Some(_) => {
                    // Generator yielded after completing poll
                    // Check if task still has a future (if not, task completed during poll)
                    // NOTE(review): `take_future()` appears to *remove* the future
                    // from the task (`poll_task` drops the pointer it returns via
                    // FutureAllocator). If so, the "still pending" branch below
                    // discards the taken pointer (potential leak) and enters Poll
                    // mode with no future to poll — confirm whether the future
                    // must be restored to the task here.
                    let future_ptr = task.take_future();

                    if future_ptr.is_none() {
                        // Task completed during the interrupted poll, clean up generator
                        unsafe {
                            task.take_pinned_generator();
                        }
                        task.finish();
                        self.stats.completed_count = self.stats.completed_count.saturating_add(1);
                    } else {
                        // Task still pending - transition to Poll mode
                        // Discard the worker generator and create a task-specific Poll mode generator
                        unsafe {
                            task.take_pinned_generator();
                        }
                        self.create_poll_mode_generator(handle, task);
                    }
                }
                None => {
                    // Generator exhausted - task must be complete
                    unsafe {
                        task.take_pinned_generator();
                    }
                    task.finish();
                    self.stats.completed_count = self.stats.completed_count.saturating_add(1);
                }
            }
        } else {
            // Generator lost, treat as error
            task.finish();
            self.stats.completed_count = self.stats.completed_count.saturating_add(1);
        }
    }
2486
2487    /// Handle Poll mode: generator contains poll loop, resume to continue task
2488    fn poll_task_poll_mode(&mut self, handle: TaskHandle, task: &Task) {
2489        let generator = unsafe { task.get_pinned_generator() };
2490        if let Some(task_gen_iter) = generator {
2491            // Resume generator - it will poll once and yield the result
2492            match task_gen_iter.next() {
2493                Some(status) => {
2494                    // Generator yielded - check status and task state
2495                    if status == 1 || status == 2 {
2496                        // Status 1 = task complete, status 2 = future dropped
2497                        // Task completed, clean up generator
2498                        unsafe {
2499                            task.take_pinned_generator();
2500                        }
2501                        task.finish();
2502                        self.stats.completed_count = self.stats.completed_count.saturating_add(1);
2503                    } else if task.is_yielded() {
2504                        // Task yielded, re-enqueue
2505                        task.record_yield();
2506                        self.stats.yielded_count += 1;
2507                        self.enqueue_yield(handle);
2508                    } else {
2509                        // Task returned Pending, waiting for wake
2510                        // Don't re-enqueue, task will be woken when ready
2511                        self.stats.waiting_count += 1;
2512                        task.finish();
2513                    }
2514                }
2515                None => {
2516                    // Generator completed - task is done
2517                    unsafe {
2518                        task.take_pinned_generator();
2519                    }
2520                    task.finish();
2521                    self.stats.completed_count = self.stats.completed_count.saturating_add(1);
2522                }
2523            }
2524        } else {
2525            // Generator lost
2526            task.finish();
2527            self.stats.completed_count = self.stats.completed_count.saturating_add(1);
2528        }
2529    }
2530
2531    /// Create a Poll mode generator for a task
2532    /// This generator wraps task.poll_future() in a loop
2533    fn create_poll_mode_generator(&mut self, handle: TaskHandle, task: &Task) {
2534        const STACK_SIZE: usize = 512 * 1024; // 512KB stack
2535
2536        // Capture task pointer as usize for Send safety
2537        let task_addr = task as *const Task as usize;
2538        let handle_copy = handle;
2539
2540        let generator =
2541            crate::generator::Gn::<()>::new_scoped_opt(STACK_SIZE, move |mut scope| -> usize {
2542                let task = unsafe { &*(task_addr as *const Task) };
2543
2544                loop {
2545                    // Create waker and context
2546                    let waker = unsafe { task.waker_yield() };
2547                    let mut cx = Context::from_waker(&waker);
2548
2549                    // Poll the task's future
2550                    let poll_result = unsafe { task.poll_future(&mut cx) };
2551
2552                    match poll_result {
2553                        Some(Poll::Ready(())) => {
2554                            // Task complete - return from generator
2555                            return 1; // Status code: task complete
2556                        }
2557                        Some(Poll::Pending) => {
2558                            // Task still pending - yield and continue loop
2559                            scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
2560                        }
2561                        None => {
2562                            // Future is gone, task complete
2563                            return 2; // Status code: future dropped
2564                        }
2565                    }
2566                }
2567            });
2568
2569        // Store generator in task with Poll mode
2570        let boxed_iter: Box<dyn Iterator<Item = usize> + 'static> =
2571            unsafe { std::mem::transmute(Box::new(generator) as Box<dyn Iterator<Item = usize>>) };
2572
2573        unsafe {
2574            task.pin_generator(boxed_iter, GeneratorRunMode::Poll);
2575        }
2576
2577        // Re-enqueue task for next poll with the new generator
2578        self.enqueue_yield(handle_copy);
2579    }
2580
2581    fn poll_task(&mut self, handle: TaskHandle, task: &Task) {
2582        let waker = unsafe { task.waker_yield() };
2583        let mut cx = Context::from_waker(&waker);
2584        let poll_result = unsafe { task.poll_future(&mut cx) };
2585
2586        match poll_result {
2587            Some(Poll::Ready(())) => {
2588                task.finish();
2589                self.stats.completed_count = self.stats.completed_count.saturating_add(1);
2590                CURRENT_TASK.set(std::ptr::null_mut());
2591                if let Some(ptr) = task.take_future() {
2592                    unsafe { FutureAllocator::drop_boxed(ptr) };
2593                }
2594            }
2595            Some(Poll::Pending) => {
2596                if task.is_yielded() {
2597                    task.record_yield();
2598                    self.stats.yielded_count += 1;
2599                    // Yielded Tasks stay in EXECUTING state with yielded set to true
2600                    // without resetting the signal. All attempts to set the signal
2601                    // will not set it since it is guaranteed to run via the yield queue.
2602                    self.enqueue_yield(handle);
2603                } else {
2604                    self.stats.waiting_count += 1;
2605                    task.finish();
2606                }
2607            }
2608            None => {
2609                self.stats.completed_count += 1;
2610                task.finish();
2611            }
2612        }
2613    }
2614
2615    fn schedule_timer_for_current_task(&mut self, timer: &Timer, delay: Duration) -> Option<u64> {
2616        let task_ptr = CURRENT_TASK.with(|cell| cell.get()) as *mut Task;
2617        if task_ptr.is_null() {
2618            return None;
2619        }
2620
2621        let task = unsafe { &*task_ptr };
2622        let Some(slot) = task.slot() else {
2623            return None;
2624        };
2625
2626        if timer.is_scheduled() {
2627            if let Some(worker_id) = timer.worker_id() {
2628                if worker_id == self.worker_id {
2629                    let existing_id = timer.timer_id();
2630                    if self
2631                        .timer_wheel
2632                        .cancel_timer(existing_id)
2633                        .unwrap_or(None)
2634                        .is_some()
2635                    {
2636                        timer.reset();
2637                    }
2638                } else {
2639                    // TODO: support cross-worker cancellation when necessary.
2640                }
2641            }
2642        }
2643
2644        let delay_ns = delay.as_nanos().min(u128::from(u64::MAX)) as u64;
2645        let now = self.timer_wheel.now_ns();
2646        let deadline_ns = now.saturating_add(delay_ns);
2647
2648        let handle = timer.prepare(slot, task.global_id(), self.worker_id);
2649
2650        if let Some(timer_id) = self.enqueue_timer_entry(deadline_ns, handle) {
2651            timer.commit_schedule(timer_id, deadline_ns);
2652            Some(deadline_ns)
2653        } else {
2654            timer.reset();
2655            None
2656        }
2657    }
2658}
2659
/// Builds a `Waker` whose data pointer is the task's `TaskSlot`.
///
/// The vtable does no reference counting (clone copies the raw pointer, drop
/// is a no-op), so the slot must outlive every waker derived from it.
fn make_task_waker(slot: &TaskSlot) -> Waker {
    let ptr = slot as *const TaskSlot as *const ();
    unsafe { Waker::from_raw(RawWaker::new(ptr, &TASK_WAKER_VTABLE)) }
}
2664
/// Waker vtable: clone. Wakers share the raw `TaskSlot` pointer, so cloning
/// just copies it — there is no reference count to bump.
unsafe fn task_waker_clone(ptr: *const ()) -> RawWaker {
    RawWaker::new(ptr, &TASK_WAKER_VTABLE)
}
2668
/// Waker vtable: wake. Resolves the slot's current task pointer and schedules
/// the task; a null task pointer (slot vacated) makes the wake a no-op.
unsafe fn task_waker_wake(ptr: *const ()) {
    unsafe {
        let slot = &*(ptr as *const TaskSlot);
        let task_ptr = slot.task_ptr();
        if task_ptr.is_null() {
            return;
        }
        let task = &*task_ptr;
        task.schedule();
    }
}
2680
/// Waker vtable: wake_by_ref. Identical to `wake` — nothing is consumed
/// because this vtable owns no resources.
unsafe fn task_waker_wake_by_ref(ptr: *const ()) {
    unsafe {
        task_waker_wake(ptr);
    }
}
2686
2687unsafe fn task_waker_drop(_: *const ()) {}
2688
// Manual vtable backing the wakers created by `make_task_waker`; all entries
// operate on a raw `TaskSlot` pointer and perform no reference counting.
static TASK_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    task_waker_clone,
    task_waker_wake,
    task_waker_wake_by_ref,
    task_waker_drop,
);
2695
2696/// Schedules a timer for the task currently being polled by the active worker.
2697pub fn schedule_timer_for_current_task(
2698    _cx: &Context<'_>,
2699    timer: &Timer,
2700    delay: Duration,
2701) -> Option<u64> {
2702    let timer_wheel = current_timer_wheel()?;
2703    let task = current_task()?;
2704    let worker_id = timer_wheel.worker_id();
2705    let slot = task.slot()?;
2706
2707    // Cancel existing timer if scheduled
2708    if timer.is_scheduled() {
2709        if let Some(existing_worker_id) = timer.worker_id() {
2710            if existing_worker_id == worker_id {
2711                let existing_id = timer.timer_id();
2712                if timer_wheel
2713                    .cancel_timer(existing_id)
2714                    .unwrap_or(None)
2715                    .is_some()
2716                {
2717                    timer.reset();
2718                }
2719            } else {
2720                // Cross-worker cancellation: send message to the worker that owns the timer
2721                let existing_id = timer.timer_id();
2722                let ops = CROSS_WORKER_OPS.with(|cell| cell.get());
2723                if let Some(ops) = ops {
2724                    if ops.post_cancel_message(worker_id, existing_worker_id, existing_id) {
2725                        timer.reset();
2726                    }
2727                }
2728            }
2729        }
2730    }
2731
2732    let delay_ns = delay.as_nanos().min(u128::from(u64::MAX)) as u64;
2733    let now = timer_wheel.now_ns();
2734    let deadline_ns = now.saturating_add(delay_ns);
2735
2736    let handle = timer.prepare(slot, task.global_id(), worker_id);
2737
2738    match timer_wheel.schedule_timer(deadline_ns, handle) {
2739        Ok(timer_id) => {
2740            timer.commit_schedule(timer_id, deadline_ns);
2741            Some(deadline_ns)
2742        }
2743        Err(_) => {
2744            timer.reset();
2745            None
2746        }
2747    }
2748}
2749
2750/// Cancels a timer owned by the task currently being polled by the active worker.
2751pub fn cancel_timer_for_current_task(timer: &Timer) -> bool {
2752    if !timer.is_scheduled() {
2753        return false;
2754    }
2755
2756    let Some(timer_worker_id) = timer.worker_id() else {
2757        return false;
2758    };
2759
2760    let timer_id = timer.timer_id();
2761    if timer_id == 0 {
2762        return false;
2763    }
2764
2765    let timer_wheel = current_timer_wheel();
2766    let Some(timer_wheel) = timer_wheel else {
2767        return false;
2768    };
2769
2770    let worker_id = timer_wheel.worker_id();
2771
2772    if timer_worker_id == worker_id {
2773        if timer_wheel.cancel_timer(timer_id).unwrap_or(None).is_some() {
2774            timer.mark_cancelled(timer_id);
2775            return true;
2776        }
2777        return false;
2778    }
2779
2780    let ops = CROSS_WORKER_OPS.with(|cell| cell.get());
2781    let Some(ops) = ops else {
2782        return false;
2783    };
2784
2785    // Cross-worker cancellation: send message to the worker that owns the timer
2786    if ops.post_cancel_message(worker_id, timer_worker_id, timer_id) {
2787        timer.mark_cancelled(timer_id);
2788        true
2789    } else {
2790        false
2791    }
2792}
2793
2794/// Returns the most recent `now_ns` observed by the active worker.
2795pub fn current_worker_now_ns() -> Option<u64> {
2796    let timer_wheel = current_timer_wheel()?;
2797    Some(timer_wheel.now_ns())
2798}