Skip to main content

reifydb_runtime/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![allow(clippy::tabs_in_doc_comments)]
6
7//! Runtime management for ReifyDB.
8//!
9//! This crate provides a facade over platform-specific runtime implementations:
10//! - **Native**: tokio multi-threaded runtime + rayon-based actor system
11//! - **WASM**: Single-threaded execution with sequential processing
12//!
13//! The API is identical across platforms, with compile-time dispatch ensuring
14//! zero runtime overhead.
15//!
16//! # Example
17//!
18//! ```ignore
19//! use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
20//!
21//! // Create a runtime with default configuration
22//! let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
23//!
24//! // Or with custom configuration
25//! let config = SharedRuntimeConfig::default()
26//!     .async_threads(4)
27//!     .compute_threads(4);
28//! let runtime = SharedRuntime::from_config(config);
29//!
30//! // Spawn async work
31//! runtime.spawn(async {
32//!     // async work here
33//! });
34//!
35//! // Use the actor system for spawning actors
36//! let system = runtime.actor_system();
37//! let handle = system.spawn("my-actor", MyActor::new());
38//! ```
39
40#![allow(dead_code)]
41
42pub mod context;
43
44pub mod hash;
45
46pub mod sync;
47
48pub mod actor;
49
50use std::{future::Future, sync::Arc, thread::available_parallelism};
51#[cfg(not(target_arch = "wasm32"))]
52use std::{mem::ManuallyDrop, time::Duration};
53
54use crate::{
55	actor::system::ActorSystem,
56	context::clock::{Clock, MockClock},
57};
58
59/// Configuration for creating a [`SharedRuntime`].
60#[derive(Clone)]
61pub struct SharedRuntimeConfig {
62	/// Number of worker threads for async runtime (ignored in WASM)
63	pub async_threads: usize,
64	/// Number of worker threads for compute/actor pool (ignored in WASM)
65	pub compute_threads: usize,
66	/// Clock for time operations (defaults to real system clock)
67	pub clock: Clock,
68	/// Random number generator (defaults to OS entropy)
69	pub rng: context::rng::Rng,
70}
71
72impl Default for SharedRuntimeConfig {
73	fn default() -> Self {
74		Self {
75			async_threads: 1,
76			compute_threads: available_parallelism().map_or(1, |n| n.get()),
77			clock: Clock::Real,
78			rng: context::rng::Rng::default(),
79		}
80	}
81}
82
83impl SharedRuntimeConfig {
84	/// Set the number of async worker threads.
85	pub fn async_threads(mut self, threads: usize) -> Self {
86		self.async_threads = threads;
87		self
88	}
89
90	/// Set the number of compute worker threads.
91	pub fn compute_threads(mut self, threads: usize) -> Self {
92		self.compute_threads = threads;
93		self
94	}
95
96	/// Configure for deterministic testing with the given seed.
97	/// Sets a mock clock starting at `seed` milliseconds and a seeded RNG.
98	pub fn deterministic_testing(mut self, seed: u64) -> Self {
99		self.clock = Clock::Mock(MockClock::from_millis(seed));
100		self.rng = context::rng::Rng::seeded(seed);
101		self
102	}
103}
104
105// WASM runtime types - single-threaded execution support
106use std::fmt;
107#[cfg(target_arch = "wasm32")]
108use std::{
109	pin::Pin,
110	task::{Context, Poll},
111};
112
113#[cfg(target_arch = "wasm32")]
114use futures_util::future::LocalBoxFuture;
115#[cfg(not(target_arch = "wasm32"))]
116use tokio::runtime::{self as tokio_runtime, Runtime};
117#[cfg(not(target_arch = "wasm32"))]
118use tokio::task::JoinHandle;
119
120/// WASM-compatible handle (placeholder).
121#[cfg(target_arch = "wasm32")]
122#[derive(Clone, Copy, Debug)]
123pub struct WasmHandle;
124
125/// WASM-compatible join handle.
126///
127/// Implements Future to be compatible with async/await.
128#[cfg(target_arch = "wasm32")]
129pub struct WasmJoinHandle<T> {
130	future: LocalBoxFuture<'static, T>,
131}
132
133#[cfg(target_arch = "wasm32")]
134impl<T> Future for WasmJoinHandle<T> {
135	type Output = Result<T, WasmJoinError>;
136
137	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138		match self.future.as_mut().poll(cx) {
139			Poll::Ready(v) => Poll::Ready(Ok(v)),
140			Poll::Pending => Poll::Pending,
141		}
142	}
143}
144
145/// WASM join error (compatible with tokio::task::JoinError API).
146#[cfg(target_arch = "wasm32")]
147#[derive(Debug)]
148pub struct WasmJoinError;
149
150#[cfg(target_arch = "wasm32")]
151impl fmt::Display for WasmJoinError {
152	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153		write!(f, "WASM task failed")
154	}
155}
156
157#[cfg(target_arch = "wasm32")]
158use std::error::Error;
159
160#[cfg(target_arch = "wasm32")]
161impl Error for WasmJoinError {}
162
163/// Inner shared state for the runtime (native).
164#[cfg(not(target_arch = "wasm32"))]
165struct SharedRuntimeInner {
166	tokio: ManuallyDrop<Runtime>,
167	system: ActorSystem,
168	clock: Clock,
169	rng: context::rng::Rng,
170}
171
172#[cfg(not(target_arch = "wasm32"))]
173impl Drop for SharedRuntimeInner {
174	fn drop(&mut self) {
175		// Shut down the actor system FIRST — actors may hold tokio
176		// JoinHandles or spawn tasks, so the runtime must still be alive.
177		self.system.shutdown();
178		let _ = self.system.join();
179
180		// SAFETY: drop is called exactly once; taking the Runtime here
181		// prevents its default Drop (which calls shutdown_background and
182		// does NOT wait). We call shutdown_timeout instead so that worker
183		// threads and I/O resources (epoll fd, timer fd, etc.) are fully
184		// reclaimed before this function returns.
185		let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
186		rt.shutdown_timeout(Duration::from_secs(5));
187	}
188}
189
190/// Inner shared state for the runtime (WASM).
191#[cfg(target_arch = "wasm32")]
192struct SharedRuntimeInner {
193	system: ActorSystem,
194	clock: Clock,
195	rng: context::rng::Rng,
196}
197
198/// Shared runtime that can be cloned and passed across subsystems.
199///
200/// Platform-agnostic facade over:
201/// - Native: tokio multi-threaded runtime + unified actor system
202/// - WASM: Single-threaded execution
203///
204/// Uses Arc internally, so cloning is cheap and all clones share the same
205/// underlying runtime and actor system.
206#[derive(Clone)]
207pub struct SharedRuntime(Arc<SharedRuntimeInner>);
208
209impl SharedRuntime {
210	/// Create a new shared runtime from configuration.
211	///
212	/// # Panics
213	///
214	/// Panics if the runtime cannot be created (native only).
215	#[cfg(not(target_arch = "wasm32"))]
216	pub fn from_config(config: SharedRuntimeConfig) -> Self {
217		let tokio = tokio_runtime::Builder::new_multi_thread()
218			.worker_threads(config.async_threads)
219			.thread_name("async")
220			.enable_all()
221			.build()
222			.expect("Failed to create tokio runtime");
223
224		let system = ActorSystem::with_clock(config.compute_threads, config.clock.clone());
225
226		Self(Arc::new(SharedRuntimeInner {
227			tokio: ManuallyDrop::new(tokio),
228			system,
229			clock: config.clock,
230			rng: config.rng,
231		}))
232	}
233
234	/// Create a new shared runtime from configuration.
235	#[cfg(target_arch = "wasm32")]
236	pub fn from_config(config: SharedRuntimeConfig) -> Self {
237		let system = ActorSystem::with_clock(config.compute_threads, config.clock.clone());
238
239		Self(Arc::new(SharedRuntimeInner {
240			system,
241			clock: config.clock,
242			rng: config.rng,
243		}))
244	}
245
246	/// Get the unified actor system for spawning actors and compute.
247	pub fn actor_system(&self) -> ActorSystem {
248		self.0.system.clone()
249	}
250
251	/// Get the clock for this runtime (shared across all threads).
252	pub fn clock(&self) -> &Clock {
253		&self.0.clock
254	}
255
256	/// Get the RNG for this runtime (shared across all threads).
257	pub fn rng(&self) -> &context::rng::Rng {
258		&self.0.rng
259	}
260
261	/// Get a handle to the async runtime.
262	///
263	/// Returns a platform-specific handle type:
264	/// - Native: `tokio_runtime::Handle`
265	/// - WASM: `WasmHandle`
266	#[cfg(not(target_arch = "wasm32"))]
267	pub fn handle(&self) -> tokio_runtime::Handle {
268		self.0.tokio.handle().clone()
269	}
270
271	/// Get a handle to the async runtime.
272	#[cfg(target_arch = "wasm32")]
273	pub fn handle(&self) -> WasmHandle {
274		WasmHandle
275	}
276
277	/// Spawn a future onto the runtime.
278	///
279	/// Returns a platform-specific join handle type:
280	/// - Native: `JoinHandle`
281	/// - WASM: `WasmJoinHandle`
282	#[cfg(not(target_arch = "wasm32"))]
283	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
284	where
285		F: Future + Send + 'static,
286		F::Output: Send + 'static,
287	{
288		self.0.tokio.spawn(future)
289	}
290
291	/// Spawn a future onto the runtime.
292	#[cfg(target_arch = "wasm32")]
293	pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
294	where
295		F: Future + 'static,
296		F::Output: 'static,
297	{
298		WasmJoinHandle {
299			future: Box::pin(future),
300		}
301	}
302
303	/// Block the current thread until the future completes.
304	///
305	/// **Note:** Not supported in WASM builds - will panic.
306	#[cfg(not(target_arch = "wasm32"))]
307	pub fn block_on<F>(&self, future: F) -> F::Output
308	where
309		F: Future,
310	{
311		self.0.tokio.block_on(future)
312	}
313
314	/// Block the current thread until the future completes.
315	///
316	/// **Note:** Not supported in WASM builds - will panic.
317	#[cfg(target_arch = "wasm32")]
318	pub fn block_on<F>(&self, _future: F) -> F::Output
319	where
320		F: Future,
321	{
322		unimplemented!("block_on not supported in WASM - use async execution instead")
323	}
324}
325
326impl fmt::Debug for SharedRuntime {
327	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
328		f.debug_struct("SharedRuntime").finish_non_exhaustive()
329	}
330}
331
332// Keep existing tests but gate them by target
333#[cfg(all(test, not(reifydb_single_threaded)))]
334mod tests {
335	use super::*;
336
337	fn test_config() -> SharedRuntimeConfig {
338		SharedRuntimeConfig::default().async_threads(2).compute_threads(2)
339	}
340
341	#[test]
342	fn test_runtime_creation() {
343		let runtime = SharedRuntime::from_config(test_config());
344		let result = runtime.block_on(async { 42 });
345		assert_eq!(result, 42);
346	}
347
348	#[test]
349	fn test_runtime_clone_shares_same_runtime() {
350		let rt1 = SharedRuntime::from_config(test_config());
351		let rt2 = rt1.clone();
352		assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
353	}
354
355	#[test]
356	fn test_spawn() {
357		let runtime = SharedRuntime::from_config(test_config());
358		let handle = runtime.spawn(async { 123 });
359		let result = runtime.block_on(handle).unwrap();
360		assert_eq!(result, 123);
361	}
362
363	#[test]
364	fn test_actor_system_accessible() {
365		let runtime = SharedRuntime::from_config(test_config());
366		let _system = runtime.actor_system();
367	}
368}