Skip to main content

reifydb_runtime/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Runtime management for ReifyDB.
5//!
6//! This crate provides a facade over platform-specific runtime implementations:
7//! - **Native**: tokio multi-threaded runtime + rayon-based actor system
8//! - **WASM**: Single-threaded execution with sequential processing
9//!
10//! The API is identical across platforms, with compile-time dispatch ensuring
11//! zero runtime overhead.
12//!
13//! # Example
14//!
15//! ```ignore
16//! use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
17//!
18//! // Create a runtime with default configuration
19//! let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
20//!
21//! // Or with custom configuration
22//! let config = SharedRuntimeConfig::default()
23//!     .async_threads(4)
24//!     .compute_threads(4)
25//!     .compute_max_in_flight(16);
26//! let runtime = SharedRuntime::from_config(config);
27//!
28//! // Spawn async work
29//! runtime.spawn(async {
30//!     // async work here
31//! });
32//!
33//! // Use the actor system for spawning actors and compute
34//! let system = runtime.actor_system();
35//! let handle = system.spawn("my-actor", MyActor::new());
36//!
37//! // Run CPU-bound work
38//! let result = system.install(|| expensive_calculation());
39//! ```
40
41#![allow(dead_code)]
42
43pub mod clock;
44
45pub mod hash;
46
47pub mod sync;
48
49pub mod actor;
50
51use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
52
53use crate::{
54	actor::system::{ActorSystem, ActorSystemConfig},
55	clock::{Clock, MockClock},
56};
57
58/// Configuration for creating a [`SharedRuntime`].
59#[derive(Clone)]
60pub struct SharedRuntimeConfig {
61	/// Number of worker threads for async runtime (ignored in WASM)
62	pub async_threads: usize,
63	/// Number of worker threads for compute/actor pool (ignored in WASM)
64	pub compute_threads: usize,
65	/// Maximum concurrent compute tasks (ignored in WASM)
66	pub compute_max_in_flight: usize,
67	/// Clock for time operations (defaults to real system clock)
68	pub clock: Clock,
69}
70
71impl Default for SharedRuntimeConfig {
72	fn default() -> Self {
73		Self {
74			async_threads: 1,
75			compute_threads: 1,
76			compute_max_in_flight: 32,
77			clock: Clock::Real,
78		}
79	}
80}
81
82impl SharedRuntimeConfig {
83	/// Set the number of async worker threads.
84	pub fn async_threads(mut self, threads: usize) -> Self {
85		self.async_threads = threads;
86		self
87	}
88
89	/// Set the number of compute worker threads.
90	pub fn compute_threads(mut self, threads: usize) -> Self {
91		self.compute_threads = threads;
92		self
93	}
94
95	/// Set the maximum number of in-flight compute tasks.
96	pub fn compute_max_in_flight(mut self, max: usize) -> Self {
97		self.compute_max_in_flight = max;
98		self
99	}
100
101	/// Use a mock clock starting at the given milliseconds.
102	pub fn mock_clock(mut self, initial_millis: u64) -> Self {
103		self.clock = Clock::Mock(MockClock::from_millis(initial_millis));
104		self
105	}
106
107	/// Use a custom clock.
108	pub fn clock(mut self, clock: Clock) -> Self {
109		self.clock = clock;
110		self
111	}
112
113	/// Derive an [`ActorSystemConfig`] from this runtime config.
114	pub fn actor_system_config(&self) -> ActorSystemConfig {
115		ActorSystemConfig {
116			pool_threads: self.compute_threads,
117			max_in_flight: self.compute_max_in_flight,
118		}
119	}
120}
121
122// WASM runtime types - single-threaded execution support
123#[cfg(target_arch = "wasm32")]
124use std::{pin::Pin, task::Poll};
125
126#[cfg(target_arch = "wasm32")]
127use futures_util::future::LocalBoxFuture;
128#[cfg(not(target_arch = "wasm32"))]
129use tokio::runtime::Runtime;
130
/// Placeholder runtime handle for WASM builds.
///
/// Carries no state; it only exists so that [`SharedRuntime::handle`] has a
/// return type on this target.
#[cfg(target_arch = "wasm32")]
#[derive(Clone, Copy, Debug)]
pub struct WasmHandle;
135
/// WASM-compatible join handle.
///
/// Implements [`Future`] so it can be used with async/await. The handle owns
/// the wrapped future and only makes progress while the handle itself is
/// polled: unlike a tokio `JoinHandle`, dropping it without awaiting means the
/// wrapped work never runs.
#[cfg(target_arch = "wasm32")]
#[must_use = "the wrapped future only runs when this handle is awaited or polled"]
pub struct WasmJoinHandle<T> {
	/// The boxed, non-`Send` future driven by this handle's `poll`.
	future: LocalBoxFuture<'static, T>,
}
143
#[cfg(target_arch = "wasm32")]
impl<T> Future for WasmJoinHandle<T> {
	type Output = Result<T, WasmJoinError>;

