1use super::deque::{Stealer, Worker as YieldWorker};
2use super::summary::Summary;
3use super::task::{FutureAllocator, GeneratorRunMode, Task, TaskArena, TaskHandle, TaskSlot};
4use super::timer::{Timer, TimerHandle};
5use super::timer_wheel::TimerWheel;
6use super::waker::WorkerWaker;
7use crate::PopError;
8use crate::runtime::mpsc;
9use crate::runtime::ticker::{TickHandler, TickHandlerRegistration};
10use crate::{PushError, utils};
11use std::cell::{Cell, UnsafeCell};
12use std::ptr::{self, NonNull};
13use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
16use std::thread;
17use std::time::{Duration, Instant};
18
19use crate::utils::bits;
20use rand::RngCore;
21
22#[cfg(unix)]
23use libc;
24
25const DEFAULT_QUEUE_SEG_SHIFT: usize = 8;
26const DEFAULT_QUEUE_NUM_SEGS_SHIFT: usize = 8;
27const MAX_WORKER_LIMIT: usize = 512;
28
29const DEFAULT_WAKE_BURST: usize = 4;
30const FULL_SUMMARY_SCAN_CADENCE_MASK: u64 = 1024 * 8 - 1;
31const DEFAULT_TICK_DURATION_NS: u64 = 1 << 20; const TIMER_TICKS_PER_WHEEL: usize = 1024 * 1;
33const TIMER_EXPIRE_BUDGET: usize = 4096;
34const MESSAGE_BATCH_SIZE: usize = 4096;
35
/// Operations a worker may invoke on peer workers through a type-erased
/// (`dyn`) reference, avoiding a dependency on the concrete service generics.
trait CrossWorkerOps: Send + Sync {
    /// Posts a timer-cancellation message from one worker to another.
    /// Returns `true` if the message was enqueued successfully.
    fn post_cancel_message(&self, from_worker_id: u32, to_worker_id: u32, timer_id: u64) -> bool;
}
45
thread_local! {
    // Pointer to the current worker thread's timer wheel; null off worker threads.
    static CURRENT_TIMER_WHEEL: Cell<*mut TimerWheel<TimerHandle>> = Cell::new(ptr::null_mut());
    // Task currently being polled on this thread; null between polls.
    static CURRENT_TASK: Cell<*mut Task> = Cell::new(ptr::null_mut());
    // Type-erased handle back to the owning service for cross-worker messaging.
    static CROSS_WORKER_OPS: Cell<Option<&'static dyn CrossWorkerOps>> = const { Cell::new(None) };
    // Set by `pin_current_generator` to request the current generator be pinned.
    static GENERATOR_PIN_REQUESTED: Cell<bool> = const { Cell::new(false) };
    // Task whose generator is pinned, if any. NOTE(review): never written in
    // the code visible in this file chunk — confirm it is set elsewhere.
    static GENERATOR_PIN_TASK: Cell<*mut Task> = const { Cell::new(ptr::null_mut()) };
}
55
56pub struct WorkerTLS {}
57
58#[inline]
72pub fn is_worker_thread() -> bool {
73 current_task().is_some()
74}
75
76#[inline]
77pub fn current_task<'a>() -> Option<&'a mut Task> {
78 let task = CURRENT_TASK.with(|cell| cell.get());
79 if task.is_null() {
80 return None;
81 } else {
82 unsafe { Some(&mut *task) }
83 }
84}
85
86pub fn current_timer_wheel<'a>() -> Option<&'a mut TimerWheel<TimerHandle>> {
87 let timer_wheel = CURRENT_TIMER_WHEEL.with(|cell| cell.get());
88 if timer_wheel.is_null() {
89 None
90 } else {
91 unsafe { Some(&mut *timer_wheel) }
92 }
93}
94
95pub fn pin_current_generator() -> bool {
118 let task_ptr = CURRENT_TASK.with(|cell| cell.get());
119 if task_ptr.is_null() {
120 return false;
121 }
122
123 GENERATOR_PIN_REQUESTED.with(|cell| cell.set(true));
125 true
126}
127
/// A timer paired with its absolute deadline, ready for insertion into a
/// worker's timer wheel.
#[derive(Clone, Copy, Debug)]
pub struct TimerSchedule {
    // Which timer fires.
    handle: TimerHandle,
    // Absolute deadline, in nanoseconds.
    deadline_ns: u64,
}
134
135impl TimerSchedule {
136 pub fn new(handle: TimerHandle, deadline_ns: u64) -> Self {
137 Self {
138 handle,
139 deadline_ns,
140 }
141 }
142
143 #[inline(always)]
144 pub fn handle(&self) -> TimerHandle {
145 self.handle
146 }
147
148 #[inline(always)]
149 pub fn deadline_ns(&self) -> u64 {
150 self.deadline_ns
151 }
152
153 #[inline(always)]
154 pub fn into_parts(self) -> (TimerHandle, u64) {
155 (self.handle, self.deadline_ns)
156 }
157}
158
// SAFETY: NOTE(review) — TimerSchedule contains a TimerHandle, which (per its
// construction from a NonNull in this file) wraps a raw pointer; these impls
// assert cross-thread transfer/sharing is sound. Confirm TimerHandle's
// ownership protocol before relying on this.
unsafe impl Send for TimerSchedule {}
unsafe impl Sync for TimerSchedule {}
161
/// An immutable batch of timer schedules, boxed so the containing message
/// stays pointer-sized.
#[derive(Debug, Clone)]
pub struct TimerBatch {
    // Fixed after construction.
    entries: Box<[TimerSchedule]>,
}
167
168impl TimerBatch {
169 pub fn new(entries: Vec<TimerSchedule>) -> Self {
170 Self {
171 entries: entries.into_boxed_slice(),
172 }
173 }
174
175 pub fn empty() -> Self {
176 Self {
177 entries: Box::new([]),
178 }
179 }
180
181 pub fn len(&self) -> usize {
182 self.entries.len()
183 }
184
185 pub fn is_empty(&self) -> bool {
186 self.entries.is_empty()
187 }
188
189 pub fn iter(&self) -> impl Iterator<Item = &TimerSchedule> {
190 self.entries.iter()
191 }
192
193 pub fn into_vec(self) -> Vec<TimerSchedule> {
194 self.entries.into_vec()
195 }
196}
197
/// Messages delivered to a worker through its MPSC inbox.
#[derive(Clone, Debug)]
pub enum WorkerMessage {
    /// Insert a single timer into the receiving worker's wheel.
    ScheduleTimer {
        timer: TimerSchedule,
    },
    /// Insert a batch of timers at once.
    ScheduleBatch {
        timers: TimerBatch,
    },
    /// Cancel timer `timer_id` owned by `worker_id`.
    CancelTimer {
        worker_id: u32,
        timer_id: u64,
    },
    /// The active worker count changed (sent on scaling events).
    WorkerCountChanged {
        new_worker_count: u16,
    },

    /// Adopt the leaf partition `[partition_start, partition_end)`.
    RebalancePartitions {
        partition_start: usize,
        partition_end: usize,
    },

    /// Take ownership of the listed tasks.
    MigrateTasks {
        task_handles: Vec<TaskHandle>,
    },

    /// Supervisor request for a health snapshot.
    ReportHealth,

    /// Ask the worker to wind down and exit cleanly.
    GracefulShutdown,

    /// Ask the worker to exit as soon as possible.
    Shutdown,

    /// Placeholder used to pre-fill message buffers; carries no action.
    Noop,
}
237
// SAFETY: NOTE(review) — variants carry TaskHandle / TimerSchedule, which wrap
// raw pointers; these impls assert they may cross threads. Confirm the
// handles' ownership protocol (slot exclusivity) before relying on this.
unsafe impl Send for WorkerMessage {}
unsafe impl Sync for WorkerMessage {}
240
/// Construction-time parameters for a `WorkerService`.
#[derive(Clone, Copy, Debug)]
pub struct WorkerServiceConfig {
    /// Desired supervisor tick period (normalized internally to a
    /// power-of-two nanosecond value by `normalize_tick_duration_ns`).
    pub tick_duration: Duration,
    /// Workers started immediately (clamped to at least 1 in `start`).
    pub min_workers: usize,
    /// Upper bound for dynamic scale-up (clamped to `MAX_WORKER_LIMIT`).
    pub max_workers: usize,
}
247
248impl Default for WorkerServiceConfig {
249 fn default() -> Self {
250 Self {
251 tick_duration: Duration::from_nanos(DEFAULT_TICK_DURATION_NS),
252 min_workers: utils::num_cpus(),
253 max_workers: utils::num_cpus(),
254 }
255 }
256}
257
/// Owns all per-worker state (inboxes, timers, wakers, threads) plus the
/// shared task arena and summary tree. Every per-slot array below is sized
/// for the maximum worker count; slot `i` belongs to worker id `i`.
pub struct WorkerService<
    const P: usize = DEFAULT_QUEUE_SEG_SHIFT,
    const NUM_SEGS_P2: usize = DEFAULT_QUEUE_NUM_SEGS_SHIFT,
> {
    // Task storage shared by all workers.
    arena: TaskArena,
    // Hierarchical bitmap used for task reservation and wake signaling.
    summary_tree: Summary,

    config: WorkerServiceConfig,
    // Configured tick period and its normalized (power-of-two) ns value.
    tick_duration: Duration,
    tick_duration_ns: u64,
    // Timestamp published by `on_tick`, read via `clock_ns()`.
    clock_ns: Arc<AtomicU64>,

    // One waker per worker slot.
    wakers: Box<[Arc<WorkerWaker>]>,

    // 0 = slot free; nonzero = claimed/active (see spawn_worker_internal).
    worker_actives: Box<[AtomicU64]>,
    // Last tick timestamp observed per worker (stuck-worker detection).
    worker_now_ns: Box<[AtomicU64]>,
    worker_shutdowns: Box<[AtomicBool]>,
    worker_threads: Box<[Mutex<Option<thread::JoinHandle<()>>>]>,
    worker_thread_handles: Box<[Mutex<Option<crate::runtime::preemption::WorkerThreadHandle>>]>,
    worker_stats: Box<[WorkerStats]>,
    // Number of live workers; shared with the summary tree.
    worker_count: Arc<AtomicUsize>,
    // One past the highest worker id ever claimed (bounds slot scans).
    worker_max_id: AtomicUsize,
    // Per-worker inbox; each UnsafeCell is only borrowed mutably by its owner.
    receivers: Box<[UnsafeCell<mpsc::Receiver<WorkerMessage, P, NUM_SEGS_P2>>]>,
    // Receiver-major sender matrix: senders[receiver * max + producer].
    senders: Box<[mpsc::Sender<WorkerMessage, P, NUM_SEGS_P2>]>,
    // Dedicated supervisor->worker producer slots.
    tick_senders: Box<[mpsc::Sender<WorkerMessage, P, NUM_SEGS_P2>]>,
    // Per-worker yield queues plus stealers for work stealing.
    yield_queues: Box<[YieldWorker<TaskHandle>]>,
    yield_stealers: Box<[Stealer<TaskHandle>]>,
    // Per-worker timer wheels; owned mutably by their worker thread.
    timers: Box<[UnsafeCell<TimerWheel<TimerHandle>>]>,
    shutdown: AtomicBool,
    // Serializes worker slot claim/spawn.
    register_mutex: Mutex<()>,
    tick_registration: Mutex<Option<TickHandlerRegistration>>,
    // Set when workers come/go; consumed by `on_tick` to rebalance partitions.
    rebalance_requested: AtomicBool,
    // Periodic supervisor cadences, measured in ticks.
    tick_health_check_interval: u64,
    tick_partition_rebalance_interval: u64,
    tick_scaling_check_interval: u64,
}
302
// SAFETY: NOTE(review) — the UnsafeCell'd receivers/timers are intended to be
// borrowed mutably only by the worker thread owning the slot; everything else
// is atomic or lock-protected. Confirm that invariant holds for all accessors
// (see the tick handler's direct timer access).
unsafe impl<const P: usize, const NUM_SEGS_P2: usize> Send for WorkerService<P, NUM_SEGS_P2> {}

