parallel_processor/execution_manager/
scheduler.rs

1use std::{
2    cell::UnsafeCell,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc,
6    },
7};
8
9use crate::execution_manager::notifier::Notifier;
10
11pub struct Scheduler {
12    max_threads: usize,
13    current_running: AtomicUsize,
14    notifier: Notifier,
15}
16
17impl Scheduler {
18    pub fn new(max_threads: usize) -> Arc<Self> {
19        Arc::new(Self {
20            max_threads,
21            current_running: AtomicUsize::new(0),
22            notifier: Notifier::new(),
23        })
24    }
25
26    pub fn max_threads(&self) -> usize {
27        self.max_threads
28    }
29
30    pub fn current_running(&self) -> usize {
31        self.current_running.load(Ordering::Relaxed)
32    }
33}
34
35thread_local! {
36    static SCHEDULER_GUARD: UnsafeCell<Option<SchedulerGuard>> = const { UnsafeCell::new(None) };
37}
38
39struct SchedulerGuard {
40    scheduler: Arc<Scheduler>,
41}
42
43impl Drop for SchedulerGuard {
44    fn drop(&mut self) {
45        self.scheduler
46            .current_running
47            .fetch_sub(1, Ordering::Relaxed);
48        self.scheduler.notifier.notify_one();
49    }
50}
51
52pub fn get_current_scheduler() -> Arc<Scheduler> {
53    SCHEDULER_GUARD.with(|guard| {
54        let guard = unsafe { &*guard.get() };
55        guard.as_ref().map_or_else(
56            || panic!("Scheduler is not initialized for the current thread"),
57            |g| g.scheduler.clone(),
58        )
59    })
60}
61
62pub fn init_current_thread(scheduler: Arc<Scheduler>) {
63    SCHEDULER_GUARD.with(|guard| {
64        let guard = unsafe { &mut *guard.get() };
65        // assert!(
66        //     guard.is_none(),
67        //     "Scheduler is already initialized for the current thread"
68        // );
69        *guard = Some(SchedulerGuard {
70            scheduler: scheduler.clone(),
71        });
72        scheduler.current_running.fetch_add(1, Ordering::Relaxed);
73    });
74}
75
76pub fn uninit_current_thread() {
77    SCHEDULER_GUARD.with(|guard| {
78        let guard = unsafe { &mut *guard.get() };
79        *guard = None;
80    });
81}
82
83#[inline(never)]
84#[cold]
85pub fn run_blocking_op<T>(f: impl FnOnce() -> T) -> T {
86    let Some(guard) = SCHEDULER_GUARD.with(|guard| unsafe { &mut *guard.get() }.as_mut()) else {
87        return f();
88    };
89
90    let active = guard
91        .scheduler
92        .current_running
93        .fetch_sub(1, Ordering::Relaxed);
94    guard.scheduler.notifier.notify_one();
95
96    assert!(active > 0);
97
98    let value = f();
99
100    fn reserve_thread(guard: &SchedulerGuard) -> bool {
101        guard
102            .scheduler
103            .current_running
104            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
105                if v < guard.scheduler.max_threads {
106                    Some(v + 1)
107                } else {
108                    None
109                }
110            })
111            .is_ok()
112    }
113
114    guard
115        .scheduler
116        .notifier
117        .wait_for_condition(|| reserve_thread(guard));
118    value
119}