reifydb-runtime 0.4.11

Runtime infrastructure for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

//! Native thread pool implementation using rayon and tokio.

use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};

use rayon::{ThreadPool, ThreadPoolBuilder};
use tokio::{
	runtime::{self, Runtime},
	task::JoinHandle,
};

use super::PoolConfig;

struct PoolsInner {
	system: Arc<ThreadPool>,
	query: Arc<ThreadPool>,
	tokio: Option<ManuallyDrop<Runtime>>,
}

impl Drop for PoolsInner {
	fn drop(&mut self) {
		if let Some(rt) = self.tokio.as_mut() {
			let rt = unsafe { ManuallyDrop::take(rt) };
			rt.shutdown_timeout(Duration::from_secs(5));
		}
	}
}

/// Handle to the runtime's thread pools.
///
/// Wraps two rayon `ThreadPool` instances for workload isolation
/// and an optional tokio `Runtime` for async I/O.
///
/// When `async_threads` is 0 in the config, no tokio runtime is created.
/// Calling `spawn()`, `block_on()`, or `handle()` without a tokio runtime
/// will panic.
#[derive(Clone)]
pub struct Pools {
	inner: Arc<PoolsInner>,
}

impl Default for Pools {
	fn default() -> Self {
		Self::new(PoolConfig::default())
	}
}

impl Pools {
	/// Create all thread pools from the given configuration.
	///
	/// If `async_threads` is 0, no tokio runtime is created.
	pub fn new(config: PoolConfig) -> Self {
		let system = Arc::new(
			ThreadPoolBuilder::new()
				.num_threads(config.system_threads)
				.thread_name(|i| format!("system-pool-{i}"))
				.build()
				.expect("failed to build system thread pool"),
		);
		let query = Arc::new(
			ThreadPoolBuilder::new()
				.num_threads(config.query_threads)
				.thread_name(|i| format!("query-pool-{i}"))
				.build()
				.expect("failed to build query thread pool"),
		);
		let tokio = if config.async_threads > 0 {
			let rt = runtime::Builder::new_multi_thread()
				.worker_threads(config.async_threads)
				.thread_name("async")
				.enable_all()
				.build()
				.expect("failed to build tokio runtime");
			Some(ManuallyDrop::new(rt))
		} else {
			None
		};

		Self {
			inner: Arc::new(PoolsInner {
				system,
				query,
				tokio,
			}),
		}
	}

	/// Get a reference to the system rayon pool.
	pub fn system_pool(&self) -> &Arc<ThreadPool> {
		&self.inner.system
	}

	/// Get a reference to the query rayon pool.
	pub fn query_pool(&self) -> &Arc<ThreadPool> {
		&self.inner.query
	}

	fn tokio(&self) -> &Runtime {
		self.inner.tokio.as_ref().expect("no tokio runtime configured (async_threads = 0)")
	}

	/// Get a handle to the tokio runtime.
	pub fn handle(&self) -> runtime::Handle {
		self.tokio().handle().clone()
	}

	/// Spawn a future onto the tokio runtime.
	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
	where
		F: Future + Send + 'static,
		F::Output: Send + 'static,
	{
		self.tokio().spawn(future)
	}

	/// Block the current thread until the future completes.
	pub fn block_on<F>(&self, future: F) -> F::Output
	where
		F: Future,
	{
		self.tokio().block_on(future)
	}
}