unsafe impl<const P: usize, const NUM_SEGS_P2: usize> Sync for WorkerService<P, NUM_SEGS_P2> {}
309
310impl<const P: usize, const NUM_SEGS_P2: usize> WorkerService<P, NUM_SEGS_P2> {
    /// Builds the service, spawns `min_workers` workers, and registers with
    /// the tick service. Returns the shared handle.
    ///
    /// NOTE(review): the final step mutates `tick_registration` through a
    /// `*mut` obtained from `Arc::as_ptr`; this relies on no other thread
    /// touching the service until `start` returns — confirm.
    pub fn start(
        arena: TaskArena,
        config: WorkerServiceConfig,
        tick_service: &Arc<super::ticker::TickService>,
    ) -> Arc<Self> {
        let tick_duration = config.tick_duration;
        let tick_duration_ns = normalize_tick_duration_ns(config.tick_duration);
        // Clamp worker bounds: at least one worker, at most MAX_WORKER_LIMIT.
        let min_workers = config.min_workers.max(1);
        let max_workers = config.max_workers.max(min_workers).min(MAX_WORKER_LIMIT);

        // One waker per worker slot, allocated up front for the maximum.
        let mut wakers = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            wakers.push(Arc::new(WorkerWaker::new()));
        }
        let wakers = wakers.into_boxed_slice();

        let worker_count = Arc::new(AtomicUsize::new(0));

        let summary_tree = Summary::new(
            arena.config().leaf_count,
            arena.layout().signals_per_leaf,
            &wakers,
            &worker_count,
        );

        // Per-slot bookkeeping arrays, all sized for max_workers.
        let mut worker_actives = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            worker_actives.push(AtomicU64::new(0));
        }
        let mut worker_now_ns = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            worker_now_ns.push(AtomicU64::new(0));
        }
        let mut worker_shutdowns = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            worker_shutdowns.push(AtomicBool::new(false));
        }
        let mut worker_threads = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            worker_threads.push(Mutex::new(None));
        }
        let mut worker_thread_handles = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            worker_thread_handles.push(Mutex::new(None));
        }
        let mut worker_stats = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            worker_stats.push(WorkerStats::default());
        }

        // Message plumbing: one receiver per worker, plus a dedicated sender
        // for every (receiver, producer) pair, laid out receiver-major:
        // senders[receiver * max_workers + producer].
        let mut receivers = Vec::with_capacity(max_workers);
        let mut senders = Vec::<mpsc::Sender<WorkerMessage, P, NUM_SEGS_P2>>::with_capacity(
            max_workers * max_workers,
        );
        for worker_id in 0..max_workers {
            let rx = mpsc::new_with_waker(Arc::clone(&wakers[worker_id]));
            receivers.push(UnsafeCell::new(rx));
        }
        for worker_id in 0..max_workers {
            for other_worker_id in 0..max_workers {
                senders.push(
                    unsafe { &*receivers[worker_id].get() }
                        .create_sender_with_config(0)
                        .expect("ran out of producer slots"),
                )
            }
        }

        // One extra producer slot per worker, reserved for the supervisor.
        let mut tick_senders = Vec::with_capacity(max_workers);
        for worker_id in 0..max_workers {
            tick_senders.push(
                unsafe { &*receivers[worker_id].get() }
                    .create_sender_with_config(0)
                    .expect("ran out of producer slots"),
            );
        }

        // Per-worker yield queues plus stealers so peers can steal yields.
        let mut yield_queues = Vec::with_capacity(max_workers);
        for _ in 0..max_workers {
            yield_queues.push(YieldWorker::new_fifo());
        }
        let mut yield_stealers = Vec::with_capacity(max_workers);
        for worker_id in 0..max_workers {
            yield_stealers.push(yield_queues[worker_id].stealer());
        }
        let mut timers = Vec::with_capacity(max_workers);
        for worker_id in 0..max_workers {
            timers.push(UnsafeCell::new(TimerWheel::new(
                tick_duration,
                TIMER_TICKS_PER_WHEEL,
                worker_id as u32,
            )));
        }

        let service = Arc::new(Self {
            arena,
            summary_tree,
            config,
            tick_duration: tick_duration,
            tick_duration_ns,
            clock_ns: Arc::new(AtomicU64::new(0)),
            wakers,
            worker_actives: worker_actives.into_boxed_slice(),
            worker_now_ns: worker_now_ns.into_boxed_slice(),
            worker_shutdowns: worker_shutdowns.into_boxed_slice(),
            worker_threads: worker_threads.into_boxed_slice(),
            worker_thread_handles: worker_thread_handles.into_boxed_slice(),
            worker_stats: worker_stats.into_boxed_slice(),
            worker_count,
            worker_max_id: AtomicUsize::new(0),
            receivers: receivers.into_boxed_slice(),
            senders: senders.into_boxed_slice(),
            tick_senders: tick_senders.into_boxed_slice(),
            yield_queues: yield_queues.into_boxed_slice(),
            yield_stealers: yield_stealers.into_boxed_slice(),
            timers: timers.into_boxed_slice(),
            shutdown: AtomicBool::new(false),
            register_mutex: Mutex::new(()),
            tick_registration: Mutex::new(None),
            rebalance_requested: AtomicBool::new(false),
            tick_health_check_interval: 100,
            tick_partition_rebalance_interval: 1000,
            tick_scaling_check_interval: 500,
        });

        // Pre-publish the target count; spawn_worker_internal does not
        // increment the count when `increment_count` is false.
        service.worker_count.store(min_workers, Ordering::SeqCst);
        let mut spawned = 0;
        for _ in 0..min_workers {
            if service.spawn_worker_internal(&service, false).is_err() {
                break;
            }
            spawned += 1;
        }

        // If fewer workers came up than requested, correct the count.
        if spawned < min_workers {
            service.worker_count.store(spawned, Ordering::SeqCst);
        }

        let registration = {
            let handler: Arc<dyn TickHandler> = Arc::clone(&service) as Arc<dyn TickHandler>;
            tick_service
                .register(handler)
                .expect("Failed to register with TickService")
        };

        // SAFETY: NOTE(review) — writes through Arc::as_ptr to store the
        // registration after construction; sound only while this thread has
        // exclusive use of the freshly created Arc.
        unsafe {
            let service_ptr = Arc::as_ptr(&service) as *mut WorkerService<P, NUM_SEGS_P2>;
            *(*service_ptr).tick_registration.lock().unwrap() = Some(registration);
        }

        service
    }
482
    /// The task arena backing this service.
    #[inline]
    pub fn arena(&self) -> &TaskArena {
        &self.arena
    }

    /// The summary tree used for task reservation and wake signaling.
    #[inline]
    pub fn summary(&self) -> &Summary {
        &self.summary_tree
    }
494
    /// Reserves a task slot and returns its handle, or `None` when the arena
    /// is closed or full. If the arena rejects the reserved location, the
    /// summary-tree reservation is rolled back so the two stay consistent.
    #[inline]
    pub fn reserve_task(&self) -> Option<TaskHandle> {
        if self.arena.is_closed() {
            return None;
        }
        let (leaf_idx, signal_idx, bit) = self.summary_tree.reserve_task()?;
        let handle = match self.arena.handle_for_location(leaf_idx, signal_idx, bit) {
            Some(handle) => handle,
            None => {
                // Arena refused the location: undo the reservation.
                self.summary_tree
                    .release_task_in_leaf(leaf_idx, signal_idx, bit as usize);
                return None;
            }
        };
        self.arena.increment_total_tasks();
        Some(handle)
    }
513
514 #[inline]
516 pub fn release_task(&self, handle: TaskHandle) {
517 let task = handle.task();
518 self.summary_tree.release_task_in_leaf(
519 task.leaf_idx() as usize,
520 task.signal_idx() as usize,
521 task.signal_bit() as usize,
522 );
523 self.arena.decrement_total_tasks();
524 }
525
    /// Spawns an additional worker, incrementing the published worker count.
    ///
    /// # Safety notes
    /// Temporarily reconstructs an `Arc` from `&self` without bumping the
    /// refcount, then `forget`s it so the count stays balanced.
    /// NOTE(review): this is only sound when `self` actually lives inside an
    /// `Arc` (as produced by `start`) — confirm no caller ever holds a bare
    /// `WorkerService`.
    pub fn spawn_worker(&self) -> Result<(), PushError<()>>
    where
        Self: Sized,
    {
        let service_arc = unsafe {
            Arc::from_raw(self as *const Self)
        };
        let result = self.spawn_worker_internal(&service_arc, true);
        // Keep the refcount balanced: we never owned this Arc.
        std::mem::forget(service_arc);
        result
    }
