Skip to main content

reifydb_runtime/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Runtime management for ReifyDB.
5//!
6//! This crate provides a facade over platform-specific runtime implementations:
7//! - **Native**: tokio multi-threaded runtime + rayon-based actor system
8//! - **WASM**: Single-threaded execution with sequential processing
9//!
10//! The API is identical across platforms, with compile-time dispatch ensuring
11//! zero runtime overhead.
12//!
13//! # Example
14//!
15//! ```ignore
16//! use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
17//!
18//! // Create a runtime with default configuration
19//! let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
20//!
21//! // Or with custom configuration
22//! let config = SharedRuntimeConfig::default()
23//!     .async_threads(4)
24//!     .compute_threads(4)
25//!     .compute_max_in_flight(16);
26//! let runtime = SharedRuntime::from_config(config);
27//!
28//! // Spawn async work
29//! runtime.spawn(async {
30//!     // async work here
31//! });
32//!
33//! // Use the actor system for spawning actors and compute
34//! let system = runtime.actor_system();
35//! let handle = system.spawn("my-actor", MyActor::new());
36//!
37//! // Run CPU-bound work
38//! let result = system.install(|| expensive_calculation());
39//! ```
40
41#![allow(dead_code)]
42
43pub mod clock;
44
45pub mod hash;
46
47pub mod sync;
48
49pub mod actor;
50
51use std::{future::Future, sync::Arc};
52#[cfg(not(target_arch = "wasm32"))]
53use std::{mem::ManuallyDrop, time::Duration};
54
55use crate::{
56	actor::system::{ActorSystem, ActorSystemConfig},
57	clock::{Clock, MockClock},
58};
59
60/// Configuration for creating a [`SharedRuntime`].
61#[derive(Clone)]
62pub struct SharedRuntimeConfig {
63	/// Number of worker threads for async runtime (ignored in WASM)
64	pub async_threads: usize,
65	/// Number of worker threads for compute/actor pool (ignored in WASM)
66	pub compute_threads: usize,
67	/// Maximum concurrent compute tasks (ignored in WASM)
68	pub compute_max_in_flight: usize,
69	/// Clock for time operations (defaults to real system clock)
70	pub clock: Clock,
71}
72
73impl Default for SharedRuntimeConfig {
74	fn default() -> Self {
75		Self {
76			async_threads: 1,
77			compute_threads: 1,
78			compute_max_in_flight: 32,
79			clock: Clock::Real,
80		}
81	}
82}
83
84impl SharedRuntimeConfig {
85	/// Set the number of async worker threads.
86	pub fn async_threads(mut self, threads: usize) -> Self {
87		self.async_threads = threads;
88		self
89	}
90
91	/// Set the number of compute worker threads.
92	pub fn compute_threads(mut self, threads: usize) -> Self {
93		self.compute_threads = threads;
94		self
95	}
96
97	/// Set the maximum number of in-flight compute tasks.
98	pub fn compute_max_in_flight(mut self, max: usize) -> Self {
99		self.compute_max_in_flight = max;
100		self
101	}
102
103	/// Use a mock clock starting at the given milliseconds.
104	pub fn mock_clock(mut self, initial_millis: u64) -> Self {
105		self.clock = Clock::Mock(MockClock::from_millis(initial_millis));
106		self
107	}
108
109	/// Use a custom clock.
110	pub fn clock(mut self, clock: Clock) -> Self {
111		self.clock = clock;
112		self
113	}
114
115	/// Derive an [`ActorSystemConfig`] from this runtime config.
116	pub fn actor_system_config(&self) -> ActorSystemConfig {
117		ActorSystemConfig {
118			pool_threads: self.compute_threads,
119			max_in_flight: self.compute_max_in_flight,
120		}
121	}
122}
123
124// WASM runtime types - single-threaded execution support
125use std::fmt;
126#[cfg(target_arch = "wasm32")]
127use std::{
128	pin::Pin,
129	task::{Context, Poll},
130};
131
132#[cfg(target_arch = "wasm32")]
133use futures_util::future::LocalBoxFuture;
134#[cfg(not(target_arch = "wasm32"))]
135use tokio::runtime::{self as tokio_runtime, Runtime};
136#[cfg(not(target_arch = "wasm32"))]
137use tokio::task::JoinHandle;
138
/// Placeholder runtime handle for WASM targets.
///
/// A unit struct that carries no state; it exists so `SharedRuntime::handle`
/// has a return type on WASM.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy)]
pub struct WasmHandle;
143
/// Join handle used on WASM targets.
///
/// Owns the boxed future it was created with and is itself a [`Future`],
/// so it can be used with async/await.
#[cfg(target_arch = "wasm32")]
pub struct WasmJoinHandle<T> {
	future: LocalBoxFuture<'static, T>,
}

