1use crate::foreground;
8use crate::spawner::{BackgroundSpawner, ForegroundSpawner, SPAWNER, Spawner};
9use anyhow::Result;
10use core_affinity::{CoreId, get_core_ids, set_for_current};
11use futures::lock::Mutex;
12use futures::{channel::mpsc::unbounded, executor::LocalPool};
13use rand::seq::IndexedRandom;
14use std::cell::RefCell;
15use std::cmp::{max, min};
16use thread_priority::{ThreadPriority, set_current_thread_priority};
17
18#[derive(Default)]
20pub struct Config {
21 pub min_background_threads: Option<usize>,
22 pub max_background_threads: Option<usize>,
23 pub num_background_threads: Option<usize>,
24}
25
26#[derive(Clone, Copy, PartialEq, Eq, Debug)]
28pub enum ThreadingMode {
29 Disjoint,
37
38 Contended,
46}
47
48pub struct Status {
50 threading_mode: ThreadingMode,
51}
52
53impl Status {
54 pub fn threading_mode(&self) -> ThreadingMode {
55 self.threading_mode
56 }
57}
58
59#[derive(Clone)]
60enum ThreadInitializer {
61 Disjoint { cores: Vec<CoreId> },
62 Contended,
63}
64
65impl ThreadInitializer {
66 fn configure_on_foreground(&self) {
67 match self {
68 Self::Disjoint { cores } => {
69 set_for_current(cores[0]);
70 let _ = set_current_thread_priority(ThreadPriority::Max);
71 std::thread::yield_now();
72 }
73 Self::Contended => {
74 let _ = set_current_thread_priority(ThreadPriority::Max);
75 }
76 }
77 }
78
79 fn configure_on_background(&self) {
80 match self {
81 Self::Disjoint { cores } => {
82 assert!(cores.len() > 1);
83 let mut rng = rand::rng();
84 let core = cores[1..].choose(&mut rng);
85 set_for_current(core.unwrap().clone());
86 }
87 Self::Contended => {
88 let _ = set_current_thread_priority(ThreadPriority::Min);
89 }
90 }
91 }
92
93 fn recommend_num_backgrounds(&self) -> usize {
94 match self {
95 Self::Disjoint { cores } => cores.len() - 1,
96 Self::Contended => 1,
97 }
98 }
99
100 fn threading_mode(&self) -> ThreadingMode {
101 match self {
102 Self::Disjoint { cores: _ } => ThreadingMode::Disjoint,
103 Self::Contended => ThreadingMode::Contended,
104 }
105 }
106}
107
108pub struct Framework {
117 status: Status,
118}
119
120impl Framework {
121 pub fn status(&self) -> &Status {
122 &self.status
123 }
124}
125
126impl Drop for Framework {
127 fn drop(&mut self) {
128 foreground::assert();
129 drop(SPAWNER.replace(Spawner::Uninit))
130 }
131}
132
133pub fn initialize(cfg: Config) -> Result<Framework> {
140 SPAWNER.with_borrow(|v| {
141 match v {
142 &Spawner::Uninit => Ok(()),
143 _ => Err(anyhow::anyhow!("Initialized framework in use."))
144 }
145 })?;
146
147 let initializer = {
148 let cores = get_core_ids().unwrap_or(Vec::new());
149 if cores.len() >= 2 {
150 ThreadInitializer::Disjoint { cores }
151 } else {
152 ThreadInitializer::Contended
153 }
154 };
155
156 let mut num_workers = initializer.recommend_num_backgrounds();
157 num_workers = max(1, num_workers);
158 if let Some(min_background_threads) = cfg.min_background_threads {
159 num_workers = max(min_background_threads, num_workers);
160 }
161 if let Some(max_background_threads) = cfg.max_background_threads {
162 num_workers = min(max_background_threads, num_workers);
163 }
164 if let Some(num_background_threads) = cfg.num_background_threads {
165 num_workers = num_background_threads;
166 }
167
168 let (loopback_send, loopback_recv) = unbounded();
169
170 let thread_send = loopback_send.clone();
171 let thread_initializer = initializer.clone();
172 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
173 .worker_threads(num_workers)
174 .on_thread_start(move || {
175 thread_initializer.configure_on_background();
176 SPAWNER.with_borrow_mut(|v| {
177 *v = Spawner::Background(BackgroundSpawner {
178 loopback_send: thread_send.clone(),
179 });
180 })
181 })
182 .enable_all()
183 .build()?;
184
185 initializer.configure_on_foreground();
186 SPAWNER.with_borrow_mut(|v| {
187 let local_pool = LocalPool::new();
188 let local_spawner = local_pool.spawner();
189 *v = Spawner::Foreground(ForegroundSpawner {
190 local_pool: RefCell::new(local_pool),
191 local_spawner: local_spawner,
192 tokio_runtime: tokio_runtime,
193 loopback_recv: Mutex::new(loopback_recv),
194 _loopback_send: loopback_send,
195 });
196 });
197
198 Ok(Framework {
199 status: Status {
200 threading_mode: initializer.threading_mode(),
201 },
202 })
203}