Skip to main content

reifydb_runtime/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![allow(clippy::tabs_in_doc_comments)]
6#![allow(dead_code)]
7
8pub mod context;
9
10pub mod hash;
11
12pub mod pool;
13
14pub mod sync;
15
16pub mod actor;
17
18#[cfg(not(reifydb_target = "dst"))]
19use std::future::Future;
20use std::{sync::Arc, thread::available_parallelism};
21
22use crate::{
23	actor::system::ActorSystem,
24	context::clock::{Clock, MockClock},
25	pool::{PoolConfig, Pools},
26};
27
28/// Configuration for creating a [`SharedRuntime`].
29#[derive(Clone)]
30pub struct SharedRuntimeConfig {
31	/// Number of worker threads for async runtime (ignored in WASM)
32	pub async_threads: usize,
33	/// Number of worker threads for the system pool (lightweight actors).
34	pub system_threads: usize,
35	/// Number of worker threads for the query pool (execution-heavy actors).
36	pub query_threads: usize,
37	/// Clock for time operations (defaults to real system clock)
38	pub clock: Clock,
39	/// Random number generator (defaults to OS entropy)
40	pub rng: context::rng::Rng,
41}
42
43impl Default for SharedRuntimeConfig {
44	fn default() -> Self {
45		let cpus = available_parallelism().map_or(1, |n| n.get());
46		Self {
47			async_threads: 1,
48			system_threads: cpus.min(4),
49			query_threads: cpus,
50			clock: Clock::Real,
51			rng: context::rng::Rng::default(),
52		}
53	}
54}
55
56impl SharedRuntimeConfig {
57	/// Set the number of async worker threads.
58	pub fn async_threads(mut self, threads: usize) -> Self {
59		self.async_threads = threads;
60		self
61	}
62
63	/// Set the number of system pool threads (lightweight actors).
64	pub fn system_threads(mut self, threads: usize) -> Self {
65		self.system_threads = threads;
66		self
67	}
68
69	/// Set the number of query pool threads (execution-heavy actors).
70	pub fn query_threads(mut self, threads: usize) -> Self {
71		self.query_threads = threads;
72		self
73	}
74
75	/// Configure for deterministic testing with the given seed.
76	/// Sets a mock clock starting at `seed` milliseconds and a seeded RNG.
77	pub fn seeded(mut self, seed: u64) -> Self {
78		self.clock = Clock::Mock(MockClock::from_millis(seed));
79		self.rng = context::rng::Rng::seeded(seed);
80		self
81	}
82}
83
84// WASM runtime types - single-threaded execution support
85use std::fmt;
86#[cfg(target_arch = "wasm32")]
87use std::{
88	pin::Pin,
89	task::{Context, Poll},
90};
91
92#[cfg(target_arch = "wasm32")]
93use futures_util::future::LocalBoxFuture;
94#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
95use tokio::runtime as tokio_runtime;
96#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
97use tokio::task::JoinHandle;
98
/// Placeholder runtime handle for WASM targets.
///
/// It carries no state; `SharedRuntime::handle` returns it on `wasm32` so the
/// accessor exists on that target as well.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy)]
pub struct WasmHandle;
103
/// Join handle used on WASM targets.
///
/// Wraps the boxed future handed to `SharedRuntime::spawn`. The handle is
/// itself a `Future`, so it can be `.await`ed like a regular join handle.
#[cfg(target_arch = "wasm32")]
pub struct WasmJoinHandle<T> {
	/// The wrapped task; it is polled whenever the handle is polled.
	future: LocalBoxFuture<'static, T>,
}
111
#[cfg(target_arch = "wasm32")]
impl<T> Future for WasmJoinHandle<T> {
	type Output = Result<T, WasmJoinError>;