#[cfg(target_arch = "wasm32")]
impl<T> Future for WasmJoinHandle<T> {
	type Output = Result<T, WasmJoinError>;

	/// Polls the wrapped future. A completed value is always reported as
	/// `Ok`; this implementation never produces a [`WasmJoinError`].
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		let inner = self.future.as_mut();
		inner.poll(cx).map(Ok)
	}
}
163
/// Error type for joining a WASM task.
///
/// Mirrors the role of `tokio::task::JoinError` so code awaiting a join
/// handle looks the same on both platforms.
#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct WasmJoinError;

#[cfg(target_arch = "wasm32")]
impl fmt::Display for WasmJoinError {
	/// Writes a fixed, human-readable failure message.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.write_str("WASM task failed")
	}
}

#[cfg(target_arch = "wasm32")]
use std::error::Error;

#[cfg(target_arch = "wasm32")]
impl Error for WasmJoinError {}
181
182/// Inner shared state for the runtime (native).
183#[cfg(not(target_arch = "wasm32"))]
184struct SharedRuntimeInner {
185	tokio: ManuallyDrop<Runtime>,
186	system: ActorSystem,
187	clock: Clock,
188}
189
190#[cfg(not(target_arch = "wasm32"))]
191impl Drop for SharedRuntimeInner {
192	fn drop(&mut self) {
193		// SAFETY: drop is called exactly once; taking the Runtime here
194		// prevents its default Drop (which calls shutdown_background and
195		// does NOT wait). We call shutdown_timeout instead so that worker
196		// threads and I/O resources (epoll fd, timer fd, etc.) are fully
197		// reclaimed before this function returns.
198		let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
199		rt.shutdown_timeout(Duration::from_secs(5));
200	}
201}
202
/// State shared by every `SharedRuntime` clone on WASM targets.
///
/// Holds only the actor system and the clock; there is no async runtime
/// field on this platform.
#[cfg(target_arch = "wasm32")]
struct SharedRuntimeInner {
	system: ActorSystem,
	clock: Clock,
}
209
210/// Shared runtime that can be cloned and passed across subsystems.
211///
212/// Platform-agnostic facade over:
213/// - Native: tokio multi-threaded runtime + unified actor system
214/// - WASM: Single-threaded execution
215///
216/// Uses Arc internally, so cloning is cheap and all clones share the same
217/// underlying runtime and actor system.
218#[derive(Clone)]
219pub struct SharedRuntime(Arc<SharedRuntimeInner>);
220
221impl SharedRuntime {
222	/// Create a new shared runtime from configuration.
223	///
224	/// # Panics
225	///
226	/// Panics if the runtime cannot be created (native only).
227	#[cfg(not(target_arch = "wasm32"))]
228	pub fn from_config(config: SharedRuntimeConfig) -> Self {
229		let tokio = tokio_runtime::Builder::new_multi_thread()
230			.worker_threads(config.async_threads)
231			.thread_name("async")
232			.enable_all()
233			.build()
234			.expect("Failed to create tokio runtime");
235
236		let system = ActorSystem::new(config.actor_system_config());
237
238		Self(Arc::new(SharedRuntimeInner {
239			tokio: ManuallyDrop::new(tokio),
240			system,
241			clock: config.clock,
242		}))
243	}
244
245	/// Create a new shared runtime from configuration.
246	#[cfg(target_arch = "wasm32")]
247	pub fn from_config(config: SharedRuntimeConfig) -> Self {
248		let system = ActorSystem::new(config.actor_system_config());
249
250		Self(Arc::new(SharedRuntimeInner {
251			system,
252			clock: config.clock,
253		}))
254	}
255
256	/// Get the unified actor system for spawning actors and compute.
257	pub fn actor_system(&self) -> ActorSystem {
258		self.0.system.clone()
259	}
260
261	/// Get the clock for this runtime (shared across all threads).
262	pub fn clock(&self) -> &Clock {
263		&self.0.clock
264	}
265
266	/// Get a handle to the async runtime.
267	///
268	/// Returns a platform-specific handle type:
269	/// - Native: `tokio_runtime::Handle`
270	/// - WASM: `WasmHandle`
271	#[cfg(not(target_arch = "wasm32"))]
272	pub fn handle(&self) -> tokio_runtime::Handle {
273		self.0.tokio.handle().clone()
274	}
275
276	/// Get a handle to the async runtime.
277	#[cfg(target_arch = "wasm32")]
278	pub fn handle(&self) -> WasmHandle {
279		WasmHandle
280	}
281
282	/// Spawn a future onto the runtime.
283	///
284	/// Returns a platform-specific join handle type:
285	/// - Native: `JoinHandle`
286	/// - WASM: `WasmJoinHandle`
287	#[cfg(not(target_arch = "wasm32"))]
288	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
289	where
290		F: Future + Send + 'static,
291		F::Output: Send + 'static,
292	{
293		self.0.tokio.spawn(future)
294	}
295
296	/// Spawn a future onto the runtime.
297	#[cfg(target_arch = "wasm32")]
298	pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
299	where
300		F: Future + 'static,
301		F::Output: 'static,
302	{
303		WasmJoinHandle {
304			future: Box::pin(future),
305		}
306	}
307
308	/// Block the current thread until the future completes.
309	///
310	/// **Note:** Not supported in WASM builds - will panic.
311	#[cfg(not(target_arch = "wasm32"))]
312	pub fn block_on<F>(&self, future: F) -> F::Output
313	where
314		F: Future,
315	{
316		self.0.tokio.block_on(future)
317	}
318
319	/// Block the current thread until the future completes.
320	///
321	/// **Note:** Not supported in WASM builds - will panic.
322	#[cfg(target_arch = "wasm32")]
323	pub fn block_on<F>(&self, _future: F) -> F::Output
324	where
325		F: Future,
326	{
327		unimplemented!("block_on not supported in WASM - use async execution instead")
328	}
329
330	/// Executes a closure on the actor system's pool directly.
331	///
332	/// This is a convenience method that delegates to the actor system's `install`.
333	/// Synchronous and bypasses admission control.
334	pub fn install<R, F>(&self, f: F) -> R
335	where
336		R: Send,
337		F: FnOnce() -> R + Send,
338	{
339		self.0.system.install(f)
340	}
341}
342
343impl fmt::Debug for SharedRuntime {
344	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345		f.debug_struct("SharedRuntime").finish_non_exhaustive()
346	}
347}
348
// Native-only tests; compiled when the `reifydb_target = "native"` cfg is set.
#[cfg(all(test, reifydb_target = "native"))]
mod tests {
	use super::*;