544
545 fn spawn_worker_internal(
546 &self,
547 service: &Arc<Self>,
548 increment_count: bool,
549 ) -> Result<(), PushError<()>> {
550 let _lock = self.register_mutex.lock().expect("register_mutex poisoned");
551 let now_ns = Instant::now().elapsed().as_nanos() as u64;
552
553 let mut worker_id: Option<usize> = None;
555 for id in 0..self.worker_actives.len() {
556 if self.worker_actives[id]
557 .compare_exchange(0, now_ns, Ordering::SeqCst, Ordering::SeqCst)
558 .is_ok()
559 {
560 worker_id = Some(id);
561 break;
562 }
563 }
564 if worker_id.is_none() {
565 return Err(PushError::Full(()));
566 }
567 let worker_id = worker_id.unwrap();
568 if worker_id >= self.worker_max_id.load(Ordering::SeqCst) {
570 self.worker_max_id.store(worker_id + 1, Ordering::SeqCst);
571 }
572
573 if increment_count {
574 self.worker_count.fetch_add(1, Ordering::SeqCst);
575 self.rebalance_requested.store(true, Ordering::Release);
578 }
579
580 self.worker_actives[worker_id].store(1, Ordering::SeqCst);
582
583 let service_clone = Arc::clone(service);
584 let shutdown = Arc::new(AtomicBool::new(false));
585 let timer_resolution_ns = self.tick_duration().as_nanos().max(1) as u64;
586 let join = std::thread::spawn(move || {
587 let task_slot = TaskSlot::new(std::ptr::null_mut());
588 let task_slot_ptr = &task_slot as *const _ as *mut TaskSlot;
589 let mut timer_output = Vec::with_capacity(MESSAGE_BATCH_SIZE);
590 for _ in 0..MESSAGE_BATCH_SIZE {
591 timer_output.push((
592 0u64,
593 0u64,
594 TimerHandle::new(unsafe { NonNull::new_unchecked(task_slot_ptr) }, 0, 0, 0),
595 ));
596 }
597 let mut message_batch = Vec::with_capacity(MESSAGE_BATCH_SIZE);
598 for _ in 0..MESSAGE_BATCH_SIZE {
599 message_batch.push(WorkerMessage::Noop);
600 }
601
602 let mut w = Worker {
603 service: Arc::clone(&service_clone),
604 wait_strategy: WaitStrategy::default(),
605 shutdown: &service_clone.worker_shutdowns[worker_id],
606 receiver: unsafe {
607 &mut *service_clone.receivers[worker_id].get()
610 },
611 yield_queue: &service_clone.yield_queues[worker_id],
612 timer_wheel: unsafe {
613 &mut *service_clone.timers[worker_id].get()
616 },
617 wake_stats: WakeStats::default(),
618 stats: WorkerStats::default(),
619 partition_start: 0,
620 partition_end: 0,
621 partition_len: 0,
622 cached_worker_count: 0,
623 wake_burst_limit: DEFAULT_WAKE_BURST,
624 worker_id: worker_id as u32,
625 timer_resolution_ns,
626 timer_output,
627 message_batch: message_batch.into_boxed_slice(),
628 rng: crate::utils::Random::new(),
629 current_task: std::ptr::null_mut(),
630 preemption_requested: AtomicBool::new(false),
631 #[cfg(windows)]
632 generator_scope_atomic: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
633 };
634 CURRENT_TIMER_WHEEL.set(w.timer_wheel as *mut TimerWheel<TimerHandle>);
635 let ops_ref: &'static dyn CrossWorkerOps =
637 unsafe { &*(&*service_clone as *const dyn CrossWorkerOps) };
638 CROSS_WORKER_OPS.set(Some(ops_ref));
639
640 let _preemption_handle = crate::runtime::preemption::init_worker_preemption().ok(); #[cfg(unix)]
646 let thread_handle_result = crate::runtime::preemption::WorkerThreadHandle::current();
647
648 #[cfg(windows)]
649 let thread_handle_result =
650 crate::runtime::preemption::WorkerThreadHandle::current(&w.preemption_requested);
651
652 #[cfg(not(any(unix, windows)))]
653 let thread_handle_result =
654 crate::runtime::preemption::WorkerThreadHandle::current(&w.preemption_requested);
655
656 if let Ok(thread_handle) = thread_handle_result {
657 *service_clone.worker_thread_handles[worker_id]
658 .lock()
659 .expect("worker_thread_handles lock poisoned") = Some(thread_handle);
660 }
661
662 let w_ptr = &mut w as *mut Worker<'_, P, NUM_SEGS_P2>;
667 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
668 unsafe { (*w_ptr).run() };
669 }));
670 if let Err(e) = result {
671 eprintln!("[Worker {}] Panicked: {:?}", worker_id, e);
672 }
673 service_clone.worker_count.fetch_sub(1, Ordering::SeqCst);
677 service_clone.worker_actives[worker_id].store(0, Ordering::SeqCst);
679 service_clone
681 .rebalance_requested
682 .store(true, Ordering::Release);
683 });
684
685 *self.worker_threads[worker_id]
687 .lock()
688 .expect("worker_threads lock poisoned") = Some(join);
689
690 Ok(())
691 }
692
693 pub(crate) fn post_message(
694 &self,
695 from_worker_id: u32,
696 to_worker_id: u32,
697 message: WorkerMessage,
698 ) -> Result<(), PushError<WorkerMessage>> {
699 unsafe {
701 self.senders
702 [((from_worker_id as usize) * self.config.max_workers) + (to_worker_id as usize)]
703 .unsafe_try_push(message)
704 }
705 }
706
707 pub(crate) fn try_yield_steal(
708 &self,
709 from_worker_id: usize,
710 limit: usize,
711 next_rand: usize,
712 ) -> (u64, bool) {
713 let queue = &self.yield_queues[from_worker_id];
714 let stealer = &self.yield_stealers[from_worker_id];
715 let max_workers = self.worker_max_id.load(Ordering::Relaxed);
716 let mut index = next_rand;
717 let mut attempts = 0;
718 for _ in 0..max_workers {
719 let worker_id = index % max_workers;
720 if worker_id == from_worker_id {
721 continue;
722 }
723 let steal = stealer.steal_batch_with_limit(queue, limit);
724 if steal.is_success() {
725 return (attempts, true);
726 }
727 index += 1;
728 attempts += 1;
729 }
730 (attempts, false)
731 }
732
    /// The configured tick period (not the normalized power-of-two value).
    pub fn tick_duration(&self) -> Duration {
        self.tick_duration
    }

    /// The last timestamp published by the tick handler, in nanoseconds.
    pub fn clock_ns(&self) -> u64 {
        self.clock_ns.load(Ordering::Acquire)
    }
740
    /// Shuts the service down: drops the tick registration, sends `Shutdown`
    /// to every active worker, and joins all worker threads. Idempotent — a
    /// second call returns immediately.
    pub fn shutdown(&self) {
        *self.tick_registration.lock().expect("lock poisoned") = None;
        // swap returns the previous value; true means shutdown already ran.
        if self.shutdown.swap(true, Ordering::Release) {
            return;
        }

        #[cfg(debug_assertions)]
        {
            eprintln!("[WorkerService] Shutdown initiated, sending shutdown messages...");
        }

        // worker_max_id is one past the highest slot ever used; the inclusive
        // range over-scans by one but is guarded by the length check below.
        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
        for worker_id in 0..=max_workers {
            if worker_id < self.worker_actives.len()
                && self.worker_actives[worker_id].load(Ordering::Relaxed) != 0
            {
                let _ = unsafe {
                    self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::Shutdown)
                };
                // Wake the worker so it notices the message promptly.
                self.wakers[worker_id].mark_tasks();
            }
        }

        #[cfg(debug_assertions)]
        {
            eprintln!("[WorkerService] Joining worker threads...");
        }

        for (idx, worker_thread) in self.worker_threads.iter().enumerate() {
            if let Some(handle) = worker_thread.lock().unwrap().take() {
                #[cfg(debug_assertions)]
                {
                    eprintln!("[WorkerService] Joining worker thread {}...", idx);
                }
                let _ = handle.join();
                #[cfg(debug_assertions)]
                {
                    eprintln!("[WorkerService] Worker thread {} joined", idx);
                }
            }
        }

        #[cfg(debug_assertions)]
        {
            eprintln!("[WorkerService] Shutdown complete");
        }
    }
792
    /// Number of currently active workers.
    #[inline]
    pub fn worker_count(&self) -> usize {
        self.worker_count.load(Ordering::Relaxed)
    }

    /// Whether the given worker has pending summary or status bits on its
    /// waker. Out-of-range ids report no work.
    #[inline]
    pub fn worker_has_work(&self, worker_id: usize) -> bool {
        if worker_id >= self.wakers.len() {
            return false;
        }
        let waker = &self.wakers[worker_id];
        let summary = waker.snapshot_summary();
        let status = waker.status();
        summary != 0 || status != 0
    }
810
    /// Periodic supervisor pass: flags workers whose published clock has not
    /// advanced recently and asks every active worker for a health report.
    fn supervisor_health_check(&self, now_ns: u64) {
        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
        let tick_duration_ns = self.tick_duration_ns;
        // A worker is considered stuck after ~10 missed ticks.
        let stuck_threshold_ns = tick_duration_ns * 10;
        for worker_id in 0..max_workers {
            let is_active = self.worker_actives[worker_id].load(Ordering::Relaxed);
            if is_active == 0 {
                continue; // vacant slot
            }

            let last_update = self.worker_now_ns[worker_id].load(Ordering::Relaxed);
            let time_since_update = now_ns.saturating_sub(last_update);

            if time_since_update > stuck_threshold_ns {
                // Diagnostic only; no recovery action is taken here.
                #[cfg(debug_assertions)]
                {
                    eprintln!(
                        "[Supervisor] WARNING: Worker {} appears stuck (no update for {}ns)",
                        worker_id, time_since_update
                    );
                }
            }

            if worker_id < self.tick_senders.len() {
                let _ = unsafe {
                    self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::ReportHealth)
                };
            }
        }
    }
846
    /// Splits the arena's leaves evenly across the currently active workers
    /// and notifies each via a `RebalancePartitions` message.
    fn supervisor_rebalance_partitions(&self) {
        let active_workers = self.worker_count.load(Ordering::Relaxed);
        if active_workers == 0 {
            return;
        }

        let total_leaves = self.arena.leaf_count();
        // Ceiling division so every leaf lands in some partition.
        let leaves_per_worker = (total_leaves + active_workers - 1) / active_workers;

        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
        let mut active_worker_ids = Vec::with_capacity(max_workers);

        for worker_id in 0..max_workers {
            if self.worker_actives[worker_id].load(Ordering::Relaxed) != 0 {
                active_worker_ids.push(worker_id);
            }
        }

        // Partitions are assigned by position among the active workers, not
        // by worker id, so a given id may receive a different range after
        // worker churn.
        for (idx, &worker_id) in active_worker_ids.iter().enumerate() {
            let partition_start = idx * leaves_per_worker;
            let partition_end = ((idx + 1) * leaves_per_worker).min(total_leaves);

            let _ = unsafe {
                self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::RebalancePartitions {
                    partition_start,
                    partition_end,
                })
            };
        }

        #[cfg(debug_assertions)]
        {
            eprintln!(
                "[Supervisor] Rebalanced {} leaves across {} active workers",
                total_leaves, active_workers
            );
        }
    }
890
    /// Orderly shutdown: asks every active worker to exit gracefully, waits
    /// up to 5 seconds, then escalates to a hard `Shutdown` message for any
    /// workers still running.
    fn supervisor_graceful_shutdown(&self) {
        // Stop receiving ticks first.
        let _ = self.tick_registration.lock().unwrap().take();
        let max_workers = self.worker_max_id.load(Ordering::Relaxed);

        #[cfg(debug_assertions)]
        {
            eprintln!("[Supervisor] Shutting down, max_workers={}", max_workers);
        }

        // Inclusive range over-scans by one; guarded by the length checks.
        for worker_id in 0..=max_workers {
            #[cfg(debug_assertions)]
            {
                let is_active = worker_id < self.worker_actives.len()
                    && self.worker_actives[worker_id].load(Ordering::Relaxed) != 0;
                eprintln!("[Supervisor] Worker {} active={}", worker_id, is_active);
            }
            if worker_id < self.worker_actives.len()
                && self.worker_actives[worker_id].load(Ordering::Relaxed) != 0
            {
                #[cfg(debug_assertions)]
                {
                    eprintln!("[Supervisor] Sending shutdown to worker {}", worker_id);
                }
                let _ = unsafe {
                    self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::GracefulShutdown)
                };
                // Wake the worker so it sees the message promptly.
                self.wakers[worker_id].mark_tasks();
            }
        }

        #[cfg(debug_assertions)]
        {
            eprintln!("[Supervisor] Sent graceful shutdown to all workers");
        }

        let shutdown_timeout = std::time::Duration::from_secs(5);
        let shutdown_start = std::time::Instant::now();

        // Poll until every worker has exited or the timeout expires.
        loop {
            let active_count = self.worker_count.load(Ordering::Relaxed);
            if active_count == 0 {
                break;
            }

            if shutdown_start.elapsed() > shutdown_timeout {
                #[cfg(debug_assertions)]
                {
                    eprintln!(
                        "[Supervisor] Graceful shutdown timeout, {} workers still active",
                        active_count
                    );
                }
                // Escalate: force remaining workers to stop.
                for worker_id in 0..max_workers {
                    if self.worker_actives[worker_id].load(Ordering::Relaxed) != 0 {
                        let _ = unsafe {
                            self.tick_senders[worker_id].unsafe_try_push(WorkerMessage::Shutdown)
                        };
                    }
                }
                break;
            }

            std::thread::sleep(std::time::Duration::from_millis(10));
        }
    }
