agave_thread_manager/
native_thread_runtime.rs1use {
2 crate::{
3 policy::{apply_policy, parse_policy, CoreAllocation},
4 MAX_THREAD_NAME_CHARS,
5 },
6 anyhow::bail,
7 log::warn,
8 serde::{Deserialize, Serialize},
9 solana_metrics::datapoint_info,
10 std::{
11 ops::Deref,
12 sync::{
13 atomic::{AtomicUsize, Ordering},
14 Arc, Mutex,
15 },
16 },
17};
18
19#[derive(Clone, Debug, Serialize, Deserialize)]
20#[serde(default)]
21pub struct NativeConfig {
22 pub core_allocation: CoreAllocation,
23 pub max_threads: usize,
24 pub priority: u8,
26 pub policy: String,
27 pub stack_size_bytes: usize,
28}
29
30impl Default for NativeConfig {
31 fn default() -> Self {
32 Self {
33 core_allocation: CoreAllocation::OsDefault,
34 max_threads: 16,
35 priority: crate::policy::DEFAULT_PRIORITY,
36 policy: "OTHER".to_owned(),
37 stack_size_bytes: 2 * 1024 * 1024,
38 }
39 }
40}
41
42#[derive(Debug)]
43pub struct NativeThreadRuntimeInner {
44 pub id_count: AtomicUsize,
45 pub running_count: Arc<AtomicUsize>,
46 pub config: NativeConfig,
47 pub name: String,
48}
49
50#[derive(Debug, Clone)]
51pub struct NativeThreadRuntime {
52 inner: Arc<NativeThreadRuntimeInner>,
53}
54
55impl Deref for NativeThreadRuntime {
56 type Target = NativeThreadRuntimeInner;
57
58 fn deref(&self) -> &Self::Target {
59 &self.inner
60 }
61}
62
63pub struct JoinHandle<T> {
64 std_handle: Option<std::thread::JoinHandle<T>>,
65 running_count: Arc<AtomicUsize>,
66}
67
68impl<T> JoinHandle<T> {
69 fn join_inner(&mut self) -> std::thread::Result<T> {
70 match self.std_handle.take() {
71 Some(jh) => {
72 let result = jh.join();
73 let rc = self.running_count.fetch_sub(1, Ordering::Relaxed);
74 datapoint_info!("thread-manager-native", ("threads-running", rc, i64),);
75 result
76 }
77 None => {
78 panic!("Thread already joined");
79 }
80 }
81 }
82
83 pub fn join(mut self) -> std::thread::Result<T> {
84 self.join_inner()
85 }
86
87 pub fn is_finished(&self) -> bool {
88 match self.std_handle {
89 Some(ref jh) => jh.is_finished(),
90 None => true,
91 }
92 }
93}
94
95impl<T> Drop for JoinHandle<T> {
96 fn drop(&mut self) {
97 if self.std_handle.is_some() {
98 warn!(
99 "Attempting to drop a Join Handle of a running thread will leak thread IDs, \
100 please join your threads!"
101 );
102 self.join_inner().expect("Child thread panicked");
103 }
104 }
105}
106
107impl NativeThreadRuntime {
108 pub fn new(name: String, cfg: NativeConfig) -> Self {
109 debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
110 Self {
111 inner: Arc::new(NativeThreadRuntimeInner {
112 id_count: AtomicUsize::new(0),
113 running_count: Arc::new(AtomicUsize::new(0)),
114 config: cfg,
115 name,
116 }),
117 }
118 }
119
120 pub fn spawn<F, T>(&self, f: F) -> anyhow::Result<JoinHandle<T>>
121 where
122 F: FnOnce() -> T,
123 F: Send + 'static,
124 T: Send + 'static,
125 {
126 let n = self.id_count.fetch_add(1, Ordering::Relaxed);
127 let name = format!("{}-{}", &self.name, n);
128 self.spawn_named(name, f)
129 }
130
131 pub fn spawn_named<F, T>(&self, name: String, f: F) -> anyhow::Result<JoinHandle<T>>
132 where
133 F: FnOnce() -> T,
134 F: Send + 'static,
135 T: Send + 'static,
136 {
137 debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
138 let spawned = self.running_count.load(Ordering::Relaxed);
139 if spawned >= self.config.max_threads {
140 bail!("All allowed threads in this pool are already spawned");
141 }
142
143 let core_alloc = self.config.core_allocation.clone();
144 let priority = self.config.priority;
145 let policy = parse_policy(&self.config.policy);
146 let chosen_cores_mask = Mutex::new(self.config.core_allocation.as_core_mask_vector());
147 let jh = std::thread::Builder::new()
148 .name(name)
149 .stack_size(self.config.stack_size_bytes)
150 .spawn(move || {
151 apply_policy(&core_alloc, policy, priority, &chosen_cores_mask);
152 f()
153 })?;
154 let rc = self.running_count.fetch_add(1, Ordering::Relaxed);
155 datapoint_info!("thread-manager-native", ("threads-running", rc as i64, i64),);
156 Ok(JoinHandle {
157 std_handle: Some(jh),
158 running_count: self.running_count.clone(),
159 })
160 }
161
162 #[cfg(feature = "dev-context-only-utils")]
163 pub fn new_for_tests(name: &str) -> Self {
164 Self::new(name.to_owned(), NativeConfig::default())
165 }
166}