reifydb_runtime/pool/
native.rs1use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
5
6use rayon::{ThreadPool, ThreadPoolBuilder};
7use tokio::{
8 runtime::{self, Runtime},
9 task::JoinHandle,
10};
11
12use super::PoolConfig;
13
14struct PoolsInner {
15 system: Arc<ThreadPool>,
16 query: Arc<ThreadPool>,
17 commit: Arc<ThreadPool>,
18 background: Arc<ThreadPool>,
19 tokio: Option<ManuallyDrop<Runtime>>,
20}
21
22impl Drop for PoolsInner {
23 fn drop(&mut self) {
24 if let Some(rt) = self.tokio.as_mut() {
25 let rt = unsafe { ManuallyDrop::take(rt) };
26
27 if runtime::Handle::try_current().is_err() {
28 rt.shutdown_timeout(Duration::from_secs(5));
29 } else {
30 rt.shutdown_background();
31 }
32 }
33 }
34}
35
36#[derive(Clone)]
37pub struct Pools {
38 inner: Arc<PoolsInner>,
39}
40
41impl Default for Pools {
42 fn default() -> Self {
43 Self::new(PoolConfig::default())
44 }
45}
46
47impl Pools {
48 pub fn new(config: PoolConfig) -> Self {
49 let system = Arc::new(
50 ThreadPoolBuilder::new()
51 .num_threads(config.system_threads)
52 .thread_name(|i| format!("system-pool-{i}"))
53 .build()
54 .expect("failed to build system thread pool"),
55 );
56 let query = Arc::new(
57 ThreadPoolBuilder::new()
58 .num_threads(config.query_threads)
59 .thread_name(|i| format!("query-pool-{i}"))
60 .build()
61 .expect("failed to build query thread pool"),
62 );
63 let commit = Arc::new(
64 ThreadPoolBuilder::new()
65 .num_threads(config.commit_threads)
66 .thread_name(|i| format!("commit-pool-{i}"))
67 .build()
68 .expect("failed to build commit thread pool"),
69 );
70 let background = Arc::new(
71 ThreadPoolBuilder::new()
72 .num_threads(config.background_threads)
73 .thread_name(|i| format!("background-pool-{i}"))
74 .build()
75 .expect("failed to build background thread pool"),
76 );
77 let tokio = if config.async_threads > 0 {
78 let rt = runtime::Builder::new_multi_thread()
79 .worker_threads(config.async_threads)
80 .thread_name("async")
81 .enable_all()
82 .build()
83 .expect("failed to build tokio runtime");
84 Some(ManuallyDrop::new(rt))
85 } else {
86 None
87 };
88
89 Self {
90 inner: Arc::new(PoolsInner {
91 system,
92 query,
93 commit,
94 background,
95 tokio,
96 }),
97 }
98 }
99
100 pub fn system_pool(&self) -> &Arc<ThreadPool> {
101 &self.inner.system
102 }
103
104 pub fn system_thread_count(&self) -> usize {
105 self.inner.system.current_num_threads()
106 }
107
108 pub fn query_pool(&self) -> &Arc<ThreadPool> {
109 &self.inner.query
110 }
111
112 pub fn query_thread_count(&self) -> usize {
113 self.inner.query.current_num_threads()
114 }
115
116 pub fn commit_pool(&self) -> &Arc<ThreadPool> {
117 &self.inner.commit
118 }
119
120 pub fn commit_thread_count(&self) -> usize {
121 self.inner.commit.current_num_threads()
122 }
123
124 pub fn background_pool(&self) -> &Arc<ThreadPool> {
125 &self.inner.background
126 }
127
128 pub fn background_thread_count(&self) -> usize {
129 self.inner.background.current_num_threads()
130 }
131
132 fn tokio(&self) -> &Runtime {
133 self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
134 }
135
136 pub fn handle(&self) -> runtime::Handle {
137 self.tokio().handle().clone()
138 }
139
140 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
141 where
142 F: Future + Send + 'static,
143 F::Output: Send + 'static,
144 {
145 self.tokio().spawn(future)
146 }
147
148 pub fn block_on<F>(&self, future: F) -> F::Output
149 where
150 F: Future,
151 {
152 self.tokio().block_on(future)
153 }
154}