Skip to main content

reifydb_runtime/pool/
native.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
5
6use rayon::{ThreadPool, ThreadPoolBuilder};
7use tokio::{
8	runtime::{self, Runtime},
9	task::JoinHandle,
10};
11
12use super::PoolConfig;
13
14struct PoolsInner {
15	system: Arc<ThreadPool>,
16	query: Arc<ThreadPool>,
17	tokio: Option<ManuallyDrop<Runtime>>,
18}
19
20impl Drop for PoolsInner {
21	fn drop(&mut self) {
22		if let Some(rt) = self.tokio.as_mut() {
23			let rt = unsafe { ManuallyDrop::take(rt) };
24			rt.shutdown_timeout(Duration::from_secs(5));
25		}
26	}
27}
28
29/// Handle to the runtime's thread pools.
30///
31/// Wraps two rayon `ThreadPool` instances for workload isolation
32/// and an optional tokio `Runtime` for async I/O.
33///
34/// When `async_threads` is 0 in the config, no tokio runtime is created.
35/// Calling `spawn()`, `block_on()`, or `handle()` without a tokio runtime
36/// will panic.
37#[derive(Clone)]
38pub struct Pools {
39	inner: Arc<PoolsInner>,
40}
41
42impl Default for Pools {
43	fn default() -> Self {
44		Self::new(PoolConfig::default())
45	}
46}
47
48impl Pools {
49	/// Create all thread pools from the given configuration.
50	///
51	/// If `async_threads` is 0, no tokio runtime is created.
52	pub fn new(config: PoolConfig) -> Self {
53		let system = Arc::new(
54			ThreadPoolBuilder::new()
55				.num_threads(config.system_threads)
56				.thread_name(|i| format!("system-pool-{i}"))
57				.build()
58				.expect("failed to build system thread pool"),
59		);
60		let query = Arc::new(
61			ThreadPoolBuilder::new()
62				.num_threads(config.query_threads)
63				.thread_name(|i| format!("query-pool-{i}"))
64				.build()
65				.expect("failed to build query thread pool"),
66		);
67		let tokio = if config.async_threads > 0 {
68			let rt = runtime::Builder::new_multi_thread()
69				.worker_threads(config.async_threads)
70				.thread_name("async")
71				.enable_all()
72				.build()
73				.expect("failed to build tokio runtime");
74			Some(ManuallyDrop::new(rt))
75		} else {
76			None
77		};
78
79		Self {
80			inner: Arc::new(PoolsInner {
81				system,
82				query,
83				tokio,
84			}),
85		}
86	}
87
88	/// Get a reference to the system rayon pool.
89	pub fn system_pool(&self) -> &Arc<ThreadPool> {
90		&self.inner.system
91	}
92
93	/// Number of threads in the system pool.
94	pub fn system_thread_count(&self) -> usize {
95		self.inner.system.current_num_threads()
96	}
97
98	/// Get a reference to the query rayon pool.
99	pub fn query_pool(&self) -> &Arc<ThreadPool> {
100		&self.inner.query
101	}
102
103	fn tokio(&self) -> &Runtime {
104		self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
105	}
106
107	/// Get a handle to the tokio runtime.
108	pub fn handle(&self) -> runtime::Handle {
109		self.tokio().handle().clone()
110	}
111
112	/// Spawn a future onto the tokio runtime.
113	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
114	where
115		F: Future + Send + 'static,
116		F::Output: Send + 'static,
117	{
118		self.tokio().spawn(future)
119	}
120
121	/// Block the current thread until the future completes.
122	pub fn block_on<F>(&self, future: F) -> F::Output
123	where
124		F: Future,
125	{
126		self.tokio().block_on(future)
127	}
128}