parallel_processor/execution_manager/
scheduler.rs1use std::{
2 cell::UnsafeCell,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc,
6 },
7};
8
9use crate::execution_manager::notifier::Notifier;
10
11pub struct Scheduler {
12 max_threads: usize,
13 current_running: AtomicUsize,
14 notifier: Notifier,
15}
16
17impl Scheduler {
18 pub fn new(max_threads: usize) -> Arc<Self> {
19 Arc::new(Self {
20 max_threads,
21 current_running: AtomicUsize::new(0),
22 notifier: Notifier::new(),
23 })
24 }
25
26 pub fn max_threads(&self) -> usize {
27 self.max_threads
28 }
29
30 pub fn current_running(&self) -> usize {
31 self.current_running.load(Ordering::Relaxed)
32 }
33}
34
35thread_local! {
36 static SCHEDULER_GUARD: UnsafeCell<Option<SchedulerGuard>> = const { UnsafeCell::new(None) };
37}
38
39struct SchedulerGuard {
40 scheduler: Arc<Scheduler>,
41}
42
43impl Drop for SchedulerGuard {
44 fn drop(&mut self) {
45 self.scheduler
46 .current_running
47 .fetch_sub(1, Ordering::Relaxed);
48 self.scheduler.notifier.notify_one();
49 }
50}
51
52pub fn get_current_scheduler() -> Arc<Scheduler> {
53 SCHEDULER_GUARD.with(|guard| {
54 let guard = unsafe { &*guard.get() };
55 guard.as_ref().map_or_else(
56 || panic!("Scheduler is not initialized for the current thread"),
57 |g| g.scheduler.clone(),
58 )
59 })
60}
61
62pub fn init_current_thread(scheduler: Arc<Scheduler>) {
63 SCHEDULER_GUARD.with(|guard| {
64 let guard = unsafe { &mut *guard.get() };
65 *guard = Some(SchedulerGuard {
70 scheduler: scheduler.clone(),
71 });
72 scheduler.current_running.fetch_add(1, Ordering::Relaxed);
73 });
74}
75
76pub fn uninit_current_thread() {
77 SCHEDULER_GUARD.with(|guard| {
78 let guard = unsafe { &mut *guard.get() };
79 *guard = None;
80 });
81}
82
83#[inline(never)]
84#[cold]
85pub fn run_blocking_op<T>(f: impl FnOnce() -> T) -> T {
86 let Some(guard) = SCHEDULER_GUARD.with(|guard| unsafe { &mut *guard.get() }.as_mut()) else {
87 return f();
88 };
89
90 let active = guard
91 .scheduler
92 .current_running
93 .fetch_sub(1, Ordering::Relaxed);
94 guard.scheduler.notifier.notify_one();
95
96 assert!(active > 0);
97
98 let value = f();
99
100 fn reserve_thread(guard: &SchedulerGuard) -> bool {
101 guard
102 .scheduler
103 .current_running
104 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
105 if v < guard.scheduler.max_threads {
106 Some(v + 1)
107 } else {
108 None
109 }
110 })
111 .is_ok()
112 }
113
114 guard
115 .scheduler
116 .notifier
117 .wait_for_condition(|| reserve_thread(guard));
118 value
119}