963
    /// Heuristic auto-scaling: spawns a worker when average pending work per
    /// worker exceeds a threshold, and retires at most one idle worker per
    /// pass when no work is visible at all.
    fn supervisor_dynamic_scaling(&self) {
        let active_workers = self.worker_count.load(Ordering::Relaxed);
        let min_workers = self.config.min_workers;
        let max_workers_config = self.config.max_workers;

        // Sum set waker bits across active workers as a cheap load proxy.
        let mut total_work_signals = 0u64;
        let max_worker_id = self.worker_max_id.load(Ordering::Relaxed);

        for worker_id in 0..max_worker_id {
            if self.worker_actives[worker_id].load(Ordering::Relaxed) == 0 {
                continue;
            }

            let waker = &self.wakers[worker_id];
            let summary = waker.snapshot_summary();
            let status = waker.status();

            total_work_signals += summary.count_ones() as u64 + status.count_ones() as u64;
        }

        if active_workers > 0 {
            let avg_work_per_worker = total_work_signals / active_workers as u64;
            let scale_up_threshold = 4;
            if avg_work_per_worker > scale_up_threshold && active_workers < max_workers_config {
                match self.spawn_worker() {
                    Ok(()) => {
                        let new_count = self.worker_count.load(Ordering::Relaxed);

                        #[cfg(debug_assertions)]
                        {
                            eprintln!(
                                "[Supervisor] Scaled UP: {} -> {} workers (avg_work={})",
                                active_workers, new_count, avg_work_per_worker
                            );
                        }

                        // Broadcast the new count to every worker slot.
                        for sender in self.tick_senders.iter() {
                            let _ = unsafe {
                                sender.unsafe_try_push(WorkerMessage::WorkerCountChanged {
                                    new_worker_count: new_count as u16,
                                })
                            };
                        }
                    }
                    Err(_) => {
                        #[cfg(debug_assertions)]
                        {
                            eprintln!("[Supervisor] Failed to scale up: no available worker slots");
                        }
                    }
                }
            }
        }

        // Scale down: retire the highest-id active worker, one per pass.
        if active_workers > min_workers && total_work_signals == 0 {
            for worker_id in (0..max_worker_id).rev() {
                if self.worker_actives[worker_id].load(Ordering::Relaxed) != 0 {
                    let _ = unsafe {
                        self.tick_senders[worker_id]
                            .unsafe_try_push(WorkerMessage::GracefulShutdown)
                    };

                    #[cfg(debug_assertions)]
                    {
                        eprintln!(
                            "[Supervisor] Scaled DOWN: removed worker {} (no work detected)",
                            worker_id
                        );
                    }
                    break;
                }
            }
        }
    }
1050
    /// Detects load imbalance between the most- and least-loaded workers.
    /// Currently diagnostic-only: no tasks are actually migrated.
    #[allow(dead_code)]
    fn supervisor_task_migration(&self) {
        let max_worker_id = self.worker_max_id.load(Ordering::Relaxed);

        // (worker_id, load) per active worker; load = set waker bits.
        let mut worker_loads: Vec<(usize, u64)> = Vec::new();

        for worker_id in 0..max_worker_id {
            if self.worker_actives[worker_id].load(Ordering::Relaxed) == 0 {
                continue;
            }

            let waker = &self.wakers[worker_id];
            let summary = waker.snapshot_summary();
            let status = waker.status();
            let load = summary.count_ones() as u64 + status.count_ones() as u64;

            worker_loads.push((worker_id, load));
        }

        if worker_loads.len() < 2 {
            return; // nothing to balance
        }

        worker_loads.sort_by_key(|&(_, load)| load);

        let (most_loaded_id, most_loaded_load) = worker_loads[worker_loads.len() - 1];
        let (least_loaded_id, least_loaded_load) = worker_loads[0];

        let imbalance = most_loaded_load.saturating_sub(least_loaded_load);
        let migration_threshold = 3;

        if imbalance >= migration_threshold {
            #[cfg(debug_assertions)]
            {
                eprintln!(
                    "[Supervisor] Task migration opportunity: worker {} (load={}) -> worker {} (load={})",
                    most_loaded_id, most_loaded_load, least_loaded_id, least_loaded_load
                );
            }

            // Placeholder: silences unused warnings until migration exists.
            let _ = (most_loaded_id, least_loaded_id);
        }
    }
1110
1111 pub fn interrupt_worker(
1131 &self,
1132 worker_id: usize,
1133 ) -> Result<bool, crate::runtime::preemption::PreemptionError> {
1134 if worker_id >= self.worker_thread_handles.len() {
1135 return Ok(false);
1136 }
1137
1138 let handle_guard = self.worker_thread_handles[worker_id]
1139 .lock()
1140 .expect("worker_thread_handles lock poisoned");
1141
1142 if let Some(ref handle) = *handle_guard {
1143 handle.interrupt()?;
1144 Ok(true)
1145 } else {
1146 Ok(false)
1147 }
1148 }
1149}
1150
1151impl<const P: usize, const NUM_SEGS_P2: usize> CrossWorkerOps for WorkerService<P, NUM_SEGS_P2> {
1152 fn post_cancel_message(&self, from_worker_id: u32, to_worker_id: u32, timer_id: u64) -> bool {
1153 self.post_message(
1154 from_worker_id,
1155 to_worker_id,
1156 WorkerMessage::CancelTimer {
1157 worker_id: to_worker_id,
1158 timer_id,
1159 },
1160 )
1161 .is_ok()
1162 }
1163}
1164
impl<const P: usize, const NUM_SEGS_P2: usize> TickHandler for WorkerService<P, NUM_SEGS_P2> {
    /// Tick cadence requested from the tick service.
    fn tick_duration(&self) -> Duration {
        self.tick_duration
    }

    /// Supervisor heartbeat: publishes the clock, advances every timer wheel,
    /// and runs periodic health / scaling / rebalance passes.
    fn on_tick(&self, tick_count: u64, now_ns: u64) {
        self.clock_ns.store(now_ns, Ordering::Release);
        // Inclusive range over-scans by one; guarded by the length check.
        let max_workers = self.worker_max_id.load(Ordering::Relaxed);
        for i in 0..=max_workers {
            if i >= self.worker_now_ns.len() {
                break;
            }
            self.worker_now_ns[i].store(now_ns, Ordering::Release);
            // SAFETY: NOTE(review) — this takes `&mut` to the same timer
            // wheel that worker `i` also holds `&mut` to in its loop; confirm
            // `set_now_ns` is designed to tolerate this aliasing.
            unsafe {
                let timer_wheel = &mut *self.timers[i].get();
                timer_wheel.set_now_ns(now_ns);
            }
        }

        // First tick: establish initial partitions.
        if tick_count == 0 {
            self.supervisor_rebalance_partitions();
        }

        // A worker came or went since the last tick.
        if self.rebalance_requested.swap(false, Ordering::AcqRel) {
            self.supervisor_rebalance_partitions();
        }

        if tick_count % self.tick_health_check_interval == 0 {
            self.supervisor_health_check(now_ns);
        }

        if tick_count % self.tick_scaling_check_interval == 0 {
            self.supervisor_dynamic_scaling();
        }

        if tick_count % self.tick_partition_rebalance_interval == 0 {
            self.supervisor_rebalance_partitions();
        }
    }

    /// Invoked when the tick service stops: performs a graceful shutdown.
    fn on_shutdown(&self) {
        self.supervisor_graceful_shutdown();
    }
}
1219
impl<const P: usize, const NUM_SEGS_P2: usize> Drop for WorkerService<P, NUM_SEGS_P2> {
    fn drop(&mut self) {
        // Drop the tick registration first so no further ticks arrive.
        *self.tick_registration.lock().expect("lock poisoned") = None;
        // `self.shutdown` (field) is the flag; `self.shutdown()` the method.
        if !self.shutdown.load(Ordering::Acquire) {
            self.shutdown();
        }
    }
}
1228
/// Normalizes a tick duration to a nonzero power-of-two nanosecond count.
///
/// Fixes vs. previous revision: the value was clamped to `u64::MAX` before
/// `next_power_of_two`, which overflows for any input above 2^63 ns (panicking
/// in debug builds and wrapping to 0 — a zero-length tick — in release).
/// Clamp to 2^63, the largest `u64` power of two, instead.
fn normalize_tick_duration_ns(duration: Duration) -> u64 {
    const MAX_POW2: u128 = 1u128 << 63;
    let nanos = duration.as_nanos().clamp(1, MAX_POW2) as u64;
    nanos.next_power_of_two()
}
1233
/// Per-worker activity counters, reported via health snapshots.
///
/// NOTE(review): field meanings below are inferred from the names; the
/// incrementing code is mostly outside this chunk — confirm against the
/// worker loop.
#[derive(Debug, Default, Clone)]
pub struct WorkerStats {
    // Total task polls performed.
    pub tasks_polled: u64,

    // Polls that completed a task.
    pub completed_count: u64,

    // Polls that yielded the task back to a queue.
    pub yielded_count: u64,

    // Polls that left the task waiting.
    pub waiting_count: u64,

    // Failed compare-and-swap attempts.
    pub cas_failures: u64,

    // Scans that found no work.
    pub empty_scans: u64,

    // Tasks taken from the yield queue.
    pub yield_queue_polls: u64,

    // Polls triggered by summary signals.
    pub signal_polls: u64,

    // Yield-queue steal attempts against peers.
    pub steal_attempts: u64,

    // Yield-queue steals that succeeded.
    pub steal_successes: u64,

    // Leaf summary words inspected.
    pub leaf_summary_checks: u64,

    // Leaf summary inspections that found work.
    pub leaf_summary_hits: u64,

    // Steal attempts against foreign leaf partitions.
    pub leaf_steal_attempts: u64,

    // Foreign-leaf steals that succeeded.
    pub leaf_steal_successes: u64,

    // Timers fired by this worker.
    pub timer_fires: u64,
}
1281
/// Point-in-time view of one worker's state, produced in response to a
/// `ReportHealth` message.
#[derive(Debug, Clone)]
pub struct WorkerHealthSnapshot {
    pub worker_id: u32,
    // When the snapshot was taken, in nanoseconds.
    pub timestamp_ns: u64,
    pub stats: WorkerStats,
    pub yield_queue_len: usize,
    pub mpsc_queue_len: usize,
    // Leaf indices currently owned by this worker.
    pub active_leaf_partitions: Vec<usize>,
    pub has_work: bool,
}
1294
/// Counters describing a worker's wakeup/permit-release behavior.
/// NOTE(review): meanings inferred from names; incrementing code is outside
/// this chunk.
#[derive(Clone, Copy, Debug, Default)]
pub struct WakeStats {
    pub release_calls: u64,
    pub released_permits: u64,
    pub last_backlog: usize,
    pub max_backlog: usize,
    pub queue_release_calls: u64,
}
1303
/// Controls how a worker waits when idle: how long to spin before sleeping,
/// and an optional upper bound on how long to stay parked.
#[derive(Clone, Copy, Debug)]
pub struct WaitStrategy {
    pub spin_before_sleep: usize,
    pub park_timeout: Option<Duration>,
}

impl WaitStrategy {
    /// Builds a strategy from a spin budget and an optional park timeout.
    pub fn new(spin_before_sleep: usize, park_timeout: Option<Duration>) -> Self {
        Self {
            spin_before_sleep,
            park_timeout,
        }
    }

    /// Never blocks: zero spin and a zero-length park timeout.
    pub fn non_blocking() -> Self {
        Self {
            spin_before_sleep: 0,
            park_timeout: Some(Duration::ZERO),
        }
    }

    /// Parks immediately with no timeout (sleeps until explicitly woken).
    pub fn park_immediately() -> Self {
        Self {
            spin_before_sleep: 0,
            park_timeout: None,
        }
    }
}

impl Default for WaitStrategy {
    /// No spinning; park with a 2-second timeout.
    fn default() -> Self {
        Self {
            spin_before_sleep: 0,
            park_timeout: Some(Duration::from_millis(2000)),
        }
    }
}
1332
/// Thread-local worker state. Each OS worker thread owns exactly one
/// `Worker`, which borrows its slot's queues and timer wheel from the shared
/// `WorkerService`.
pub struct Worker<
    'a,
    const P: usize = DEFAULT_QUEUE_SEG_SHIFT,
    const NUM_SEGS_P2: usize = DEFAULT_QUEUE_NUM_SEGS_SHIFT,
