1use crate::foreground;
8use crate::spawner::{BackgroundSpawner, ForegroundSpawner, SPAWNER, Spawner};
9use anyhow::Result;
10use core_affinity::{CoreId, get_core_ids, set_for_current};
11use futures::lock::Mutex;
12use futures::{channel::mpsc::unbounded, executor::LocalPool};
13use rand::seq::IndexedRandom;
14use std::cell::RefCell;
15use std::cmp::{max, min};
16use thread_priority::{ThreadPriority, set_current_thread_priority};
17
/// Optional overrides for the number of background worker threads started by
/// [`initialize`].
///
/// All fields default to `None`, in which case the recommended worker count is
/// used unchanged.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Config {
    /// Lower bound: the recommended count is raised to at least this value.
    pub min_background_threads: Option<usize>,
    /// Upper bound: applied after `min_background_threads`, so it wins when
    /// the two bounds conflict.
    pub max_background_threads: Option<usize>,
    /// Exact count: when set, it replaces the bounded recommendation entirely.
    pub num_background_threads: Option<usize>,
}
25
/// Strategy used to keep the foreground thread and the background workers
/// from competing with each other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadingMode {
    /// Chosen when at least two CPU cores are reported. The foreground thread
    /// is pinned to the first core and each background thread is pinned to a
    /// randomly picked core among the remaining ones.
    Disjoint,

    /// Fallback when fewer than two cores are reported. No thread is pinned;
    /// the foreground thread requests maximum priority and background threads
    /// request minimum priority.
    Contended,
}
47
48pub struct Status {
50 threading_mode: ThreadingMode,
51}
52
53impl Status {
54 pub fn threading_mode(&self) -> ThreadingMode {
55 self.threading_mode
56 }
57}
58
59#[derive(Clone)]
60enum ThreadInitializer {
61 Disjoint { cores: Vec<CoreId> },
62 Contended,
63}
64
65impl ThreadInitializer {
66 fn configure_on_foreground(&self) {
67 match self {
68 Self::Disjoint { cores } => {
69 set_for_current(cores[0]);
70 let _ = set_current_thread_priority(ThreadPriority::Max);
71 std::thread::yield_now();
72 }
73 Self::Contended => {
74 let _ = set_current_thread_priority(ThreadPriority::Max);
75 }
76 }
77 }
78
79 fn configure_on_background(&self) {
80 match self {
81 Self::Disjoint { cores } => {
82 assert!(cores.len() > 1);
83 let mut rng = rand::rng();
84 let core = cores[1..].choose(&mut rng);
85 set_for_current(core.unwrap().clone());
86 }
87 Self::Contended => {
88 let _ = set_current_thread_priority(ThreadPriority::Min);
89 }
90 }
91 }
92
93 fn recommend_num_backgrounds(&self) -> usize {
94 match self {
95 Self::Disjoint { cores } => cores.len() - 1,
96 Self::Contended => 1,
97 }
98 }
99
100 fn threading_mode(&self) -> ThreadingMode {
101 match self {
102 Self::Disjoint { cores: _ } => ThreadingMode::Disjoint,
103 Self::Contended => ThreadingMode::Contended,
104 }
105 }
106}
107
108pub struct Framework {
117 status: Status,
118}
119
120impl Framework {
121 pub fn status(&self) -> &Status {
122 &self.status
123 }
124}
125
126impl Drop for Framework {
127 fn drop(&mut self) {
128 foreground::assert();
129 drop(SPAWNER.replace(Spawner::Uninit))
130 }
131}
132
133pub fn initialize(cfg: Config) -> Result<Framework> {
140 SPAWNER.with_borrow(|v| match v {
141 &Spawner::Uninit => Ok(()),
142 _ => Err(anyhow::anyhow!("Initialized framework in use.")),
143 })?;
144
145 let initializer = {
146 let cores = get_core_ids().unwrap_or(Vec::new());
147 if cores.len() >= 2 {
148 ThreadInitializer::Disjoint { cores }
149 } else {
150 ThreadInitializer::Contended
151 }
152 };
153
154 let mut num_workers = initializer.recommend_num_backgrounds();
155 num_workers = max(1, num_workers);
156 if let Some(min_background_threads) = cfg.min_background_threads {
157 num_workers = max(min_background_threads, num_workers);
158 }
159 if let Some(max_background_threads) = cfg.max_background_threads {
160 num_workers = min(max_background_threads, num_workers);
161 }
162 if let Some(num_background_threads) = cfg.num_background_threads {
163 num_workers = num_background_threads;
164 }
165
166 let (loopback_send, loopback_recv) = unbounded();
167
168 let thread_send = loopback_send.clone();
169 let thread_initializer = initializer.clone();
170 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
171 .worker_threads(num_workers)
172 .on_thread_start(move || {
173 thread_initializer.configure_on_background();
174 SPAWNER.with_borrow_mut(|v| {
175 *v = Spawner::Background(BackgroundSpawner {
176 loopback_send: thread_send.clone(),
177 });
178 })
179 })
180 .enable_all()
181 .build()?;
182
183 initializer.configure_on_foreground();
184 SPAWNER.with_borrow_mut(|v| {
185 let local_pool = LocalPool::new();
186 let local_spawner = local_pool.spawner();
187 *v = Spawner::Foreground(ForegroundSpawner {
188 local_pool: RefCell::new(local_pool),
189 local_spawner: local_spawner,
190 tokio_runtime: tokio_runtime,
191 loopback_recv: Mutex::new(loopback_recv),
192 _loopback_send: loopback_send,
193 });
194 });
195
196 Ok(Framework {
197 status: Status {
198 threading_mode: initializer.threading_mode(),
199 },
200 })
201}