reifydb_runtime/pool/
native.rs1use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
5
6use rayon::{ThreadPool, ThreadPoolBuilder};
7use tokio::{
8 runtime::{self, Runtime},
9 task::JoinHandle,
10};
11
12use super::PoolConfig;
13
14struct PoolsInner {
15 system: Arc<ThreadPool>,
16 query: Arc<ThreadPool>,
17 tokio: Option<ManuallyDrop<Runtime>>,
18}
19
20impl Drop for PoolsInner {
21 fn drop(&mut self) {
22 if let Some(rt) = self.tokio.as_mut() {
23 let rt = unsafe { ManuallyDrop::take(rt) };
24 rt.shutdown_timeout(Duration::from_secs(5));
25 }
26 }
27}
28
29#[derive(Clone)]
30pub struct Pools {
31 inner: Arc<PoolsInner>,
32}
33
34impl Default for Pools {
35 fn default() -> Self {
36 Self::new(PoolConfig::default())
37 }
38}
39
40impl Pools {
41 pub fn new(config: PoolConfig) -> Self {
42 let system = Arc::new(
43 ThreadPoolBuilder::new()
44 .num_threads(config.system_threads)
45 .thread_name(|i| format!("system-pool-{i}"))
46 .build()
47 .expect("failed to build system thread pool"),
48 );
49 let query = Arc::new(
50 ThreadPoolBuilder::new()
51 .num_threads(config.query_threads)
52 .thread_name(|i| format!("query-pool-{i}"))
53 .build()
54 .expect("failed to build query thread pool"),
55 );
56 let tokio = if config.async_threads > 0 {
57 let rt = runtime::Builder::new_multi_thread()
58 .worker_threads(config.async_threads)
59 .thread_name("async")
60 .enable_all()
61 .build()
62 .expect("failed to build tokio runtime");
63 Some(ManuallyDrop::new(rt))
64 } else {
65 None
66 };
67
68 Self {
69 inner: Arc::new(PoolsInner {
70 system,
71 query,
72 tokio,
73 }),
74 }
75 }
76
77 pub fn system_pool(&self) -> &Arc<ThreadPool> {
78 &self.inner.system
79 }
80
81 pub fn system_thread_count(&self) -> usize {
82 self.inner.system.current_num_threads()
83 }
84
85 pub fn query_pool(&self) -> &Arc<ThreadPool> {
86 &self.inner.query
87 }
88
89 fn tokio(&self) -> &Runtime {
90 self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
91 }
92
93 pub fn handle(&self) -> runtime::Handle {
94 self.tokio().handle().clone()
95 }
96
97 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
98 where
99 F: Future + Send + 'static,
100 F::Output: Send + 'static,
101 {
102 self.tokio().spawn(future)
103 }
104
105 pub fn block_on<F>(&self, future: F) -> F::Output
106 where
107 F: Future,
108 {
109 self.tokio().block_on(future)
110 }
111}