> {
    service: Arc<WorkerService<P, NUM_SEGS_P2>>,
    wait_strategy: WaitStrategy,
    // Slot-specific shutdown flag, set by the service.
    shutdown: &'a AtomicBool,
    // This worker's message inbox (exclusively owned by this thread).
    receiver: &'a mut mpsc::Receiver<WorkerMessage, P, NUM_SEGS_P2>,
    yield_queue: &'a YieldWorker<TaskHandle>,
    timer_wheel: &'a mut TimerWheel<TimerHandle>,
    wake_stats: WakeStats,
    stats: WorkerStats,
    // Leaf range [partition_start, partition_end) this worker scans.
    partition_start: usize,
    partition_end: usize,
    partition_len: usize,
    cached_worker_count: usize,
    // Maximum wakeups released per burst.
    wake_burst_limit: usize,
    worker_id: u32,
    timer_resolution_ns: u64,
    // Preallocated scratch for timer expirations (MESSAGE_BATCH_SIZE entries).
    timer_output: Vec<(u64, u64, TimerHandle)>,
    // Preallocated scratch for draining the inbox.
    message_batch: Box<[WorkerMessage]>,
    rng: crate::utils::Random,
    pub(crate) current_task: *mut Task,
    // Set by the preemption machinery to ask this worker to yield.
    preemption_requested: AtomicBool,
    #[cfg(windows)]
    generator_scope_atomic: std::sync::atomic::AtomicPtr<()>,
}
1363
1364impl<'a, const P: usize, const NUM_SEGS_P2: usize> Worker<'a, P, NUM_SEGS_P2> {
    /// Read-only access to this worker's counters.
    #[inline]
    pub fn stats(&self) -> &WorkerStats {
        &self.stats
    }

    /// Cheap pending-work check (three atomic reads on our own waker): a
    /// raised status bit, a non-empty partition summary snapshot, or
    /// unconsumed wake permits.
    #[inline]
    fn has_work(&self) -> bool {
        let waker = &self.service.wakers[self.worker_id as usize];

        // Status bits set by producers (e.g. the yield flag).
        let status = waker.status();
        if status != 0 {
            return true;
        }

        // Cached summary of ready leaves in our partition.
        let summary = waker.snapshot_summary();
        if summary != 0 {
            return true;
        }

        // Outstanding permits mean someone explicitly signalled us.
        if waker.permits() > 0 {
            return true;
        }

        false
    }

    /// Replaces the spin/park policy used by the run loops.
    pub fn set_wait_strategy(&mut self, strategy: WaitStrategy) {
        self.wait_strategy = strategy;
    }
1401
1402 #[inline(always)]
1403 pub fn run_once(&mut self) -> bool {
1404 let mut did_work = false;
1405
1406 if self.poll_timers() {
1407 did_work = true;
1408 }
1409
1410 if self.process_messages() {
1412 did_work = true;
1413 }
1414
1415 let yielded = self.poll_yield(self.yield_queue.len());
1417 if yielded > 0 {
1418 did_work = true;
1419 }
1420
1421 if self.try_partition_random() {
1423 did_work = true;
1424 }
1425
1426 let rand = self.next_u64();
1433
1434 if !did_work && rand & FULL_SUMMARY_SCAN_CADENCE_MASK == 0 {
1435 let leaf_count = self.service.arena().leaf_count();
1436 if self.try_any_partition_random(leaf_count) {
1437 did_work = true;
1438 }
1439 }
1443
1444 if !did_work {
1445 if self.poll_yield(self.yield_queue.len() as usize) > 0 {
1446 did_work = true;
1447 }
1448
1449 }
1453
1454 did_work
1455 }
1456
1457 #[inline(always)]
1458 pub fn run_once_exhaustive(&mut self) -> bool {
1459 let mut did_work = false;
1460
1461 if self.poll_timers() {
1462 did_work = true;
1463 }
1464
1465 if self.process_messages() {
1467 did_work = true;
1468 }
1469
1470 let yielded = self.poll_yield(self.yield_queue.len());
1472 if yielded > 0 {
1473 did_work = true;
1474 }
1475
1476 if self.try_partition_random() {
1478 did_work = true;
1479 }
1480
1481 if self.try_partition_random() {
1482 did_work = true;
1483 } else if self.try_partition_linear() {
1484 did_work = true;
1485 }
1486
1487 let rand = self.next_u64();
1488
1489 if !did_work {
1490 let leaf_count = self.service.arena().leaf_count();
1491 if self.try_any_partition_random(leaf_count) {
1492 did_work = true;
1493 } else if self.try_any_partition_linear(leaf_count) {
1494 did_work = true;
1495 }
1496 }
1497
1498 if !did_work {
1499 if self.poll_yield(self.yield_queue.len() as usize) > 0 {
1500 did_work = true;
1501 }
1502 if self.poll_yield_steal(1) > 0 {
1503 did_work = true;
1504 }
1505 }
1506
1507 did_work
1508 }
1509
    /// Hook for a final work attempt immediately before parking; currently a
    /// no-op that always reports "no work found".
    #[inline]
    fn run_last_before_park(&mut self) -> bool {
        false
    }
1514
    /// Main worker loop. The loop body runs inside a scoped generator on a
    /// dedicated 2 MiB stack so that, when preemption (or a manual pin) is
    /// requested mid-poll, the whole stack — including the task's in-progress
    /// `poll` — can be suspended and pinned onto the task. The outer `while`
    /// rebuilds a fresh generator each time one is handed off that way.
    pub(crate) fn run(&mut self) {
        const STACK_SIZE: usize = 2 * 1024 * 1024;
        crate::runtime::preemption::init_worker_thread_preemption(&self.preemption_requested);

        while !self.shutdown.load(Ordering::Relaxed) {
            let worker_ptr = self as *mut Self;

            // Smuggle the worker reference into the 'static generator closure
            // as a plain address; re-materialized unsafely below.
            let worker_addr = worker_ptr as usize;

            let mut generator =
                crate::generator::Gn::<()>::new_scoped_opt(STACK_SIZE, move |mut scope| -> usize {
                    // SAFETY-by-construction: `self` outlives the generator —
                    // it is driven to completion (or pinned) before `run`
                    // returns.
                    let worker = unsafe { &mut *(worker_addr as *mut Worker<P, NUM_SEGS_P2>) };
                    let mut spin_count = 0;
                    let waker_id = worker.worker_id as usize;

                    // Publish the scope pointer so the preemption mechanism
                    // can yield this generator from outside.
                    let scope_ptr = &mut scope as *mut _ as *mut ();

                    #[cfg(unix)]
                    crate::runtime::preemption::set_generator_scope(scope_ptr);

                    #[cfg(windows)]
                    worker
                        .generator_scope_atomic
                        .store(scope_ptr, std::sync::atomic::Ordering::Release);

                    loop {
                        if worker.shutdown.load(Ordering::Relaxed) {
                            // Unpublish the scope before tearing down.
                            #[cfg(unix)]
                            crate::runtime::preemption::clear_generator_scope();
                            #[cfg(windows)]
                            worker
                                .generator_scope_atomic
                                .store(std::ptr::null_mut(), std::sync::atomic::Ordering::Release);
                            return crate::generator::done();
                        }

                        let mut progress = worker.run_once();

                        // Consume a one-shot manual pin request, if any.
                        let manual_pin_requested = GENERATOR_PIN_REQUESTED.with(|cell| {
                            let requested = cell.get();
                            if requested {
                                cell.set(false);
                                true
                            } else {
                                false
                            }
                        });

                        let preemption_requested =
                            crate::runtime::preemption::check_and_clear_preemption(
                                &worker.preemption_requested,
                            );

                        if manual_pin_requested || preemption_requested {
                            let task_to_pin = CURRENT_TASK.with(|cell| cell.get());

                            if !task_to_pin.is_null() {
                                let task = unsafe { &*task_to_pin };

                                if task.has_pinned_generator() {
                                    // Task already owns a generator: switch
                                    // it to hand-off mode and keep this loop.
                                    unsafe {
                                        task.set_generator_run_mode(GeneratorRunMode::Switch);
                                    }
                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
                                    continue;
                                } else {
                                    // Hand THIS generator to the task: record
                                    // it in TLS, yield once so the suspension
                                    // point is inside the generator, then
                                    // finish so the outer loop can pin it.
                                    GENERATOR_PIN_TASK.with(|cell| cell.set(task_to_pin));

                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());

                                    #[cfg(unix)]
                                    crate::runtime::preemption::clear_generator_scope();
                                    #[cfg(windows)]
                                    worker.generator_scope_atomic.store(
                                        std::ptr::null_mut(),
                                        std::sync::atomic::Ordering::Release,
                                    );
                                    return crate::generator::done();
                                }
                            }
                        }

                        if !progress {
                            spin_count += 1;

                            if spin_count >= worker.wait_strategy.spin_before_sleep {
                                core::hint::spin_loop();
                                progress = worker.run_once_exhaustive();

                                if progress {
                                    spin_count = 0;
                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
                                    continue;
                                }
                            } else if spin_count < worker.wait_strategy.spin_before_sleep {
                                // NOTE(review): this condition is always true
                                // here (the `>=` case was taken above) — a
                                // plain `else` would be equivalent; confirm
                                // the intent.
                                core::hint::spin_loop();
                                if spin_count % 100 == 0 {
                                    scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
                                }
                                continue;
                            }

                            // Exhaustive pass found nothing: park.
                            let park_duration = worker.calculate_park_duration();

                            // Refresh the waker's partition summary so
                            // producers can decide to wake us while parked.
                            worker.service.wakers[waker_id].sync_partition_summary(
                                worker.partition_start,
                                worker.partition_end,
                                &worker.service.summary().leaf_words,
                            );

                            match park_duration {
                                // Zero duration: a timer is due, loop again.
                                Some(duration) if duration.is_zero() => {}
                                Some(duration) => {
                                    worker.service.wakers[waker_id].acquire_timeout(duration);
                                }
                                None => {
                                    // No deadline: still cap the park so
                                    // shutdown is noticed within 250 ms.
                                    worker.service.wakers[waker_id]
                                        .acquire_timeout(Duration::from_millis(250));
                                }
                            }
                            spin_count = 0;
                        } else {
                            spin_count = 0;
                            scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
                        }
                    }
                });

            // Drive the generator until it finishes (or suspends for pinning).
            for _ in &mut generator {
            }

            // A non-null pointer here means the generator asked to be pinned
            // onto that task; clear the TLS slot either way.
            let task_ptr = GENERATOR_PIN_TASK.with(|cell| {
                let ptr = cell.get();
                if !ptr.is_null() {
                    cell.set(ptr::null_mut());
                }
                ptr
            });

            if !task_ptr.is_null() {
                let task = unsafe { &*task_ptr };

                // Erase the scoped lifetime so the generator can be stored on
                // the task. SAFETY relies on the task (arena) outliving the
                // generator — TODO confirm that invariant holds on shutdown.
                let boxed_iter: Box<dyn Iterator<Item = usize> + 'static> = unsafe {
                    std::mem::transmute(Box::new(generator) as Box<dyn Iterator<Item = usize>>)
                };

                unsafe {
                    task.pin_generator(boxed_iter, GeneratorRunMode::Switch);
                }

                // Requeue the task so the pinned generator gets resumed.
                // NOTE(review): `slot` is only used as an existence check;
                // `_slot` (or `is_some()`) would silence the unused warning.
                if let Some(slot) = task.slot() {
                    let handle = TaskHandle::from_task(task);

                    task.mark_yielded();
                    task.record_yield();

                    self.enqueue_yield(handle);

                    self.stats.yielded_count += 1;
                }

            } else {
                // Generator finished normally (shutdown path): leave the loop.
                break;
            }
        }
    }
