agave_thread_manager/
tokio_runtime.rs

1use {
2    crate::{
3        policy::{apply_policy, parse_policy, CoreAllocation},
4        MAX_THREAD_NAME_CHARS,
5    },
6    serde::{Deserialize, Serialize},
7    solana_metrics::datapoint_info,
8    std::{
9        ops::Deref,
10        sync::{
11            atomic::{AtomicU64, AtomicUsize, Ordering},
12            Arc, Mutex,
13        },
14        time::Duration,
15    },
16    thread_priority::ThreadExt,
17};
18
19#[derive(Clone, Debug, Serialize, Deserialize)]
20#[serde(default)]
21pub struct TokioConfig {
22    ///number of worker threads tokio is allowed to spawn
23    pub worker_threads: usize,
24    ///max number of blocking threads tokio is allowed to spawn
25    pub max_blocking_threads: usize,
26    /// Priority in range 0..99
27    pub priority: u8,
28    pub policy: String,
29    pub stack_size_bytes: usize,
30    pub event_interval: u32,
31    pub core_allocation: CoreAllocation,
32}
33
34impl Default for TokioConfig {
35    fn default() -> Self {
36        Self {
37            core_allocation: CoreAllocation::OsDefault,
38            worker_threads: 8,
39            max_blocking_threads: 1,
40            priority: crate::policy::DEFAULT_PRIORITY,
41            policy: "OTHER".to_owned(),
42            stack_size_bytes: 2 * 1024 * 1024,
43            event_interval: 61,
44        }
45    }
46}
47
48#[derive(Debug)]
49pub struct TokioRuntime {
50    pub tokio: tokio::runtime::Runtime,
51    pub config: TokioConfig,
52    pub counters: Arc<ThreadCounters>,
53}
54
55impl Deref for TokioRuntime {
56    type Target = tokio::runtime::Runtime;
57
58    fn deref(&self) -> &Self::Target {
59        &self.tokio
60    }
61}
62
63impl TokioRuntime {
64    /// Starts the metrics sampling task on the runtime to monitor
65    /// how many workers are busy doing useful things.
66    pub fn start_metrics_sampling(&self, period: Duration) {
67        let counters = self.counters.clone();
68        self.tokio.spawn(metrics_sampler(counters, period));
69    }
70
71    pub fn new(name: String, cfg: TokioConfig) -> anyhow::Result<Self> {
72        debug_assert!(name.len() < MAX_THREAD_NAME_CHARS, "Thread name too long");
73        let num_workers = if cfg.worker_threads == 0 {
74            num_cpus::get()
75        } else {
76            cfg.worker_threads
77        };
78        let chosen_cores_mask = cfg.core_allocation.as_core_mask_vector();
79
80        let base_name = name.clone();
81        let mut builder = match num_workers {
82            1 => tokio::runtime::Builder::new_current_thread(),
83            _ => {
84                let mut builder = tokio::runtime::Builder::new_multi_thread();
85                builder.worker_threads(num_workers);
86                builder
87            }
88        };
89        let atomic_id: AtomicUsize = AtomicUsize::new(0);
90
91        let counters = Arc::new(ThreadCounters {
92            // no workaround, metrics crate will only consume 'static str
93            namespace: format!("thread-manager-tokio-{}", &base_name).leak(),
94            total_threads_cnt: cfg.worker_threads as u64,
95            active_threads_cnt: AtomicU64::new(
96                (num_workers.wrapping_add(cfg.max_blocking_threads)) as u64,
97            ),
98        });
99        builder
100            .event_interval(cfg.event_interval)
101            .thread_name_fn(move || {
102                let id = atomic_id.fetch_add(1, Ordering::Relaxed);
103                format!("{}-{}", base_name, id)
104            })
105            .on_thread_park({
106                let counters = counters.clone();
107                move || {
108                    counters.on_park();
109                }
110            })
111            .on_thread_unpark({
112                let counters = counters.clone();
113                move || {
114                    counters.on_unpark();
115                }
116            })
117            .thread_stack_size(cfg.stack_size_bytes)
118            .enable_all()
119            .max_blocking_threads(cfg.max_blocking_threads);
120
121        //keep borrow checker happy and move these things into the closure
122        let c = cfg.clone();
123        let chosen_cores_mask = Mutex::new(chosen_cores_mask);
124        builder.on_thread_start(move || {
125            let cur_thread = std::thread::current();
126            let _tid = cur_thread
127                .get_native_id()
128                .expect("Can not get thread id for newly created thread");
129
130            apply_policy(
131                &c.core_allocation,
132                parse_policy(&c.policy),
133                c.priority,
134                &chosen_cores_mask,
135            );
136        });
137        Ok(TokioRuntime {
138            tokio: builder.build()?,
139            config: cfg.clone(),
140            counters,
141        })
142    }
143
144    /// Makes test runtime with 2 threads, only for unittests
145    #[cfg(feature = "dev-context-only-utils")]
146    pub fn new_for_tests() -> Self {
147        let cfg = TokioConfig {
148            worker_threads: 2,
149            ..Default::default()
150        };
151        TokioRuntime::new("solNetTest".to_owned(), cfg.clone())
152            .expect("Failed to create Tokio runtime for tests")
153    }
154}
155
/// Bookkeeping used to track how busy a runtime's thread pool is.
#[derive(Debug)]
pub struct ThreadCounters {
    /// Static name under which the sampled datapoints are reported.
    pub namespace: &'static str,
    /// Thread total that the sampler compares the active count against.
    pub total_threads_cnt: u64,
    /// Running count of threads currently considered active.
    pub active_threads_cnt: AtomicU64,
}

impl ThreadCounters {
    /// Notes that a thread is parking: lowers the active count by one.
    pub fn on_park(&self) {
        let _previous = self.active_threads_cnt.fetch_sub(1, Ordering::Relaxed);
    }

    /// Notes that a thread has unparked: raises the active count by one.
    pub fn on_unpark(&self) {
        let _previous = self.active_threads_cnt.fetch_add(1, Ordering::Relaxed);
    }
}
173
174async fn metrics_sampler(counters: Arc<ThreadCounters>, period: Duration) {
175    let mut interval = tokio::time::interval(period);
176    loop {
177        interval.tick().await;
178        let active = counters.active_threads_cnt.load(Ordering::Relaxed) as i64;
179        let parked = (counters.total_threads_cnt as i64).saturating_sub(active);
180        datapoint_info!(
181            counters.namespace,
182            ("threads_parked", parked, i64),
183            ("threads_active", active, i64),
184        );
185    }
186}