Skip to main content

aeth_task/
framework.rs

//! Manipulation of the task framework itself.
//!
//! Users of the task module normally do not need to
//! manipulate the task framework directly, so that
//! functionality lives in this submodule to avoid
//! overwhelming them.
6
7use crate::foreground;
8use crate::spawner::{BackgroundSpawner, ForegroundSpawner, SPAWNER, Spawner};
9use anyhow::Result;
10use core_affinity::{CoreId, get_core_ids, set_for_current};
11use futures::lock::Mutex;
12use futures::{channel::mpsc::unbounded, executor::LocalPool};
13use rand::seq::IndexedRandom;
14use std::cell::RefCell;
15use std::cmp::{max, min};
16use thread_priority::{ThreadPriority, set_current_thread_priority};
17
/// Configuration for the task framework.
///
/// Every field is optional. When all of them are `None`, the
/// number of background threads is derived from the detected
/// CPU set during initialization.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Config {
    /// Lower bound on the number of background threads.
    pub min_background_threads: Option<usize>,
    /// Upper bound on the number of background threads.
    ///
    /// The upper bound is applied after the lower bound, so it
    /// takes precedence when the two conflict.
    pub max_background_threads: Option<usize>,
    /// Exact number of background threads.
    ///
    /// When set, this overrides both bounds above.
    pub num_background_threads: Option<usize>,
}
25
/// Threading mode the framework is currently initialized with.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ThreadingMode {
    /// The ideal mode: multiple CPUs are available, so the
    /// foreground thread and the background threads may run
    /// on disjoint sets of CPUs.
    ///
    /// This mode is selected whenever two or more CPUs are
    /// permitted.
    Disjoint,

    /// The degraded mode: only a single CPU is available, so
    /// the foreground thread and the background threads have
    /// to share that CPU.
    ///
    /// This mode is selected when there is a single CPU or
    /// when the permitted CPU set cannot be obtained.
    Contended,
}
47
48/// Task framework initialization status.
49pub struct Status {
50    threading_mode: ThreadingMode,
51}
52
53impl Status {
54    pub fn threading_mode(&self) -> ThreadingMode {
55        self.threading_mode
56    }
57}
58
59#[derive(Clone)]
60enum ThreadInitializer {
61    Disjoint { cores: Vec<CoreId> },
62    Contended,
63}
64
65impl ThreadInitializer {
66    fn configure_on_foreground(&self) {
67        match self {
68            Self::Disjoint { cores } => {
69                set_for_current(cores[0]);
70                let _ = set_current_thread_priority(ThreadPriority::Max);
71                std::thread::yield_now();
72            }
73            Self::Contended => {
74                let _ = set_current_thread_priority(ThreadPriority::Max);
75            }
76        }
77    }
78
79    fn configure_on_background(&self) {
80        match self {
81            Self::Disjoint { cores } => {
82                assert!(cores.len() > 1);
83                let mut rng = rand::rng();
84                let core = cores[1..].choose(&mut rng);
85                set_for_current(core.unwrap().clone());
86            }
87            Self::Contended => {
88                let _ = set_current_thread_priority(ThreadPriority::Min);
89            }
90        }
91    }
92
93    fn recommend_num_backgrounds(&self) -> usize {
94        match self {
95            Self::Disjoint { cores } => cores.len() - 1,
96            Self::Contended => 1,
97        }
98    }
99
100    fn threading_mode(&self) -> ThreadingMode {
101        match self {
102            Self::Disjoint { cores: _ } => ThreadingMode::Disjoint,
103            Self::Contended => ThreadingMode::Contended,
104        }
105    }
106}
107
108/// Initialized framework handle.
109///
110/// This handle is used to control the lifecycle
111/// of the whole system. By dropping it, you
112/// dispose the entire framework.
113///
114/// Disposing the framework is required on some
115/// platform such as windows.
116pub struct Framework {
117    status: Status,
118}
119
120impl Framework {
121    pub fn status(&self) -> &Status {
122        &self.status
123    }
124}
125
126impl Drop for Framework {
127    fn drop(&mut self) {
128        foreground::assert();
129        drop(SPAWNER.replace(Spawner::Uninit))
130    }
131}
132
133/// Initialize the task framework.
134///
135/// Please notice that the tasking system will only
136/// be initialized once, and the later invocation
137/// will always return the result of the first
138/// invocation, ignoring the config.
139pub fn initialize(cfg: Config) -> Result<Framework> {
140    SPAWNER.with_borrow(|v| match v {
141        &Spawner::Uninit => Ok(()),
142        _ => Err(anyhow::anyhow!("Initialized framework in use.")),
143    })?;
144
145    let initializer = {
146        let cores = get_core_ids().unwrap_or(Vec::new());
147        if cores.len() >= 2 {
148            ThreadInitializer::Disjoint { cores }
149        } else {
150            ThreadInitializer::Contended
151        }
152    };
153
154    let mut num_workers = initializer.recommend_num_backgrounds();
155    num_workers = max(1, num_workers);
156    if let Some(min_background_threads) = cfg.min_background_threads {
157        num_workers = max(min_background_threads, num_workers);
158    }
159    if let Some(max_background_threads) = cfg.max_background_threads {
160        num_workers = min(max_background_threads, num_workers);
161    }
162    if let Some(num_background_threads) = cfg.num_background_threads {
163        num_workers = num_background_threads;
164    }
165
166    let (loopback_send, loopback_recv) = unbounded();
167
168    let thread_send = loopback_send.clone();
169    let thread_initializer = initializer.clone();
170    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
171        .worker_threads(num_workers)
172        .on_thread_start(move || {
173            thread_initializer.configure_on_background();
174            SPAWNER.with_borrow_mut(|v| {
175                *v = Spawner::Background(BackgroundSpawner {
176                    loopback_send: thread_send.clone(),
177                });
178            })
179        })
180        .enable_all()
181        .build()?;
182
183    initializer.configure_on_foreground();
184    SPAWNER.with_borrow_mut(|v| {
185        let local_pool = LocalPool::new();
186        let local_spawner = local_pool.spawner();
187        *v = Spawner::Foreground(ForegroundSpawner {
188            local_pool: RefCell::new(local_pool),
189            local_spawner: local_spawner,
190            tokio_runtime: tokio_runtime,
191            loopback_recv: Mutex::new(loopback_recv),
192            _loopback_send: loopback_send,
193        });
194    });
195
196    Ok(Framework {
197        status: Status {
198            threading_mode: initializer.threading_mode(),
199        },
200    })
201}