1738
1739 #[inline]
1742 fn calculate_park_duration(&mut self) -> Option<Duration> {
1743 if let Some(next_deadline_ns) = self.timer_wheel.next_deadline() {
1745 let now_ns = self.timer_wheel.now_ns();
1746
1747 if next_deadline_ns > now_ns {
1748 let timer_duration_ns = next_deadline_ns - now_ns;
1749 let timer_duration = Duration::from_nanos(timer_duration_ns);
1750
1751 let duration = match self.wait_strategy.park_timeout {
1753 Some(strategy_timeout) => Some(strategy_timeout.min(timer_duration)),
1754 None => Some(timer_duration),
1755 };
1756 return duration;
1757 } else {
1758 return Some(Duration::ZERO);
1760 }
1761 }
1762
1763 const MAX_PARK_DURATION: Duration = Duration::from_millis(250);
1765 match self.wait_strategy.park_timeout {
1766 Some(timeout) => Some(timeout.min(MAX_PARK_DURATION)),
1767 None => Some(MAX_PARK_DURATION),
1768 }
1769 }
1770
1771 fn process_messages(&mut self) -> bool {
1772 let mut progress = false;
1773 loop {
1774 match self.receiver.try_pop() {
1775 Ok(message) => {
1776 if self.handle_message(message) {
1777 progress = true;
1778 }
1779 }
1780 Err(PopError::Empty) | Err(PopError::Timeout) => {
1781 break;
1783 }
1784 Err(PopError::Closed) => break,
1785 }
1786 }
1787 progress
1788 }
1789
    /// Dispatches one inbound control message; returns true when handling it
    /// constituted progress.
    #[inline(always)]
    fn handle_message(&mut self, message: WorkerMessage) -> bool {
        match message {
            WorkerMessage::ScheduleTimer { timer } => self.handle_timer_schedule(timer),
            WorkerMessage::ScheduleBatch { timers } => {
                // Schedule each timer; report progress if any landed.
                let mut scheduled = false;
                for timer in timers.into_vec() {
                    scheduled |= self.handle_timer_schedule(timer);
                }
                scheduled
            }
            WorkerMessage::CancelTimer {
                worker_id,
                timer_id,
            } => self.cancel_remote_timer(worker_id, timer_id),
            WorkerMessage::WorkerCountChanged { new_worker_count } => {
                self.cached_worker_count = new_worker_count as usize;
                true
            }
            WorkerMessage::RebalancePartitions {
                partition_start,
                partition_end,
            } => self.handle_rebalance_partitions(partition_start, partition_end),
            WorkerMessage::MigrateTasks { task_handles } => self.handle_migrate_tasks(task_handles),
            WorkerMessage::ReportHealth => {
                self.handle_report_health();
                true
            }
            WorkerMessage::GracefulShutdown => {
                self.handle_graceful_shutdown();
                true
            }
            WorkerMessage::Shutdown => {
                // Hard shutdown: raise the flag immediately, no draining.
                self.shutdown.store(true, Ordering::Release);
                true
            }
            WorkerMessage::Noop => true,
        }
    }
1829
1830 fn handle_timer_schedule(&mut self, timer: TimerSchedule) -> bool {
1831 let (handle, deadline_ns) = timer.into_parts();
1832 if handle.worker_id() != self.worker_id {
1833 return false;
1834 }
1835 self.enqueue_timer_entry(deadline_ns, handle).is_some()
1836 }
1837
1838 fn handle_rebalance_partitions(
1839 &mut self,
1840 partition_start: usize,
1841 partition_end: usize,
1842 ) -> bool {
1843 self.partition_start = partition_start;
1844 self.partition_end = partition_end;
1845 self.partition_len = partition_end.saturating_sub(partition_start);
1846
1847 let waker_id = self.worker_id as usize;
1850 self.service.wakers[waker_id].sync_partition_summary(
1851 partition_start,
1852 partition_end,
1853 &self.service.summary().leaf_words,
1854 );
1855
1856 true
1857 }
1858
1859 fn handle_migrate_tasks(&mut self, task_handles: Vec<TaskHandle>) -> bool {
1860 if task_handles.is_empty() {
1863 return false;
1864 }
1865
1866 let count = task_handles.len();
1867 for handle in task_handles {
1868 self.enqueue_yield(handle);
1869 }
1870
1871 #[cfg(debug_assertions)]
1872 {
1873 eprintln!(
1874 "[Worker {}] Received {} migrated tasks",
1875 self.worker_id, count
1876 );
1877 }
1878
1879 true
1880 }
1881
1882 fn handle_report_health(&mut self) {
1883 let snapshot = WorkerHealthSnapshot {
1885 worker_id: self.worker_id,
1886 timestamp_ns: self.timer_wheel.now_ns(),
1887 stats: self.stats.clone(),
1888 yield_queue_len: self.yield_queue.len(),
1889 mpsc_queue_len: 0, active_leaf_partitions: (self.partition_start..self.partition_end).collect(),
1891 has_work: self.has_work(),
1892 };
1893
1894 #[cfg(debug_assertions)]
1897 {
1898 eprintln!(
1899 "[Worker {}] Health: tasks_polled={}, yield_queue={}, mpsc_queue={}, partitions={:?}, has_work={}",
1900 snapshot.worker_id,
1901 snapshot.stats.tasks_polled,
1902 snapshot.yield_queue_len,
1903 snapshot.mpsc_queue_len,
1904 snapshot.active_leaf_partitions,
1905 snapshot.has_work
1906 );
1907 }
1908 let _ = snapshot; }
1910
1911 fn handle_graceful_shutdown(&mut self) {
1912 #[cfg(debug_assertions)]
1916 {
1917 eprintln!(
1918 "[Worker {}] Received graceful shutdown signal",
1919 self.worker_id
1920 );
1921 }
1922
1923 loop {
1925 match self.receiver.try_pop() {
1926 Ok(message) => match message {
1927 WorkerMessage::Shutdown | WorkerMessage::GracefulShutdown => break,
1928 _ => {
1929 self.handle_message(message);
1930 }
1931 },
1932 Err(_) => break,
1933 }
1934 }
1935
1936 self.shutdown.store(true, Ordering::Release);
1938
1939 #[cfg(debug_assertions)]
1940 {
1941 eprintln!("[Worker {}] Set shutdown flag", self.worker_id);
1942 }
1943 }
1944
1945 fn enqueue_timer_entry(&mut self, deadline_ns: u64, handle: TimerHandle) -> Option<u64> {
1946 match self.timer_wheel.schedule_timer(deadline_ns, handle) {
1947 Ok(timer_id) => Some(timer_id),
1948 Err(_) => None, }
1950 }
1951
1952 fn poll_timers(&mut self) -> bool {
1953 let now_ns = self.timer_wheel.now_ns();
1954 let expired = self
1955 .timer_wheel
1956 .poll(now_ns, TIMER_EXPIRE_BUDGET, &mut self.timer_output);
1957 if expired == 0 {
1958 return false;
1959 }
1960
1961 let mut progress = false;
1962 for idx in 0..expired {
1963 let (timer_id, deadline_ns, handle) = self.timer_output[idx];
1964 if self.process_timer_entry(timer_id, deadline_ns, handle) {
1965 progress = true;
1966 }
1967 }
1968 progress
1969 }
1970
    /// Fires one expired timer: validates that it targets this worker and
    /// that its slot still holds the same task (slots are reused), then
    /// schedules the task. Returns true if the task was scheduled.
    fn process_timer_entry(
        &mut self,
        timer_id: u64,
        _deadline_ns: u64,
        handle: TimerHandle,
    ) -> bool {
        if handle.worker_id() != self.worker_id {
            return false;
        }

        // SAFETY: the handle's slot pointer is kept valid by the task arena.
        let slot = unsafe { handle.task_slot().as_ref() };
        let task_ptr = slot.task_ptr();
        if task_ptr.is_null() {
            // Slot vacated since the timer was armed.
            return false;
        }

        let task = unsafe { &*task_ptr };
        if task.global_id() != handle.task_id() {
            // Slot was reused by a different task; this timer is stale.
            return false;
        }

        task.schedule();
        self.stats.timer_fires = self.stats.timer_fires.saturating_add(1);
        true
    }
2019
    /// Cancels a scheduled timer: directly when it lives on our own wheel,
    /// otherwise by posting a cancel message to the owning worker. Returns
    /// true when the cancellation happened (or was successfully forwarded).
    fn cancel_timer(&mut self, timer: &Timer) -> bool {
        if !timer.is_scheduled() {
            return false;
        }

        let Some(worker_id) = timer.worker_id() else {
            return false;
        };

        let timer_id = timer.timer_id();
        if timer_id == 0 {
            // A zero id is treated as "never committed to a wheel".
            return false;
        }

        if worker_id == self.worker_id {
            if self
                .timer_wheel
                .cancel_timer(timer_id)
                .unwrap_or(None)
                .is_some()
            {
                timer.mark_cancelled(timer_id);
                return true;
            }
            // Already fired or already cancelled.
            return false;
        }

        // Remote wheel: ask its owning worker to cancel.
        self.service
            .post_cancel_message(self.worker_id, worker_id, timer_id)
    }
2050
2051 #[inline]
2052 fn cancel_remote_timer(&mut self, worker_id: u32, timer_id: u64) -> bool {
2053 if worker_id != self.worker_id {
2054 return false;
2055 }
2056
2057 self.service
2058 .post_cancel_message(self.worker_id, worker_id, timer_id)
2059 }
2060
    /// Next value from this worker's private PRNG, used for randomized scan
    /// offsets and steal-victim selection (not cryptographic).
    #[inline(always)]
    pub fn next_u64(&mut self) -> u64 {
        self.rng.next()
    }
2065
2066 #[inline(always)]
2067 pub fn poll_yield(&mut self, max: usize) -> usize {
2068 let mut count = 0;
2069 while let Some(handle) = self.try_acquire_local_yield() {
2070 self.stats.yield_queue_polls += 1;
2071 self.poll_handle(handle);
2072 count += 1;
2073 if count >= max {
2074 break;
2075 }
2076 }
2077 count
2078 }
2079
2080 #[inline(always)]
2081 pub fn poll_yield_steal(&mut self, max: usize) -> usize {
2082 let mut count = 0;
2083 while let Some(handle) = self.try_steal_yielded() {
2084 self.stats.yield_queue_polls += 1;
2085 self.poll_handle(handle);
2086 count += 1;
2087 if count >= max {
2088 break;
2089 }
2090 }
2091 count
2092 }
2093
2094 #[inline(always)]
2095 pub fn run_until_idle(&mut self) -> usize {
2096 let mut processed = 0;
2097 while self.run_once() {
2098 processed += 1;
2099 }
2100 processed += self.poll_yield_steal(32);
2101 while self.run_once() {
2102 processed += 1;
2103 }
2104 processed
2105 }
2106
    /// Blocking poll: runs passes until one makes progress (returns true) or
    /// until a park times out with no work arriving (returns false).
    /// `strategy` overrides this worker's own wait strategy for spinning.
    #[inline(always)]
    pub fn poll_blocking(&mut self, strategy: &WaitStrategy) -> bool {
        let mut spins = 0usize;
        loop {
            if self.run_once() {
                return true;
            }

            if !self.has_work() {
                // Optional spin phase before committing to a park.
                if strategy.spin_before_sleep > 0 && spins < strategy.spin_before_sleep {
                    spins += 1;
                    core::hint::spin_loop();
                    continue;
                }

                spins = 0;

                let park_duration = self.calculate_park_duration();

                // Refresh the waker's view of our partition so producers can
                // decide whether to wake us while we are parked.
                let waker_id = self.worker_id as usize;
                self.service.wakers[waker_id].sync_partition_summary(
                    self.partition_start,
                    self.partition_end,
                    &self.service.summary().leaf_words,
                );

                match park_duration {
                    Some(timeout) if timeout.is_zero() => {
                        // A timer is already due: report "no progress" and let
                        // the caller re-poll rather than sleeping.
                        return false;
                    }
                    Some(timeout) => {
                        let timed_out = !self.service.wakers[waker_id].acquire_timeout(timeout);

                        if timed_out {
                            return false;
                        }
                        // Woken by a producer: loop and re-poll.
                    }
                    None => {
                        // No deadline: block until explicitly woken.
                        self.service.wakers[waker_id].acquire();
                    }
                }
            }
        }
    }
