Skip to main content

reifydb_runtime/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Runtime management for ReifyDB.
5//!
6//! This crate provides a facade over platform-specific runtime implementations:
7//! - **Native**: tokio multi-threaded runtime + rayon-based actor system
8//! - **WASM**: Single-threaded execution with sequential processing
9//!
10//! The API is identical across platforms, with compile-time dispatch ensuring
11//! zero runtime overhead.
12//!
13//! # Example
14//!
15//! ```ignore
16//! use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
17//!
18//! // Create a runtime with default configuration
19//! let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
20//!
21//! // Or with custom configuration
22//! let config = SharedRuntimeConfig::default()
23//!     .async_threads(4)
24//!     .compute_threads(4)
25//!     .compute_max_in_flight(16);
26//! let runtime = SharedRuntime::from_config(config);
27//!
28//! // Spawn async work
29//! runtime.spawn(async {
30//!     // async work here
31//! });
32//!
33//! // Use the actor system for spawning actors and compute
34//! let system = runtime.actor_system();
35//! let handle = system.spawn("my-actor", MyActor::new());
36//!
37//! // Run CPU-bound work
38//! let result = system.install(|| expensive_calculation());
39//! ```
40
41#![allow(dead_code)]
42
43pub mod context;
44
45pub mod hash;
46
47pub mod sync;
48
49pub mod actor;
50
51use std::{future::Future, sync::Arc};
52#[cfg(not(target_arch = "wasm32"))]
53use std::{mem::ManuallyDrop, time::Duration};
54
55use crate::{
56	actor::system::{ActorSystem, ActorSystemConfig},
57	context::clock::{Clock, MockClock},
58};
59
60/// Configuration for creating a [`SharedRuntime`].
61#[derive(Clone)]
62pub struct SharedRuntimeConfig {
63	/// Number of worker threads for async runtime (ignored in WASM)
64	pub async_threads: usize,
65	/// Number of worker threads for compute/actor pool (ignored in WASM)
66	pub compute_threads: usize,
67	/// Maximum concurrent compute tasks (ignored in WASM)
68	pub compute_max_in_flight: usize,
69	/// Clock for time operations (defaults to real system clock)
70	pub clock: Clock,
71	/// Random number generator (defaults to OS entropy)
72	pub rng: context::rng::Rng,
73}
74
75impl Default for SharedRuntimeConfig {
76	fn default() -> Self {
77		Self {
78			async_threads: 1,
79			compute_threads: 1,
80			compute_max_in_flight: 32,
81			clock: Clock::Real,
82			rng: context::rng::Rng::default(),
83		}
84	}
85}
86
87impl SharedRuntimeConfig {
88	/// Set the number of async worker threads.
89	pub fn async_threads(mut self, threads: usize) -> Self {
90		self.async_threads = threads;
91		self
92	}
93
94	/// Set the number of compute worker threads.
95	pub fn compute_threads(mut self, threads: usize) -> Self {
96		self.compute_threads = threads;
97		self
98	}
99
100	/// Set the maximum number of in-flight compute tasks.
101	pub fn compute_max_in_flight(mut self, max: usize) -> Self {
102		self.compute_max_in_flight = max;
103		self
104	}
105
106	/// Configure for deterministic testing with the given seed.
107	/// Sets a mock clock starting at `seed` milliseconds and a seeded RNG.
108	pub fn deterministic_testing(mut self, seed: u64) -> Self {
109		self.clock = Clock::Mock(MockClock::from_millis(seed));
110		self.rng = context::rng::Rng::seeded(seed);
111		self
112	}
113
114	/// Derive an [`ActorSystemConfig`] from this runtime config.
115	pub fn actor_system_config(&self) -> ActorSystemConfig {
116		ActorSystemConfig {
117			pool_threads: self.compute_threads,
118			max_in_flight: self.compute_max_in_flight,
119		}
120	}
121}
122
123// WASM runtime types - single-threaded execution support
124use std::fmt;
125#[cfg(target_arch = "wasm32")]
126use std::{
127	pin::Pin,
128	task::{Context, Poll},
129};
130
131#[cfg(target_arch = "wasm32")]
132use futures_util::future::LocalBoxFuture;
133#[cfg(not(target_arch = "wasm32"))]
134use tokio::runtime::{self as tokio_runtime, Runtime};
135#[cfg(not(target_arch = "wasm32"))]
136use tokio::task::JoinHandle;
137
138/// WASM-compatible handle (placeholder).
139#[cfg(target_arch = "wasm32")]
140#[derive(Clone, Copy, Debug)]
141pub struct WasmHandle;
142
143/// WASM-compatible join handle.
144///
145/// Implements Future to be compatible with async/await.
146#[cfg(target_arch = "wasm32")]
147pub struct WasmJoinHandle<T> {
148	future: LocalBoxFuture<'static, T>,
149}
150
151#[cfg(target_arch = "wasm32")]
152impl<T> Future for WasmJoinHandle<T> {
153	type Output = Result<T, WasmJoinError>;
154
155	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156		match self.future.as_mut().poll(cx) {
157			Poll::Ready(v) => Poll::Ready(Ok(v)),
158			Poll::Pending => Poll::Pending,
159		}
160	}
161}
162
163/// WASM join error (compatible with tokio::task::JoinError API).
164#[cfg(target_arch = "wasm32")]
165#[derive(Debug)]
166pub struct WasmJoinError;
167
168#[cfg(target_arch = "wasm32")]
169impl fmt::Display for WasmJoinError {
170	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171		write!(f, "WASM task failed")
172	}
173}
174
175#[cfg(target_arch = "wasm32")]
176use std::error::Error;
177
178#[cfg(target_arch = "wasm32")]
179impl Error for WasmJoinError {}
180
181/// Inner shared state for the runtime (native).
182#[cfg(not(target_arch = "wasm32"))]
183struct SharedRuntimeInner {
184	tokio: ManuallyDrop<Runtime>,
185	system: ActorSystem,
186	clock: Clock,
187	rng: context::rng::Rng,
188}
189
190#[cfg(not(target_arch = "wasm32"))]
191impl Drop for SharedRuntimeInner {
192	fn drop(&mut self) {
193		// SAFETY: drop is called exactly once; taking the Runtime here
194		// prevents its default Drop (which calls shutdown_background and
195		// does NOT wait). We call shutdown_timeout instead so that worker
196		// threads and I/O resources (epoll fd, timer fd, etc.) are fully
197		// reclaimed before this function returns.
198		let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
199		rt.shutdown_timeout(Duration::from_secs(5));
200	}
201}
202
203/// Inner shared state for the runtime (WASM).
204#[cfg(target_arch = "wasm32")]
205struct SharedRuntimeInner {
206	system: ActorSystem,
207	clock: Clock,
208	rng: context::rng::Rng,
209}
210
211/// Shared runtime that can be cloned and passed across subsystems.
212///
213/// Platform-agnostic facade over:
214/// - Native: tokio multi-threaded runtime + unified actor system
215/// - WASM: Single-threaded execution
216///
217/// Uses Arc internally, so cloning is cheap and all clones share the same
218/// underlying runtime and actor system.
219#[derive(Clone)]
220pub struct SharedRuntime(Arc<SharedRuntimeInner>);
221
222impl SharedRuntime {
223	/// Create a new shared runtime from configuration.
224	///
225	/// # Panics
226	///
227	/// Panics if the runtime cannot be created (native only).
228	#[cfg(not(target_arch = "wasm32"))]
229	pub fn from_config(config: SharedRuntimeConfig) -> Self {
230		let tokio = tokio_runtime::Builder::new_multi_thread()
231			.worker_threads(config.async_threads)
232			.thread_name("async")
233			.enable_all()
234			.build()
235			.expect("Failed to create tokio runtime");
236
237		let system = ActorSystem::new(config.actor_system_config());
238
239		Self(Arc::new(SharedRuntimeInner {
240			tokio: ManuallyDrop::new(tokio),
241			system,
242			clock: config.clock,
243			rng: config.rng,
244		}))
245	}
246
247	/// Create a new shared runtime from configuration.
248	#[cfg(target_arch = "wasm32")]
249	pub fn from_config(config: SharedRuntimeConfig) -> Self {
250		let system = ActorSystem::new(config.actor_system_config());
251
252		Self(Arc::new(SharedRuntimeInner {
253			system,
254			clock: config.clock,
255			rng: config.rng,
256		}))
257	}
258
259	/// Get the unified actor system for spawning actors and compute.
260	pub fn actor_system(&self) -> ActorSystem {
261		self.0.system.clone()
262	}
263
264	/// Get the clock for this runtime (shared across all threads).
265	pub fn clock(&self) -> &Clock {
266		&self.0.clock
267	}
268
269	/// Get the RNG for this runtime (shared across all threads).
270	pub fn rng(&self) -> &context::rng::Rng {
271		&self.0.rng
272	}
273
274	/// Get a handle to the async runtime.
275	///
276	/// Returns a platform-specific handle type:
277	/// - Native: `tokio_runtime::Handle`
278	/// - WASM: `WasmHandle`
279	#[cfg(not(target_arch = "wasm32"))]
280	pub fn handle(&self) -> tokio_runtime::Handle {
281		self.0.tokio.handle().clone()
282	}
283
284	/// Get a handle to the async runtime.
285	#[cfg(target_arch = "wasm32")]
286	pub fn handle(&self) -> WasmHandle {
287		WasmHandle
288	}
289
290	/// Spawn a future onto the runtime.
291	///
292	/// Returns a platform-specific join handle type:
293	/// - Native: `JoinHandle`
294	/// - WASM: `WasmJoinHandle`
295	#[cfg(not(target_arch = "wasm32"))]
296	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
297	where
298		F: Future + Send + 'static,
299		F::Output: Send + 'static,
300	{
301		self.0.tokio.spawn(future)
302	}
303
304	/// Spawn a future onto the runtime.
305	#[cfg(target_arch = "wasm32")]
306	pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
307	where
308		F: Future + 'static,
309		F::Output: 'static,
310	{
311		WasmJoinHandle {
312			future: Box::pin(future),
313		}
314	}
315
316	/// Block the current thread until the future completes.
317	///
318	/// **Note:** Not supported in WASM builds - will panic.
319	#[cfg(not(target_arch = "wasm32"))]
320	pub fn block_on<F>(&self, future: F) -> F::Output
321	where
322		F: Future,
323	{
324		self.0.tokio.block_on(future)
325	}
326
327	/// Block the current thread until the future completes.
328	///
329	/// **Note:** Not supported in WASM builds - will panic.
330	#[cfg(target_arch = "wasm32")]
331	pub fn block_on<F>(&self, _future: F) -> F::Output
332	where
333		F: Future,
334	{
335		unimplemented!("block_on not supported in WASM - use async execution instead")
336	}
337
338	/// Executes a closure on the actor system's pool directly.
339	///
340	/// This is a convenience method that delegates to the actor system's `install`.
341	/// Synchronous and bypasses admission control.
342	pub fn install<R, F>(&self, f: F) -> R
343	where
344		R: Send,
345		F: FnOnce() -> R + Send,
346	{
347		self.0.system.install(f)
348	}
349}
350
351impl fmt::Debug for SharedRuntime {
352	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353		f.debug_struct("SharedRuntime").finish_non_exhaustive()
354	}
355}
356
357// Keep existing tests but gate them by target
358#[cfg(all(test, not(reifydb_single_threaded)))]
359mod tests {
360	use super::*;
361
362	fn test_config() -> SharedRuntimeConfig {
363		SharedRuntimeConfig::default().async_threads(2).compute_threads(2).compute_max_in_flight(4)
364	}
365
366	#[test]
367	fn test_runtime_creation() {
368		let runtime = SharedRuntime::from_config(test_config());
369		let result = runtime.block_on(async { 42 });
370		assert_eq!(result, 42);
371	}
372
373	#[test]
374	fn test_runtime_clone_shares_same_runtime() {
375		let rt1 = SharedRuntime::from_config(test_config());
376		let rt2 = rt1.clone();
377		assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
378	}
379
380	#[test]
381	fn test_spawn() {
382		let runtime = SharedRuntime::from_config(test_config());
383		let handle = runtime.spawn(async { 123 });
384		let result = runtime.block_on(handle).unwrap();
385		assert_eq!(result, 123);
386	}
387
388	#[test]
389	fn test_actor_system_accessible() {
390		let runtime = SharedRuntime::from_config(test_config());
391		let system = runtime.actor_system();
392		let result = system.install(|| 42);
393		assert_eq!(result, 42);
394	}
395
396	#[test]
397	fn test_install() {
398		let runtime = SharedRuntime::from_config(test_config());
399		let result = runtime.install(|| 42);
400		assert_eq!(result, 42);
401	}
402}
403
404#[cfg(all(test, reifydb_single_threaded))]
405mod wasm_tests {
406	use super::*;
407
408	#[test]
409	fn test_wasm_runtime_creation() {
410		let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
411		let system = runtime.actor_system();
412		let result = system.install(|| 42);
413		assert_eq!(result, 42);
414	}
415}