	/// Polls the wrapped future and reports its value as `Ok` once it is ready.
	///
	/// This implementation never produces an `Err`.
	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		// The only field is an already-pinned box, so the handle is `Unpin`
		// and a plain mutable reference can be taken out of the pin.
		let this = self.get_mut();
		this.future.as_mut().poll(cx).map(Ok)
	}
}
123
/// Error half of the `Result` produced by awaiting a [`WasmJoinHandle`].
///
/// It stands in for `tokio::task::JoinError` on WASM targets. Nothing in this
/// file constructs it: polling a [`WasmJoinHandle`] only ever yields `Ok`.
#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct WasmJoinError;
128
#[cfg(target_arch = "wasm32")]
impl fmt::Display for WasmJoinError {
	/// Writes the fixed message `WASM task failed`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.write_str("WASM task failed")
	}
}
135
136#[cfg(target_arch = "wasm32")]
137use std::error::Error;
138
// Marker impl: `WasmJoinError` wraps no underlying cause, so every provided
// method of `Error` keeps its default behaviour.
#[cfg(target_arch = "wasm32")]
impl Error for WasmJoinError {}
141
142/// Inner shared state for the runtime (native).
143#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
144struct SharedRuntimeInner {
145	system: ActorSystem,
146	pools: Pools,
147	clock: Clock,
148	rng: context::rng::Rng,
149}
150
151#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
152impl Drop for SharedRuntimeInner {
153	fn drop(&mut self) {
154		self.system.shutdown();
155		let _ = self.system.join();
156	}
157}
158
159/// Inner shared state for the runtime (WASM).
160#[cfg(target_arch = "wasm32")]
161struct SharedRuntimeInner {
162	system: ActorSystem,
163	pools: Pools,
164	clock: Clock,
165	rng: context::rng::Rng,
166}
167
168/// Inner shared state for the runtime (DST).
169#[cfg(reifydb_target = "dst")]
170struct SharedRuntimeInner {
171	system: ActorSystem,
172	pools: Pools,
173	clock: Clock,
174	rng: context::rng::Rng,
175}
176
177/// Shared runtime that can be cloned and passed across subsystems.
178///
179/// Platform-agnostic facade over:
180/// - Native: tokio multi-threaded runtime + unified actor system
181/// - WASM: Single-threaded execution
182///
183/// Uses Arc internally, so cloning is cheap and all clones share the same
184/// underlying runtime and actor system.
185#[derive(Clone)]
186pub struct SharedRuntime(Arc<SharedRuntimeInner>);
187
188impl SharedRuntime {
189	/// Create a new shared runtime from configuration.
190	pub fn from_config(config: SharedRuntimeConfig) -> Self {
191		let pools = Pools::new(PoolConfig {
192			system_threads: config.system_threads,
193			query_threads: config.query_threads,
194			async_threads: config.async_threads,
195		});
196		let system = ActorSystem::new(pools.clone(), config.clock.clone());
197
198		Self(Arc::new(SharedRuntimeInner {
199			system,
200			pools,
201			clock: config.clock,
202			rng: config.rng,
203		}))
204	}
205
206	/// Get the unified actor system for spawning actors and compute.
207	pub fn actor_system(&self) -> ActorSystem {
208		self.0.system.clone()
209	}
210
211	/// Get the clock for this runtime (shared across all threads).
212	pub fn clock(&self) -> &Clock {
213		&self.0.clock
214	}
215
216	/// Get the RNG for this runtime (shared across all threads).
217	pub fn rng(&self) -> &context::rng::Rng {
218		&self.0.rng
219	}
220
221	/// Get the pools.
222	pub fn pools(&self) -> Pools {
223		self.0.pools.clone()
224	}
225
226	/// Get a handle to the async runtime.
227	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
228	pub fn handle(&self) -> tokio_runtime::Handle {
229		self.0.pools.handle()
230	}
231
232	/// Get a handle to the async runtime.
233	#[cfg(target_arch = "wasm32")]
234	pub fn handle(&self) -> WasmHandle {
235		WasmHandle
236	}
237
238	/// Spawn a future onto the runtime.
239	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
240	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
241	where
242		F: Future + Send + 'static,
243		F::Output: Send + 'static,
244	{
245		self.0.pools.spawn(future)
246	}
247
248	/// Spawn a future onto the runtime.
249	#[cfg(target_arch = "wasm32")]
250	pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
251	where
252		F: Future + 'static,
253		F::Output: 'static,
254	{
255		WasmJoinHandle {
256			future: Box::pin(future),
257		}
258	}
259
260	/// Block the current thread until the future completes.
261	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
262	pub fn block_on<F>(&self, future: F) -> F::Output
263	where
264		F: Future,
265	{
266		self.0.pools.block_on(future)
267	}
268
269	/// Block the current thread until the future completes.
270	#[cfg(target_arch = "wasm32")]
271	pub fn block_on<F>(&self, _future: F) -> F::Output
272	where
273		F: Future,
274	{
275		unimplemented!("block_on not supported in WASM - use async execution instead")
276	}
277}
278
279impl fmt::Debug for SharedRuntime {
280	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
281		f.debug_struct("SharedRuntime").finish_non_exhaustive()
282	}
283}
284
// Runtime smoke tests; compiled only for test builds where
// `reifydb_single_threaded` is not set.
#[cfg(all(test, not(reifydb_single_threaded)))]
mod tests {
	use super::*;

	/// Small configuration with two threads per pool, shared by every test.
	fn test_config() -> SharedRuntimeConfig {
		let defaults = SharedRuntimeConfig::default();
		defaults.async_threads(2).system_threads(2).query_threads(2)
	}

	/// A freshly created runtime can drive a trivial future to completion.
	#[test]
	fn test_runtime_creation() {
		let rt = SharedRuntime::from_config(test_config());
		assert_eq!(rt.block_on(async { 42 }), 42);
	}

	/// Cloning the runtime shares the same inner allocation.
	#[test]
	fn test_runtime_clone_shares_same_runtime() {
		let original = SharedRuntime::from_config(test_config());
		let copy = original.clone();
		assert!(Arc::ptr_eq(&original.0, &copy.0));
	}

	/// A spawned task's output is observable through its join handle.
	#[test]
	fn test_spawn() {
		let rt = SharedRuntime::from_config(test_config());
		let task = rt.spawn(async { 123 });
		let output = rt.block_on(task).unwrap();
		assert_eq!(output, 123);
	}

	/// The actor system accessor is callable on a fresh runtime.
	#[test]
	fn test_actor_system_accessible() {
		let rt = SharedRuntime::from_config(test_config());
		let _system = rt.actor_system();
	}
}
321}