Skip to main content

reifydb_runtime/pool/
native.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Native thread pool implementation using rayon and tokio.
5
6use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
7
8use rayon::{ThreadPool, ThreadPoolBuilder};
9use tokio::{
10	runtime::{self, Runtime},
11	task::JoinHandle,
12};
13
14use super::PoolConfig;
15
16struct PoolsInner {
17	system: Arc<ThreadPool>,
18	query: Arc<ThreadPool>,
19	tokio: Option<ManuallyDrop<Runtime>>,
20}
21
22impl Drop for PoolsInner {
23	fn drop(&mut self) {
24		if let Some(rt) = self.tokio.as_mut() {
25			let rt = unsafe { ManuallyDrop::take(rt) };
26			rt.shutdown_timeout(Duration::from_secs(5));
27		}
28	}
29}
30
31/// Handle to the runtime's thread pools.
32///
33/// Wraps two rayon `ThreadPool` instances for workload isolation
34/// and an optional tokio `Runtime` for async I/O.
35///
36/// When `async_threads` is 0 in the config, no tokio runtime is created.
37/// Calling `spawn()`, `block_on()`, or `handle()` without a tokio runtime
38/// will panic.
39#[derive(Clone)]
40pub struct Pools {
41	inner: Arc<PoolsInner>,
42}
43
44impl Default for Pools {
45	fn default() -> Self {
46		Self::new(PoolConfig::default())
47	}
48}
49
50impl Pools {
51	/// Create all thread pools from the given configuration.
52	///
53	/// If `async_threads` is 0, no tokio runtime is created.
54	pub fn new(config: PoolConfig) -> Self {
55		let system = Arc::new(
56			ThreadPoolBuilder::new()
57				.num_threads(config.system_threads)
58				.thread_name(|i| format!("system-pool-{i}"))
59				.build()
60				.expect("failed to build system thread pool"),
61		);
62		let query = Arc::new(
63			ThreadPoolBuilder::new()
64				.num_threads(config.query_threads)
65				.thread_name(|i| format!("query-pool-{i}"))
66				.build()
67				.expect("failed to build query thread pool"),
68		);
69		let tokio = if config.async_threads > 0 {
70			let rt = runtime::Builder::new_multi_thread()
71				.worker_threads(config.async_threads)
72				.thread_name("async")
73				.enable_all()
74				.build()
75				.expect("failed to build tokio runtime");
76			Some(ManuallyDrop::new(rt))
77		} else {
78			None
79		};
80
81		Self {
82			inner: Arc::new(PoolsInner {
83				system,
84				query,
85				tokio,
86			}),
87		}
88	}
89
90	/// Get a reference to the system rayon pool.
91	pub fn system_pool(&self) -> &Arc<ThreadPool> {
92		&self.inner.system
93	}
94
95	/// Get a reference to the query rayon pool.
96	pub fn query_pool(&self) -> &Arc<ThreadPool> {
97		&self.inner.query
98	}
99
100	fn tokio(&self) -> &Runtime {
101		self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
102	}
103
104	/// Get a handle to the tokio runtime.
105	pub fn handle(&self) -> runtime::Handle {
106		self.tokio().handle().clone()
107	}
108
109	/// Spawn a future onto the tokio runtime.
110	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
111	where
112		F: Future + Send + 'static,
113		F::Output: Send + 'static,
114	{
115		self.tokio().spawn(future)
116	}
117
118	/// Block the current thread until the future completes.
119	pub fn block_on<F>(&self, future: F) -> F::Output
120	where
121		F: Future,
122	{
123		self.tokio().block_on(future)
124	}
125}