	/// Small runtime configuration shared by every test below.
	fn test_config() -> SharedRuntimeConfig {
		let base = SharedRuntimeConfig::default();
		base.async_threads(2).compute_threads(2).compute_max_in_flight(4)
	}

	#[test]
	fn test_runtime_creation() {
		let rt = SharedRuntime::from_config(test_config());
		let value = rt.block_on(async { 42 });
		assert_eq!(value, 42);
	}

	#[test]
	fn test_runtime_clone_shares_same_runtime() {
		let original = SharedRuntime::from_config(test_config());
		let copy = original.clone();
		// Both handles must point at the very same inner allocation.
		assert!(Arc::ptr_eq(&original.0, &copy.0));
	}

	#[test]
	fn test_spawn() {
		let rt = SharedRuntime::from_config(test_config());
		let join = rt.spawn(async { 123 });
		let value = rt.block_on(join).unwrap();
		assert_eq!(value, 123);
	}

	#[test]
	fn test_actor_system_accessible() {
		let rt = SharedRuntime::from_config(test_config());
		let actors = rt.actor_system();
		let value = actors.install(|| 42);
		assert_eq!(value, 42);
	}

	#[test]
	fn test_install() {
		let rt = SharedRuntime::from_config(test_config());
		let value = rt.install(|| 42);
		assert_eq!(value, 42);
	}
}
395
#[cfg(all(test, reifydb_target = "wasm"))]
mod wasm_tests {
	use super::*;

	/// A default-configured runtime can be built and its actor system can
	/// run a closure via `install`.
	#[test]
	fn test_wasm_runtime_creation() {
		let rt = SharedRuntime::from_config(SharedRuntimeConfig::default());
		let actors = rt.actor_system();
		let value = actors.install(|| 42);
		assert_eq!(value, 42);
	}
}
407}