reifydb_runtime/pool/
native.rs1use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
7
8use rayon::{ThreadPool, ThreadPoolBuilder};
9use tokio::{
10 runtime::{self, Runtime},
11 task::JoinHandle,
12};
13
14use super::PoolConfig;
15
16struct PoolsInner {
17 system: Arc<ThreadPool>,
18 query: Arc<ThreadPool>,
19 tokio: Option<ManuallyDrop<Runtime>>,
20}
21
22impl Drop for PoolsInner {
23 fn drop(&mut self) {
24 if let Some(rt) = self.tokio.as_mut() {
25 let rt = unsafe { ManuallyDrop::take(rt) };
26 rt.shutdown_timeout(Duration::from_secs(5));
27 }
28 }
29}
30
31#[derive(Clone)]
40pub struct Pools {
41 inner: Arc<PoolsInner>,
42}
43
44impl Default for Pools {
45 fn default() -> Self {
46 Self::new(PoolConfig::default())
47 }
48}
49
50impl Pools {
51 pub fn new(config: PoolConfig) -> Self {
55 let system = Arc::new(
56 ThreadPoolBuilder::new()
57 .num_threads(config.system_threads)
58 .thread_name(|i| format!("system-pool-{i}"))
59 .build()
60 .expect("failed to build system thread pool"),
61 );
62 let query = Arc::new(
63 ThreadPoolBuilder::new()
64 .num_threads(config.query_threads)
65 .thread_name(|i| format!("query-pool-{i}"))
66 .build()
67 .expect("failed to build query thread pool"),
68 );
69 let tokio = if config.async_threads > 0 {
70 let rt = runtime::Builder::new_multi_thread()
71 .worker_threads(config.async_threads)
72 .thread_name("async")
73 .enable_all()
74 .build()
75 .expect("failed to build tokio runtime");
76 Some(ManuallyDrop::new(rt))
77 } else {
78 None
79 };
80
81 Self {
82 inner: Arc::new(PoolsInner {
83 system,
84 query,
85 tokio,
86 }),
87 }
88 }
89
90 pub fn system_pool(&self) -> &Arc<ThreadPool> {
92 &self.inner.system
93 }
94
95 pub fn query_pool(&self) -> &Arc<ThreadPool> {
97 &self.inner.query
98 }
99
100 fn tokio(&self) -> &Runtime {
101 self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
102 }
103
104 pub fn handle(&self) -> runtime::Handle {
106 self.tokio().handle().clone()
107 }
108
109 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
111 where
112 F: Future + Send + 'static,
113 F::Output: Send + 'static,
114 {
115 self.tokio().spawn(future)
116 }
117
118 pub fn block_on<F>(&self, future: F) -> F::Output
120 where
121 F: Future,
122 {
123 self.tokio().block_on(future)
124 }
125}