	/// Polls the wrapped future and wraps a finished value in `Ok`.
	///
	/// This implementation never produces `Err`; the `Result` only mirrors
	/// the shape of the native join handle's output.
	fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
		let inner = self.future.as_mut();
		inner.poll(cx).map(Ok)
	}
}
155
/// Error half of a [`WasmJoinHandle`] result, standing in for
/// `tokio::task::JoinError` on WASM builds.
#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct WasmJoinError;
160
#[cfg(target_arch = "wasm32")]
impl std::fmt::Display for WasmJoinError {
	/// Writes a fixed, human-readable failure message.
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		f.write_str("WASM task failed")
	}
}
167
// Marker impl: lets `WasmJoinError` be used wherever a `std::error::Error`
// is expected; all methods keep their default behavior.
#[cfg(target_arch = "wasm32")]
impl std::error::Error for WasmJoinError {}
170
171/// Inner shared state for the runtime (native).
172#[cfg(not(target_arch = "wasm32"))]
173struct SharedRuntimeInner {
174	tokio: ManuallyDrop<Runtime>,
175	system: ActorSystem,
176	clock: Clock,
177}
178
179#[cfg(not(target_arch = "wasm32"))]
180impl Drop for SharedRuntimeInner {
181	fn drop(&mut self) {
182		// SAFETY: drop is called exactly once; taking the Runtime here
183		// prevents its default Drop (which calls shutdown_background and
184		// does NOT wait). We call shutdown_timeout instead so that worker
185		// threads and I/O resources (epoll fd, timer fd, etc.) are fully
186		// reclaimed before this function returns.
187		let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
188		rt.shutdown_timeout(Duration::from_secs(5));
189	}
190}
191
/// State shared by every [`SharedRuntime`] clone on WASM targets.
///
/// There is no tokio runtime here, only the actor system and the clock.
#[cfg(target_arch = "wasm32")]
struct SharedRuntimeInner {
	/// Actor system used for actors and compute work.
	system: ActorSystem,
	/// Clock handed out by `SharedRuntime::clock`.
	clock: Clock,
}
198
199/// Shared runtime that can be cloned and passed across subsystems.
200///
201/// Platform-agnostic facade over:
202/// - Native: tokio multi-threaded runtime + unified actor system
203/// - WASM: Single-threaded execution
204///
205/// Uses Arc internally, so cloning is cheap and all clones share the same
206/// underlying runtime and actor system.
207#[derive(Clone)]
208pub struct SharedRuntime(Arc<SharedRuntimeInner>);
209
210impl SharedRuntime {
211	/// Create a new shared runtime from configuration.
212	///
213	/// # Panics
214	///
215	/// Panics if the runtime cannot be created (native only).
216	#[cfg(not(target_arch = "wasm32"))]
217	pub fn from_config(config: SharedRuntimeConfig) -> Self {
218		let tokio = tokio::runtime::Builder::new_multi_thread()
219			.worker_threads(config.async_threads)
220			.thread_name("async")
221			.enable_all()
222			.build()
223			.expect("Failed to create tokio runtime");
224
225		let system = ActorSystem::new(config.actor_system_config());
226
227		Self(Arc::new(SharedRuntimeInner {
228			tokio: ManuallyDrop::new(tokio),
229			system,
230			clock: config.clock,
231		}))
232	}
233
234	/// Create a new shared runtime from configuration.
235	#[cfg(target_arch = "wasm32")]
236	pub fn from_config(config: SharedRuntimeConfig) -> Self {
237		let system = ActorSystem::new(config.actor_system_config());
238
239		Self(Arc::new(SharedRuntimeInner {
240			system,
241			clock: config.clock,
242		}))
243	}
244
245	/// Get the unified actor system for spawning actors and compute.
246	pub fn actor_system(&self) -> ActorSystem {
247		self.0.system.clone()
248	}
249
250	/// Get the clock for this runtime (shared across all threads).
251	pub fn clock(&self) -> &Clock {
252		&self.0.clock
253	}
254
255	/// Get a handle to the async runtime.
256	///
257	/// Returns a platform-specific handle type:
258	/// - Native: `tokio::runtime::Handle`
259	/// - WASM: `WasmHandle`
260	#[cfg(not(target_arch = "wasm32"))]
261	pub fn handle(&self) -> tokio::runtime::Handle {
262		self.0.tokio.handle().clone()
263	}
264
265	/// Get a handle to the async runtime.
266	#[cfg(target_arch = "wasm32")]
267	pub fn handle(&self) -> WasmHandle {
268		WasmHandle
269	}
270
271	/// Spawn a future onto the runtime.
272	///
273	/// Returns a platform-specific join handle type:
274	/// - Native: `tokio::task::JoinHandle`
275	/// - WASM: `WasmJoinHandle`
276	#[cfg(not(target_arch = "wasm32"))]
277	pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
278	where
279		F: Future + Send + 'static,
280		F::Output: Send + 'static,
281	{
282		self.0.tokio.spawn(future)
283	}
284
285	/// Spawn a future onto the runtime.
286	#[cfg(target_arch = "wasm32")]
287	pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
288	where
289		F: Future + 'static,
290		F::Output: 'static,
291	{
292		WasmJoinHandle {
293			future: Box::pin(future),
294		}
295	}
296
297	/// Block the current thread until the future completes.
298	///
299	/// **Note:** Not supported in WASM builds - will panic.
300	#[cfg(not(target_arch = "wasm32"))]
301	pub fn block_on<F>(&self, future: F) -> F::Output
302	where
303		F: Future,
304	{
305		self.0.tokio.block_on(future)
306	}
307
308	/// Block the current thread until the future completes.
309	///
310	/// **Note:** Not supported in WASM builds - will panic.
311	#[cfg(target_arch = "wasm32")]
312	pub fn block_on<F>(&self, _future: F) -> F::Output
313	where
314		F: Future,
315	{
316		unimplemented!("block_on not supported in WASM - use async execution instead")
317	}
318
319	/// Executes a closure on the actor system's pool directly.
320	///
321	/// This is a convenience method that delegates to the actor system's `install`.
322	/// Synchronous and bypasses admission control.
323	pub fn install<R, F>(&self, f: F) -> R
324	where
325		R: Send,
326		F: FnOnce() -> R + Send,
327	{
328		self.0.system.install(f)
329	}
330}
331
332impl std::fmt::Debug for SharedRuntime {
333	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334		f.debug_struct("SharedRuntime").finish_non_exhaustive()
335	}
336}
337
338// Keep existing tests but gate them by target
#[cfg(all(test, reifydb_target = "native"))]
mod tests {
	use super::*;

