Skip to main content

reifydb_runtime/pool/
native.rs

1// SPDX-License-Identifier: MIT
2// Copyright (c) 2026 ReifyDB
3
4use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
5
6use rayon::{ThreadPool, ThreadPoolBuilder};
7use tokio::{
8	runtime::{self, Runtime},
9	task::JoinHandle,
10};
11
12use super::PoolConfig;
13
14struct PoolsInner {
15	system: Arc<ThreadPool>,
16	query: Arc<ThreadPool>,
17	commit: Arc<ThreadPool>,
18	background: Arc<ThreadPool>,
19	tokio: Option<ManuallyDrop<Runtime>>,
20}
21
22impl Drop for PoolsInner {
23	fn drop(&mut self) {
24		if let Some(rt) = self.tokio.as_mut() {
25			let rt = unsafe { ManuallyDrop::take(rt) };
26
27			if runtime::Handle::try_current().is_err() {
28				rt.shutdown_timeout(Duration::from_secs(5));
29			} else {
30				rt.shutdown_background();
31			}
32		}
33	}
34}
35
36#[derive(Clone)]
37pub struct Pools {
38	inner: Arc<PoolsInner>,
39}
40
41impl Default for Pools {
42	fn default() -> Self {
43		Self::new(PoolConfig::default())
44	}
45}
46
47impl Pools {
48	pub fn new(config: PoolConfig) -> Self {
49		let system = Arc::new(
50			ThreadPoolBuilder::new()
51				.num_threads(config.system_threads)
52				.thread_name(|i| format!("system-pool-{i}"))
53				.build()
54				.expect("failed to build system thread pool"),
55		);
56		let query = Arc::new(
57			ThreadPoolBuilder::new()
58				.num_threads(config.query_threads)
59				.thread_name(|i| format!("query-pool-{i}"))
60				.build()
61				.expect("failed to build query thread pool"),
62		);
63		let commit = Arc::new(
64			ThreadPoolBuilder::new()
65				.num_threads(config.commit_threads)
66				.thread_name(|i| format!("commit-pool-{i}"))
67				.build()
68				.expect("failed to build commit thread pool"),
69		);
70		let background = Arc::new(
71			ThreadPoolBuilder::new()
72				.num_threads(config.background_threads)
73				.thread_name(|i| format!("background-pool-{i}"))
74				.build()
75				.expect("failed to build background thread pool"),
76		);
77		let tokio = if config.async_threads > 0 {
78			let rt = runtime::Builder::new_multi_thread()
79				.worker_threads(config.async_threads)
80				.thread_name("async")
81				.enable_all()
82				.build()
83				.expect("failed to build tokio runtime");
84			Some(ManuallyDrop::new(rt))
85		} else {
86			None
87		};
88
89		Self {
90			inner: Arc::new(PoolsInner {
91				system,
92				query,
93				commit,
94				background,
95				tokio,
96			}),
97		}
98	}
99
100	pub fn system_pool(&self) -> &Arc<ThreadPool> {
101		&self.inner.system
102	}
103
104	pub fn system_thread_count(&self) -> usize {
105		self.inner.system.current_num_threads()
106	}
107
108	pub fn query_pool(&self) -> &Arc<ThreadPool> {
109		&self.inner.query
110	}
111
112	pub fn query_thread_count(&self) -> usize {
113		self.inner.query.current_num_threads()
114	}
115
116	pub fn commit_pool(&self) -> &Arc<ThreadPool> {
117		&self.inner.commit
118	}
119
120	pub fn commit_thread_count(&self) -> usize {
121		self.inner.commit.current_num_threads()
122	}
123
124	pub fn background_pool(&self) -> &Arc<ThreadPool> {
125		&self.inner.background
126	}
127
128	pub fn background_thread_count(&self) -> usize {
129		self.inner.background.current_num_threads()
130	}
131
132	fn tokio(&self) -> &Runtime {
133		self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
134	}
135
136	pub fn handle(&self) -> runtime::Handle {
137		self.tokio().handle().clone()
138	}
139
140	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
141	where
142		F: Future + Send + 'static,
143		F::Output: Send + 'static,
144	{
145		self.tokio().spawn(future)
146	}
147
148	pub fn block_on<F>(&self, future: F) -> F::Output
149	where
150		F: Future,
151	{
152		self.tokio().block_on(future)
153	}
154}