Skip to main content

reifydb_runtime/pool/
native.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
5
6use rayon::{ThreadPool, ThreadPoolBuilder};
7use tokio::{
8	runtime::{self, Runtime},
9	task::JoinHandle,
10};
11
12use super::PoolConfig;
13
14struct PoolsInner {
15	system: Arc<ThreadPool>,
16	query: Arc<ThreadPool>,
17	tokio: Option<ManuallyDrop<Runtime>>,
18}
19
20impl Drop for PoolsInner {
21	fn drop(&mut self) {
22		if let Some(rt) = self.tokio.as_mut() {
23			let rt = unsafe { ManuallyDrop::take(rt) };
24			rt.shutdown_timeout(Duration::from_secs(5));
25		}
26	}
27}
28
29#[derive(Clone)]
30pub struct Pools {
31	inner: Arc<PoolsInner>,
32}
33
34impl Default for Pools {
35	fn default() -> Self {
36		Self::new(PoolConfig::default())
37	}
38}
39
40impl Pools {
41	pub fn new(config: PoolConfig) -> Self {
42		let system = Arc::new(
43			ThreadPoolBuilder::new()
44				.num_threads(config.system_threads)
45				.thread_name(|i| format!("system-pool-{i}"))
46				.build()
47				.expect("failed to build system thread pool"),
48		);
49		let query = Arc::new(
50			ThreadPoolBuilder::new()
51				.num_threads(config.query_threads)
52				.thread_name(|i| format!("query-pool-{i}"))
53				.build()
54				.expect("failed to build query thread pool"),
55		);
56		let tokio = if config.async_threads > 0 {
57			let rt = runtime::Builder::new_multi_thread()
58				.worker_threads(config.async_threads)
59				.thread_name("async")
60				.enable_all()
61				.build()
62				.expect("failed to build tokio runtime");
63			Some(ManuallyDrop::new(rt))
64		} else {
65			None
66		};
67
68		Self {
69			inner: Arc::new(PoolsInner {
70				system,
71				query,
72				tokio,
73			}),
74		}
75	}
76
77	pub fn system_pool(&self) -> &Arc<ThreadPool> {
78		&self.inner.system
79	}
80
81	pub fn system_thread_count(&self) -> usize {
82		self.inner.system.current_num_threads()
83	}
84
85	pub fn query_pool(&self) -> &Arc<ThreadPool> {
86		&self.inner.query
87	}
88
89	fn tokio(&self) -> &Runtime {
90		self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
91	}
92
93	pub fn handle(&self) -> runtime::Handle {
94		self.tokio().handle().clone()
95	}
96
97	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
98	where
99		F: Future + Send + 'static,
100		F::Output: Send + 'static,
101	{
102		self.tokio().spawn(future)
103	}
104
105	pub fn block_on<F>(&self, future: F) -> F::Output
106	where
107		F: Future,
108	{
109		self.tokio().block_on(future)
110	}
111}