2160
    /// Attempts to claim one ready task from leaf `leaf_idx` via the
    /// two-level bitmap: the leaf summary word says which 64 signal words may
    /// have bits set; a CAS on the chosen signal then claims a single task
    /// bit. Randomized start positions spread contention between workers.
    /// Returns a handle to the claimed task, or `None` if the leaf is empty.
    #[inline(always)]
    fn try_acquire_task(&mut self, leaf_idx: usize) -> Option<TaskHandle> {
        let signals_per_leaf = self.service.arena().signals_per_leaf();
        if signals_per_leaf == 0 {
            return None;
        }

        // Mask restricting the summary word to valid signal slots.
        let mask = if signals_per_leaf >= 64 {
            u64::MAX
        } else {
            (1u64 << signals_per_leaf) - 1
        };

        self.stats.leaf_summary_checks = self.stats.leaf_summary_checks.saturating_add(1);
        let mut available =
            self.service.summary().leaf_words[leaf_idx].load(Ordering::Acquire) & mask;
        if available == 0 {
            self.stats.empty_scans = self.stats.empty_scans.saturating_add(1);
            return None;
        }
        self.stats.leaf_summary_hits = self.stats.leaf_summary_hits.saturating_add(1);

        // NOTE(review): `signals` appears unused below (lookups go through
        // `task_signal_ptr`); confirm whether this binding is still needed.
        let signals = self.service.arena().active_signals(leaf_idx);
        let mut attempts = signals_per_leaf;

        while available != 0 && attempts > 0 {
            // Random start so concurrent workers fan out across signal words.
            let start = (self.next_u64() as usize) % signals_per_leaf;

            // Pick a candidate signal word the local summary view says is
            // active; fall back to re-reading the summary when exhausted.
            let (signal_idx, signal) = loop {
                let candidate = bits::find_nearest(available, start as u64);
                if candidate >= 64 {
                    // Local view exhausted: refresh from the shared summary.
                    self.stats.leaf_summary_checks =
                        self.stats.leaf_summary_checks.saturating_add(1);
                    available =
                        self.service.summary().leaf_words[leaf_idx].load(Ordering::Acquire) & mask;
                    if available == 0 {
                        self.stats.empty_scans = self.stats.empty_scans.saturating_add(1);
                        return None;
                    }
                    self.stats.leaf_summary_hits = self.stats.leaf_summary_hits.saturating_add(1);
                    continue;
                }
                let bit_index = candidate as usize;
                // SAFETY: bit_index < 64 and within this leaf's signal array.
                let sig = unsafe { &*self.service.arena().task_signal_ptr(leaf_idx, bit_index) };
                let bits = sig.load(Ordering::Acquire);
                if bits == 0 {
                    // Stale summary bit: clear it locally and globally.
                    available &= !(1u64 << bit_index);
                    self.service
                        .summary()
                        .mark_signal_inactive(leaf_idx, bit_index);
                    attempts -= 1;
                    continue;
                }
                break (bit_index, sig);
            };

            // Choose a random set bit inside the signal word.
            let bits = signal.load(Ordering::Acquire);
            let bit_seed = (self.next_u64() & 63) as u64;
            let bit_candidate = bits::find_nearest(bits, bit_seed);
            let bit_idx = if bit_candidate < 64 {
                bit_candidate as u64
            } else {
                // Word emptied between loads; drop it from the local view.
                available &= !(1u64 << signal_idx);
                attempts -= 1;
                continue;
            };

            // CAS-claim the bit; losing the race just deprioritizes the word.
            let (remaining, acquired) = signal.try_acquire(bit_idx);
            if !acquired {
                self.stats.cas_failures = self.stats.cas_failures.saturating_add(1);
                available &= !(1u64 << signal_idx);
                attempts -= 1;
                continue;
            }

            // We took the last bit: retire the signal from the summary.
            let remaining_mask = remaining;
            if remaining_mask == 0 {
                self.service
                    .summary()
                    .mark_signal_inactive(leaf_idx, signal_idx);
            }

            self.stats.signal_polls = self.stats.signal_polls.saturating_add(1);
            let slot_idx = signal_idx * 64 + bit_idx as usize;
            // SAFETY: slot_idx maps to the signal bit we now hold exclusively.
            let task = unsafe { self.service.arena().task(leaf_idx, slot_idx) };
            return Some(TaskHandle::from_task(task));
        }

        self.stats.empty_scans = self.stats.empty_scans.saturating_add(1);
        None
    }
2252
2253 #[inline(always)]
2254 fn enqueue_yield(&mut self, handle: TaskHandle) {
2255 let was_empty = self.yield_queue.push_with_status(handle);
2256 if was_empty {
2257 self.service.wakers[self.worker_id as usize].mark_yield();
2259 }
2260 }
2261
2262 #[inline(always)]
2263 fn try_acquire_local_yield(&mut self) -> Option<TaskHandle> {
2264 let (item, was_last) = self.yield_queue.pop_with_status();
2265 if let Some(handle) = item {
2266 if was_last {
2267 self.service.wakers[self.worker_id as usize].try_unmark_yield();
2269 }
2270 return Some(handle);
2271 }
2272 None
2273 }
2274
2275 #[inline(always)]
2276 fn try_steal_yielded(&mut self) -> Option<TaskHandle> {
2277 let next_rand = self.next_u64();
2278 let (attempts, success) =
2279 self.service
2280 .try_yield_steal(self.worker_id as usize, 1, next_rand as usize);
2281 if success {
2282 self.stats.steal_attempts += attempts;
2283 self.stats.steal_successes += 1;
2284 self.try_acquire_local_yield()
2285 } else {
2286 None
2287 }
2288 }
2289
2290 #[inline(always)]
2291 fn process_leaf(&mut self, leaf_idx: usize) -> bool {
2292 if let Some(handle) = self.try_acquire_task(leaf_idx) {
2293 self.poll_handle(handle);
2294 return true;
2295 }
2296 false
2297 }
2298
2299 #[inline(always)]
2300 fn try_partition_random(&mut self) -> bool {
2301 let partition_len = self.partition_len;
2302
2303 if partition_len.is_power_of_two() {
2304 let mask = partition_len - 1;
2305 for _ in 0..partition_len {
2306 let start_offset = self.next_u64() as usize & mask;
2307 let leaf_idx = self.partition_start + start_offset;
2308 if self.process_leaf(leaf_idx) {
2309 return true;
2310 }
2311 }
2312 } else {
2313 for _ in 0..partition_len {
2314 let start_offset = self.next_u64() as usize % partition_len;
2315 let leaf_idx = self.partition_start + start_offset;
2316 if self.process_leaf(leaf_idx) {
2317 return true;
2318 }
2319 }
2320 }
2321
2322 false
2323 }
2324
2325 #[inline(always)]
2326 fn try_partition_linear(&mut self) -> bool {
2327 let partition_start = self.partition_start;
2328 let partition_len = self.partition_len;
2329
2330 for offset in 0..partition_len {
2331 let leaf_idx = partition_start + offset;
2332 if self.process_leaf(leaf_idx) {
2333 return true;
2334 }
2335 }
2336
2337 false
2338 }
2339
2340 #[inline(always)]
2341 fn try_any_partition_random(&mut self, leaf_count: usize) -> bool {
2342 if leaf_count.is_power_of_two() {
2343 let leaf_mask = leaf_count - 1;
2344 for _ in 0..leaf_count {
2345 let leaf_idx = self.next_u64() as usize & leaf_mask;
2346 self.stats.leaf_steal_attempts += 1;
2347 if self.process_leaf(leaf_idx) {
2348 self.stats.leaf_steal_successes += 1;
2349 return true;
2350 }
2351 }
2352 } else {
2353 for _ in 0..leaf_count {
2354 let leaf_idx = self.next_u64() as usize % leaf_count;
2355 self.stats.leaf_steal_attempts += 1;
2356 if self.process_leaf(leaf_idx) {
2357 self.stats.leaf_steal_successes += 1;
2358 return true;
2359 }
2360 }
2361 }
2362 false
2363 }
2364
2365 #[inline(always)]
2366 fn try_any_partition_linear(&mut self, leaf_count: usize) -> bool {
2367 for leaf_idx in 0..leaf_count {
2368 self.stats.leaf_steal_attempts += 1;
2369 if self.process_leaf(leaf_idx) {
2370 self.stats.leaf_steal_successes += 1;
2371 return true;
2372 }
2373 }
2374 false
2375 }
2376
    /// Entry point for polling one acquired task. Publishes the task in the
    /// CURRENT_TASK TLS for the duration of the poll (an RAII guard resets it
    /// on every exit path), then dispatches to the pinned-generator path or
    /// the plain future-poll path.
    #[inline(always)]
    fn poll_handle(&mut self, handle: TaskHandle) {
        let task = handle.task();

        // Tasks carrying a pinned generator resume through that generator.
        if task.has_pinned_generator() {
            self.poll_task_with_pinned_generator(handle, task);
            return;
        }

        // Guard clears the TLS pointer even on panic or early return.
        struct ActiveTaskGuard;
        impl Drop for ActiveTaskGuard {
            fn drop(&mut self) {
                CURRENT_TASK.set(std::ptr::null_mut());
            }
        }
        CURRENT_TASK.set(task as *const Task as *mut Task);
        let _active_guard = ActiveTaskGuard;

        self.stats.tasks_polled += 1;

        // First poll vs. resume-after-yield bookkeeping.
        if !task.is_yielded() {
            task.begin();
        } else {
            task.record_yield();
            task.clear_yielded();
        }

        self.poll_task(handle, task);
    }
2408
    /// Polls a task whose execution is pinned to a dedicated generator stack,
    /// dispatching on the task's generator run mode. Uses the same TLS guard
    /// pattern as `poll_handle`.
    fn poll_task_with_pinned_generator(&mut self, handle: TaskHandle, task: &Task) {
        // Guard clears the TLS pointer on every exit path.
        struct ActiveTaskGuard;
        impl Drop for ActiveTaskGuard {
            fn drop(&mut self) {
                CURRENT_TASK.set(std::ptr::null_mut());
            }
        }
        CURRENT_TASK.set(task as *const Task as *mut Task);
        let _active_guard = ActiveTaskGuard;

        self.stats.tasks_polled += 1;

        let mode = task.generator_run_mode();

        match mode {
            GeneratorRunMode::Switch => {
                // Generator holds a suspended (preempted) worker loop.
                self.poll_task_switch_mode(handle, task);
            }
            GeneratorRunMode::Poll => {
                // Generator wraps the task's own future; step it once.
                self.poll_task_poll_mode(handle, task);
            }
            GeneratorRunMode::None => {
                // Pinned flag with no runnable mode: close out the task.
                task.finish();
                self.stats.completed_count = self.stats.completed_count.saturating_add(1);
            }
        }
    }
2442
    /// Resumes a "switch"-mode generator (a preempted worker loop pinned to
    /// this task). Once it returns control, the generator is discarded and
    /// the task either completes or gets re-wrapped in a poll-mode generator
    /// to continue its future.
    fn poll_task_switch_mode(&mut self, handle: TaskHandle, task: &Task) {
        // SAFETY: only the worker currently polling this task touches its
        // pinned generator, so access is exclusive.
        let generator = unsafe { task.get_pinned_generator() };
        if let Some(task_gen_iter) = generator {
            match task_gen_iter.next() {
                Some(_) => {
                    // NOTE(review): `take_future()` removes the future from
                    // the task — confirm `create_poll_mode_generator`'s
                    // `poll_future` path still observes it afterwards.
                    let future_ptr = task.take_future();

                    if future_ptr.is_none() {
                        // Nothing left to run: drop the generator, finish.
                        unsafe {
                            task.take_pinned_generator();
                        }
                        task.finish();
                        self.stats.completed_count = self.stats.completed_count.saturating_add(1);
                    } else {
                        // Future still pending: continue on a fresh poll-mode
                        // generator stack.
                        unsafe {
                            task.take_pinned_generator();
                        }
                        self.create_poll_mode_generator(handle, task);
                    }
                }
                None => {
                    // Generator ran to completion.
                    unsafe {
                        task.take_pinned_generator();
                    }
                    task.finish();
                    self.stats.completed_count = self.stats.completed_count.saturating_add(1);
                }
            }
        } else {
            // Pinned flag without an actual generator; treat as complete.
            task.finish();
            self.stats.completed_count = self.stats.completed_count.saturating_add(1);
        }
    }
