reifydb_runtime/pool/
native.rs1use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
5
6use rayon::{ThreadPool, ThreadPoolBuilder};
7use tokio::{
8 runtime::{self, Runtime},
9 task::JoinHandle,
10};
11
12use super::PoolConfig;
13
14struct PoolsInner {
15 system: Arc<ThreadPool>,
16 query: Arc<ThreadPool>,
17 tokio: Option<ManuallyDrop<Runtime>>,
18}
19
20impl Drop for PoolsInner {
21 fn drop(&mut self) {
22 if let Some(rt) = self.tokio.as_mut() {
23 let rt = unsafe { ManuallyDrop::take(rt) };
24 rt.shutdown_timeout(Duration::from_secs(5));
25 }
26 }
27}
28
29#[derive(Clone)]
38pub struct Pools {
39 inner: Arc<PoolsInner>,
40}
41
42impl Default for Pools {
43 fn default() -> Self {
44 Self::new(PoolConfig::default())
45 }
46}
47
48impl Pools {
49 pub fn new(config: PoolConfig) -> Self {
53 let system = Arc::new(
54 ThreadPoolBuilder::new()
55 .num_threads(config.system_threads)
56 .thread_name(|i| format!("system-pool-{i}"))
57 .build()
58 .expect("failed to build system thread pool"),
59 );
60 let query = Arc::new(
61 ThreadPoolBuilder::new()
62 .num_threads(config.query_threads)
63 .thread_name(|i| format!("query-pool-{i}"))
64 .build()
65 .expect("failed to build query thread pool"),
66 );
67 let tokio = if config.async_threads > 0 {
68 let rt = runtime::Builder::new_multi_thread()
69 .worker_threads(config.async_threads)
70 .thread_name("async")
71 .enable_all()
72 .build()
73 .expect("failed to build tokio runtime");
74 Some(ManuallyDrop::new(rt))
75 } else {
76 None
77 };
78
79 Self {
80 inner: Arc::new(PoolsInner {
81 system,
82 query,
83 tokio,
84 }),
85 }
86 }
87
88 pub fn system_pool(&self) -> &Arc<ThreadPool> {
90 &self.inner.system
91 }
92
93 pub fn system_thread_count(&self) -> usize {
95 self.inner.system.current_num_threads()
96 }
97
98 pub fn query_pool(&self) -> &Arc<ThreadPool> {
100 &self.inner.query
101 }
102
103 fn tokio(&self) -> &Runtime {
104 self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
105 }
106
107 pub fn handle(&self) -> runtime::Handle {
109 self.tokio().handle().clone()
110 }
111
112 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
114 where
115 F: Future + Send + 'static,
116 F::Output: Send + 'static,
117 {
118 self.tokio().spawn(future)
119 }
120
121 pub fn block_on<F>(&self, future: F) -> F::Output
123 where
124 F: Future,
125 {
126 self.tokio().block_on(future)
127 }
128}