Skip to main content

reifydb_runtime/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![allow(clippy::tabs_in_doc_comments)]
6
7//! Runtime management for ReifyDB.
8//!
9//! This crate provides a facade over platform-specific runtime implementations:
10//! - **Native**: tokio multi-threaded runtime + rayon-based actor system
11//! - **WASM**: Single-threaded execution with sequential processing
12//!
13//! The API is identical across platforms, with compile-time dispatch ensuring
14//! zero runtime overhead.
15//!
16//! # Example
17//!
18//! ```ignore
19//! use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
20//!
21//! // Create a runtime with default configuration
22//! let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
23//!
24//! // Or with custom configuration
25//! let config = SharedRuntimeConfig::default()
26//!     .async_threads(4)
27//!     .system_threads(4)
28//!     .query_threads(4);
29//! let runtime = SharedRuntime::from_config(config);
30//!
31//! // Spawn async work
32//! runtime.spawn(async {
33//!     // async work here
34//! });
35//!
36//! // Use the actor system for spawning actors
37//! let system = runtime.actor_system();
38//! let handle = system.spawn("my-actor", MyActor::new());
39//! ```
40
41#![allow(dead_code)]
42
43pub mod context;
44
45pub mod hash;
46
47pub mod pool;
48
49pub mod sync;
50
51pub mod actor;
52
53#[cfg(not(reifydb_target = "dst"))]
54use std::future::Future;
55use std::{sync::Arc, thread::available_parallelism};
56
57use crate::{
58	actor::system::ActorSystem,
59	context::clock::{Clock, MockClock},
60	pool::{PoolConfig, Pools},
61};
62
63/// Configuration for creating a [`SharedRuntime`].
64#[derive(Clone)]
65pub struct SharedRuntimeConfig {
66	/// Number of worker threads for async runtime (ignored in WASM)
67	pub async_threads: usize,
68	/// Number of worker threads for the system pool (lightweight actors).
69	pub system_threads: usize,
70	/// Number of worker threads for the query pool (execution-heavy actors).
71	pub query_threads: usize,
72	/// Clock for time operations (defaults to real system clock)
73	pub clock: Clock,
74	/// Random number generator (defaults to OS entropy)
75	pub rng: context::rng::Rng,
76}
77
78impl Default for SharedRuntimeConfig {
79	fn default() -> Self {
80		let cpus = available_parallelism().map_or(1, |n| n.get());
81		Self {
82			async_threads: 1,
83			system_threads: cpus.min(4),
84			query_threads: cpus,
85			clock: Clock::Real,
86			rng: context::rng::Rng::default(),
87		}
88	}
89}
90
91impl SharedRuntimeConfig {
92	/// Set the number of async worker threads.
93	pub fn async_threads(mut self, threads: usize) -> Self {
94		self.async_threads = threads;
95		self
96	}
97
98	/// Set the number of system pool threads (lightweight actors).
99	pub fn system_threads(mut self, threads: usize) -> Self {
100		self.system_threads = threads;
101		self
102	}
103
104	/// Set the number of query pool threads (execution-heavy actors).
105	pub fn query_threads(mut self, threads: usize) -> Self {
106		self.query_threads = threads;
107		self
108	}
109
110	/// Configure for deterministic testing with the given seed.
111	/// Sets a mock clock starting at `seed` milliseconds and a seeded RNG.
112	pub fn deterministic_testing(mut self, seed: u64) -> Self {
113		self.clock = Clock::Mock(MockClock::from_millis(seed));
114		self.rng = context::rng::Rng::seeded(seed);
115		self
116	}
117}
118
119// WASM runtime types - single-threaded execution support
120use std::fmt;
121#[cfg(target_arch = "wasm32")]
122use std::{
123	pin::Pin,
124	task::{Context, Poll},
125};
126
127#[cfg(target_arch = "wasm32")]
128use futures_util::future::LocalBoxFuture;
129#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
130use tokio::runtime as tokio_runtime;
131#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
132use tokio::task::JoinHandle;
133
134/// WASM-compatible handle (placeholder).
135#[cfg(target_arch = "wasm32")]
136#[derive(Clone, Copy, Debug)]
137pub struct WasmHandle;
138
139/// WASM-compatible join handle.
140///
141/// Implements Future to be compatible with async/await.
142#[cfg(target_arch = "wasm32")]
143pub struct WasmJoinHandle<T> {
144	future: LocalBoxFuture<'static, T>,
145}
146
147#[cfg(target_arch = "wasm32")]
148impl<T> Future for WasmJoinHandle<T> {
149	type Output = Result<T, WasmJoinError>;
150
151	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152		match self.future.as_mut().poll(cx) {
153			Poll::Ready(v) => Poll::Ready(Ok(v)),
154			Poll::Pending => Poll::Pending,
155		}
156	}
157}
158
159/// WASM join error (compatible with tokio::task::JoinError API).
160#[cfg(target_arch = "wasm32")]
161#[derive(Debug)]
162pub struct WasmJoinError;
163
164#[cfg(target_arch = "wasm32")]
165impl fmt::Display for WasmJoinError {
166	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167		write!(f, "WASM task failed")
168	}
169}
170
171#[cfg(target_arch = "wasm32")]
172use std::error::Error;
173
174#[cfg(target_arch = "wasm32")]
175impl Error for WasmJoinError {}
176
177/// Inner shared state for the runtime (native).
178#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
179struct SharedRuntimeInner {
180	system: ActorSystem,
181	pools: Pools,
182	clock: Clock,
183	rng: context::rng::Rng,
184}
185
186#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
187impl Drop for SharedRuntimeInner {
188	fn drop(&mut self) {
189		self.system.shutdown();
190		let _ = self.system.join();
191	}
192}
193
194/// Inner shared state for the runtime (WASM).
195#[cfg(target_arch = "wasm32")]
196struct SharedRuntimeInner {
197	system: ActorSystem,
198	pools: Pools,
199	clock: Clock,
200	rng: context::rng::Rng,
201}
202
203/// Inner shared state for the runtime (DST).
204#[cfg(reifydb_target = "dst")]
205struct SharedRuntimeInner {
206	system: ActorSystem,
207	pools: Pools,
208	clock: Clock,
209	rng: context::rng::Rng,
210}
211
212/// Shared runtime that can be cloned and passed across subsystems.
213///
214/// Platform-agnostic facade over:
215/// - Native: tokio multi-threaded runtime + unified actor system
216/// - WASM: Single-threaded execution
217///
218/// Uses Arc internally, so cloning is cheap and all clones share the same
219/// underlying runtime and actor system.
220#[derive(Clone)]
221pub struct SharedRuntime(Arc<SharedRuntimeInner>);
222
223impl SharedRuntime {
224	/// Create a new shared runtime from configuration.
225	pub fn from_config(config: SharedRuntimeConfig) -> Self {
226		let pools = Pools::new(PoolConfig {
227			system_threads: config.system_threads,
228			query_threads: config.query_threads,
229			async_threads: config.async_threads,
230		});
231		let system = ActorSystem::new(pools.clone(), config.clock.clone());
232
233		Self(Arc::new(SharedRuntimeInner {
234			system,
235			pools,
236			clock: config.clock,
237			rng: config.rng,
238		}))
239	}
240
241	/// Get the unified actor system for spawning actors and compute.
242	pub fn actor_system(&self) -> ActorSystem {
243		self.0.system.clone()
244	}
245
246	/// Get the clock for this runtime (shared across all threads).
247	pub fn clock(&self) -> &Clock {
248		&self.0.clock
249	}
250
251	/// Get the RNG for this runtime (shared across all threads).
252	pub fn rng(&self) -> &context::rng::Rng {
253		&self.0.rng
254	}
255
256	/// Get the pools.
257	pub fn pools(&self) -> Pools {
258		self.0.pools.clone()
259	}
260
261	/// Get a handle to the async runtime.
262	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
263	pub fn handle(&self) -> tokio_runtime::Handle {
264		self.0.pools.handle()
265	}
266
267	/// Get a handle to the async runtime.
268	#[cfg(target_arch = "wasm32")]
269	pub fn handle(&self) -> WasmHandle {
270		WasmHandle
271	}
272
273	/// Spawn a future onto the runtime.
274	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
275	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
276	where
277		F: Future + Send + 'static,
278		F::Output: Send + 'static,
279	{
280		self.0.pools.spawn(future)
281	}
282
283	/// Spawn a future onto the runtime.
284	#[cfg(target_arch = "wasm32")]
285	pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
286	where
287		F: Future + 'static,
288		F::Output: 'static,
289	{
290		WasmJoinHandle {
291			future: Box::pin(future),
292		}
293	}
294
295	/// Block the current thread until the future completes.
296	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
297	pub fn block_on<F>(&self, future: F) -> F::Output
298	where
299		F: Future,
300	{
301		self.0.pools.block_on(future)
302	}
303
304	/// Block the current thread until the future completes.
305	#[cfg(target_arch = "wasm32")]
306	pub fn block_on<F>(&self, _future: F) -> F::Output
307	where
308		F: Future,
309	{
310		unimplemented!("block_on not supported in WASM - use async execution instead")
311	}
312}
313
314impl fmt::Debug for SharedRuntime {
315	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316		f.debug_struct("SharedRuntime").finish_non_exhaustive()
317	}
318}
319
320// Keep existing tests but gate them by target
321#[cfg(all(test, not(reifydb_single_threaded)))]
322mod tests {
323	use super::*;
324
325	fn test_config() -> SharedRuntimeConfig {
326		SharedRuntimeConfig::default().async_threads(2).system_threads(2).query_threads(2)
327	}
328
329	#[test]
330	fn test_runtime_creation() {
331		let runtime = SharedRuntime::from_config(test_config());
332		let result = runtime.block_on(async { 42 });
333		assert_eq!(result, 42);
334	}
335
336	#[test]
337	fn test_runtime_clone_shares_same_runtime() {
338		let rt1 = SharedRuntime::from_config(test_config());
339		let rt2 = rt1.clone();
340		assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
341	}
342
343	#[test]
344	fn test_spawn() {
345		let runtime = SharedRuntime::from_config(test_config());
346		let handle = runtime.spawn(async { 123 });
347		let result = runtime.block_on(handle).unwrap();
348		assert_eq!(result, 123);
349	}
350
351	#[test]
352	fn test_actor_system_accessible() {
353		let runtime = SharedRuntime::from_config(test_config());
354		let _system = runtime.actor_system();
355	}
356}