	/// Small two-by-two thread configuration shared by the native tests.
	fn test_config() -> SharedRuntimeConfig {
		let base = SharedRuntimeConfig::default();
		base.async_threads(2).compute_threads(2).compute_max_in_flight(4)
	}

	/// Shorthand for a runtime built from [`test_config`].
	fn test_runtime() -> SharedRuntime {
		SharedRuntime::from_config(test_config())
	}

	#[test]
	fn test_runtime_creation() {
		let rt = test_runtime();
		assert_eq!(rt.block_on(async { 42 }), 42);
	}

	#[test]
	fn test_runtime_clone_shares_same_runtime() {
		let original = test_runtime();
		let copy = original.clone();
		assert!(Arc::ptr_eq(&original.0, &copy.0));
	}

	#[test]
	fn test_spawn() {
		let rt = test_runtime();
		let join = rt.spawn(async { 123 });
		let value = rt.block_on(join).unwrap();
		assert_eq!(value, 123);
	}

	#[test]
	fn test_actor_system_accessible() {
		let rt = test_runtime();
		let value = rt.actor_system().install(|| 42);
		assert_eq!(value, 42);
	}

	#[test]
	fn test_install() {
		let rt = test_runtime();
		assert_eq!(rt.install(|| 42), 42);
	}
}
384
#[cfg(all(test, reifydb_target = "wasm"))]
mod wasm_tests {
	use super::*;

	/// A default-configured runtime can be built and its actor system can
	/// run a closure.
	#[test]
	fn test_wasm_runtime_creation() {
		let rt = SharedRuntime::from_config(SharedRuntimeConfig::default());
		let value = rt.actor_system().install(|| 42);
		assert_eq!(value, 42);
	}
}