2486
    /// Steps a "poll"-mode generator once. Yield statuses 1 (future ready)
    /// and 2 (no future installed) terminate the generator (see
    /// `create_poll_mode_generator`); any other yield means the task either
    /// cooperatively yielded (requeue) or is waiting on an external wakeup.
    fn poll_task_poll_mode(&mut self, handle: TaskHandle, task: &Task) {
        // SAFETY: exclusive access by the worker polling this task.
        let generator = unsafe { task.get_pinned_generator() };
        if let Some(task_gen_iter) = generator {
            match task_gen_iter.next() {
                Some(status) => {
                    if status == 1 || status == 2 {
                        // Terminal status: drop the generator and finish.
                        unsafe {
                            task.take_pinned_generator();
                        }
                        task.finish();
                        self.stats.completed_count = self.stats.completed_count.saturating_add(1);
                    } else if task.is_yielded() {
                        // Cooperative yield: requeue for another round.
                        task.record_yield();
                        self.stats.yielded_count += 1;
                        self.enqueue_yield(handle);
                    } else {
                        // Pending on a waker/timer: close this poll window.
                        self.stats.waiting_count += 1;
                        task.finish();
                    }
                }
                None => {
                    // Generator exhausted without a status.
                    unsafe {
                        task.take_pinned_generator();
                    }
                    task.finish();
                    self.stats.completed_count = self.stats.completed_count.saturating_add(1);
                }
            }
        } else {
            task.finish();
            self.stats.completed_count = self.stats.completed_count.saturating_add(1);
        }
    }
2530
    /// Wraps the task's remaining future in a fresh generator on a dedicated
    /// 512 KiB stack. The generator polls the future in a loop, yielding on
    /// Pending and returning status 1 on Ready / 2 when no future is
    /// installed; the task is then requeued so the generator gets driven.
    fn create_poll_mode_generator(&mut self, handle: TaskHandle, task: &Task) {
        const STACK_SIZE: usize = 512 * 1024;
        // Smuggle the task reference into the 'static closure as an address.
        let task_addr = task as *const Task as usize;
        let handle_copy = handle;

        let generator =
            crate::generator::Gn::<()>::new_scoped_opt(STACK_SIZE, move |mut scope| -> usize {
                // SAFETY: the arena is assumed to keep the task alive while
                // its generator is pinned — TODO confirm that invariant.
                let task = unsafe { &*(task_addr as *const Task) };

                loop {
                    let waker = unsafe { task.waker_yield() };
                    let mut cx = Context::from_waker(&waker);

                    let poll_result = unsafe { task.poll_future(&mut cx) };

                    match poll_result {
                        Some(Poll::Ready(())) => {
                            // Status 1: future completed.
                            return 1;
                        }
                        Some(Poll::Pending) => {
                            // Hand control back to the worker loop.
                            scope.yield_(crate::runtime::preemption::GeneratorYieldReason::Cooperative.as_usize());
                        }
                        None => {
                            // Status 2: no future installed.
                            return 2;
                        }
                    }
                }
            });

        // Erase the scoped lifetime so the generator can be stored on the
        // task. SAFETY: see the lifetime note above.
        let boxed_iter: Box<dyn Iterator<Item = usize> + 'static> =
            unsafe { std::mem::transmute(Box::new(generator) as Box<dyn Iterator<Item = usize>>) };

        unsafe {
            task.pin_generator(boxed_iter, GeneratorRunMode::Poll);
        }

        // Requeue so the new generator is driven on the next yield poll.
        self.enqueue_yield(handle_copy);
    }
2580
2581 fn poll_task(&mut self, handle: TaskHandle, task: &Task) {
2582 let waker = unsafe { task.waker_yield() };
2583 let mut cx = Context::from_waker(&waker);
2584 let poll_result = unsafe { task.poll_future(&mut cx) };
2585
2586 match poll_result {
2587 Some(Poll::Ready(())) => {
2588 task.finish();
2589 self.stats.completed_count = self.stats.completed_count.saturating_add(1);
2590 CURRENT_TASK.set(std::ptr::null_mut());
2591 if let Some(ptr) = task.take_future() {
2592 unsafe { FutureAllocator::drop_boxed(ptr) };
2593 }
2594 }
2595 Some(Poll::Pending) => {
2596 if task.is_yielded() {
2597 task.record_yield();
2598 self.stats.yielded_count += 1;
2599 self.enqueue_yield(handle);
2603 } else {
2604 self.stats.waiting_count += 1;
2605 task.finish();
2606 }
2607 }
2608 None => {
2609 self.stats.completed_count += 1;
2610 task.finish();
2611 }
2612 }
2613 }
2614
    /// Schedules `timer` to fire after `delay` for the task currently being
    /// polled on this worker (read from the CURRENT_TASK TLS). An existing
    /// schedule on our own wheel is cancelled first. Returns the absolute
    /// deadline (ns) on success.
    fn schedule_timer_for_current_task(&mut self, timer: &Timer, delay: Duration) -> Option<u64> {
        let task_ptr = CURRENT_TASK.with(|cell| cell.get()) as *mut Task;
        if task_ptr.is_null() {
            // Not inside a task poll; nothing to attach the timer to.
            return None;
        }

        let task = unsafe { &*task_ptr };
        let Some(slot) = task.slot() else {
            return None;
        };

        // Cancel any previous instance of this timer on our own wheel.
        if timer.is_scheduled() {
            if let Some(worker_id) = timer.worker_id() {
                if worker_id == self.worker_id {
                    let existing_id = timer.timer_id();
                    if self
                        .timer_wheel
                        .cancel_timer(existing_id)
                        .unwrap_or(None)
                        .is_some()
                    {
                        timer.reset();
                    }
                } else {
                    // NOTE(review): a timer living on another worker's wheel
                    // is NOT cancelled here (empty branch) — confirm whether
                    // a cross-worker cancel message should be posted, as
                    // `cancel_timer` does.
                }
            }
        }

        // Clamp so the u128 -> u64 cast cannot truncate.
        let delay_ns = delay.as_nanos().min(u128::from(u64::MAX)) as u64;
        let now = self.timer_wheel.now_ns();
        let deadline_ns = now.saturating_add(delay_ns);

        let handle = timer.prepare(slot, task.global_id(), self.worker_id);

        if let Some(timer_id) = self.enqueue_timer_entry(deadline_ns, handle) {
            timer.commit_schedule(timer_id, deadline_ns);
            Some(deadline_ns)
        } else {
            // Wheel rejected the entry; leave the timer unscheduled.
            timer.reset();
            None
        }
    }
2658}
2659
/// Builds a `Waker` whose data pointer is the task's arena slot; waking
/// re-schedules whichever task currently occupies that slot.
fn make_task_waker(slot: &TaskSlot) -> Waker {
    let ptr = slot as *const TaskSlot as *const ();
    // SAFETY: the vtable functions only read through the slot pointer, and
    // the arena keeps slots alive; no ownership is transferred.
    unsafe { Waker::from_raw(RawWaker::new(ptr, &TASK_WAKER_VTABLE)) }
}

/// Clone is trivial: the slot pointer is shared and nothing is refcounted.
unsafe fn task_waker_clone(ptr: *const ()) -> RawWaker {
    RawWaker::new(ptr, &TASK_WAKER_VTABLE)
}

/// Wake: schedule the task currently in the slot. A vacant slot (task gone)
/// makes this a no-op.
unsafe fn task_waker_wake(ptr: *const ()) {
    unsafe {
        let slot = &*(ptr as *const TaskSlot);
        let task_ptr = slot.task_ptr();
        if task_ptr.is_null() {
            return;
        }
        let task = &*task_ptr;
        task.schedule();
    }
}

/// Wake-by-ref is identical to wake: nothing is consumed either way.
unsafe fn task_waker_wake_by_ref(ptr: *const ()) {
    unsafe {
        task_waker_wake(ptr);
    }
}

/// Nothing to release: the waker borrows the slot, it does not own it.
unsafe fn task_waker_drop(_: *const ()) {}

static TASK_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    task_waker_clone,
    task_waker_wake,
    task_waker_wake_by_ref,
    task_waker_drop,
);
2695
/// TLS-based timer scheduling, callable from inside a future's `poll`:
/// resolves the current worker's wheel and current task from thread-locals,
/// cancels any previous schedule (locally or via a cross-worker message),
/// then arms the timer. Returns the absolute deadline (ns) on success.
pub fn schedule_timer_for_current_task(
    _cx: &Context<'_>,
    timer: &Timer,
    delay: Duration,
) -> Option<u64> {
    let timer_wheel = current_timer_wheel()?;
    let task = current_task()?;
    let worker_id = timer_wheel.worker_id();
    let slot = task.slot()?;

    // Cancel any previous schedule before re-arming.
    if timer.is_scheduled() {
        if let Some(existing_worker_id) = timer.worker_id() {
            if existing_worker_id == worker_id {
                // Lives on our wheel: cancel directly.
                let existing_id = timer.timer_id();
                if timer_wheel
                    .cancel_timer(existing_id)
                    .unwrap_or(None)
                    .is_some()
                {
                    timer.reset();
                }
            } else {
                // Lives on another worker's wheel: forward a cancel message.
                let existing_id = timer.timer_id();
                let ops = CROSS_WORKER_OPS.with(|cell| cell.get());
                if let Some(ops) = ops {
                    if ops.post_cancel_message(worker_id, existing_worker_id, existing_id) {
                        timer.reset();
                    }
                }
            }
        }
    }

    // Clamp so the u128 -> u64 cast cannot truncate.
    let delay_ns = delay.as_nanos().min(u128::from(u64::MAX)) as u64;
    let now = timer_wheel.now_ns();
    let deadline_ns = now.saturating_add(delay_ns);

    let handle = timer.prepare(slot, task.global_id(), worker_id);

    match timer_wheel.schedule_timer(deadline_ns, handle) {
        Ok(timer_id) => {
            timer.commit_schedule(timer_id, deadline_ns);
            Some(deadline_ns)
        }
        Err(_) => {
            // Wheel rejected the entry; leave the timer unscheduled.
            timer.reset();
            None
        }
    }
}
2749
/// Cancels a scheduled timer from task context (TLS-based): directly when it
/// lives on the current worker's wheel, otherwise by forwarding a cancel
/// message to the owning worker. Returns true when the cancellation happened
/// or was accepted for forwarding.
pub fn cancel_timer_for_current_task(timer: &Timer) -> bool {
    if !timer.is_scheduled() {
        return false;
    }

    let Some(timer_worker_id) = timer.worker_id() else {
        return false;
    };

    let timer_id = timer.timer_id();
    if timer_id == 0 {
        // A zero id is treated as "never committed to a wheel".
        return false;
    }

    let timer_wheel = current_timer_wheel();
    let Some(timer_wheel) = timer_wheel else {
        // Not on a worker thread; no wheel to act on.
        return false;
    };

    let worker_id = timer_wheel.worker_id();

    if timer_worker_id == worker_id {
        if timer_wheel.cancel_timer(timer_id).unwrap_or(None).is_some() {
            timer.mark_cancelled(timer_id);
            return true;
        }
        // Already fired or already cancelled.
        return false;
    }

    // Timer belongs to another worker: forward the cancellation.
    let ops = CROSS_WORKER_OPS.with(|cell| cell.get());
    let Some(ops) = ops else {
        return false;
    };

    if ops.post_cancel_message(worker_id, timer_worker_id, timer_id) {
        // Marked cancelled optimistically once the message is accepted.
        timer.mark_cancelled(timer_id);
        true
    } else {
        false
    }
}
2793
2794pub fn current_worker_now_ns() -> Option<u64> {
2796 let timer_wheel = current_timer_wheel()?;
2797 Some(timer_wheel.now_ns())
2798}