Skip to main content

reifydb_runtime/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Process-level runtime: actor system, thread pools, async executors, time and randomness, and the synchronisation
5//! primitives the rest of the workspace builds on. The `SharedRuntime` handle carries the actor system, the pool set,
6//! the clock, and the seeded RNG together so any subsystem that needs to spawn work, sleep, or generate ids gets a
7//! consistent view of the world.
8//!
9//! The crate abstracts platform differences: native targets get a tokio-backed pool, WebAssembly gets a single-task
10//! executor, the deterministic-simulation target (`reifydb_target = "dst"`) gets a virtual scheduler. All three sit
11//! behind the same `SharedRuntime` API so callers do not branch on platform.
12//!
13//! Invariant: `SharedRuntime::seeded(...)` is what produces a deterministic ReifyDB - same seed, same trace. Any
14//! source of non-determinism inside the runtime (an unmocked clock, an unseeded RNG, a pool that schedules outside
15//! the seeded executor) defeats DST replays and breaks the simulation harness.
16
17#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
18#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
19#![allow(clippy::tabs_in_doc_comments)]
20#![allow(dead_code)]
21
22pub mod context;
23
24pub mod hash;
25
26pub mod pool;
27
28pub mod sync;
29
30pub mod actor;
31
32#[cfg(not(reifydb_target = "dst"))]
33use std::future::Future;
34use std::sync::Arc;
35
36use crate::{
37	actor::system::ActorSystem,
38	context::clock::{Clock, MockClock},
39	pool::{PoolConfig, Pools},
40};
41
42#[derive(Clone)]
43pub struct SharedRuntimeConfig {
44	pub clock: Clock,
45	pub rng: context::rng::Rng,
46}
47
48impl Default for SharedRuntimeConfig {
49	fn default() -> Self {
50		Self {
51			clock: Clock::Real,
52			rng: context::rng::Rng::default(),
53		}
54	}
55}
56
57impl SharedRuntimeConfig {
58	pub fn seeded(mut self, seed: u64) -> Self {
59		self.clock = Clock::Mock(MockClock::from_millis(seed));
60		self.rng = context::rng::Rng::seeded(seed);
61		self
62	}
63}
64
65use std::fmt;
66#[cfg(target_arch = "wasm32")]
67use std::{
68	pin::Pin,
69	task::{Context, Poll},
70};
71
72#[cfg(target_arch = "wasm32")]
73use futures_util::future::LocalBoxFuture;
74#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
75use tokio::runtime as tokio_runtime;
76#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
77use tokio::task::JoinHandle;
78
79#[cfg(target_arch = "wasm32")]
80#[derive(Clone, Copy, Debug)]
81pub struct WasmHandle;
82
83#[cfg(target_arch = "wasm32")]
84pub struct WasmJoinHandle<T> {
85	future: LocalBoxFuture<'static, T>,
86}
87
88#[cfg(target_arch = "wasm32")]
89impl<T> Future for WasmJoinHandle<T> {
90	type Output = Result<T, WasmJoinError>;
91
92	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93		match self.future.as_mut().poll(cx) {
94			Poll::Ready(v) => Poll::Ready(Ok(v)),
95			Poll::Pending => Poll::Pending,
96		}
97	}
98}
99
100#[cfg(target_arch = "wasm32")]
101#[derive(Debug)]
102pub struct WasmJoinError;
103
104#[cfg(target_arch = "wasm32")]
105impl fmt::Display for WasmJoinError {
106	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107		write!(f, "WASM task failed")
108	}
109}
110
111#[cfg(target_arch = "wasm32")]
112use std::error::Error;
113
114#[cfg(target_arch = "wasm32")]
115impl Error for WasmJoinError {}
116
117#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
118struct SharedRuntimeInner {
119	system: ActorSystem,
120	pools: Pools,
121	clock: Clock,
122	rng: context::rng::Rng,
123}
124
125#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
126impl Drop for SharedRuntimeInner {
127	fn drop(&mut self) {
128		self.system.shutdown();
129		let _ = self.system.join();
130	}
131}
132
133#[cfg(target_arch = "wasm32")]
134struct SharedRuntimeInner {
135	system: ActorSystem,
136	pools: Pools,
137	clock: Clock,
138	rng: context::rng::Rng,
139}
140
141#[cfg(reifydb_target = "dst")]
142struct SharedRuntimeInner {
143	system: ActorSystem,
144	pools: Pools,
145	clock: Clock,
146	rng: context::rng::Rng,
147}
148
149#[derive(Clone)]
150pub struct SharedRuntime(Arc<SharedRuntimeInner>);
151
152impl SharedRuntime {
153	pub fn from_config(config: SharedRuntimeConfig, pools: PoolConfig) -> Self {
154		let pools = Pools::new(pools);
155		let system = ActorSystem::new(pools.clone(), config.clock.clone());
156
157		Self(Arc::new(SharedRuntimeInner {
158			system,
159			pools,
160			clock: config.clock,
161			rng: config.rng,
162		}))
163	}
164
165	pub fn actor_system(&self) -> ActorSystem {
166		self.0.system.clone()
167	}
168
169	pub fn clock(&self) -> &Clock {
170		&self.0.clock
171	}
172
173	pub fn rng(&self) -> &context::rng::Rng {
174		&self.0.rng
175	}
176
177	pub fn pools(&self) -> Pools {
178		self.0.pools.clone()
179	}
180
181	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
182	pub fn handle(&self) -> tokio_runtime::Handle {
183		self.0.pools.handle()
184	}
185
186	#[cfg(target_arch = "wasm32")]
187	pub fn handle(&self) -> WasmHandle {
188		WasmHandle
189	}
190
191	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
192	pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
193	where
194		F: Future + Send + 'static,
195		F::Output: Send + 'static,
196	{
197		self.0.pools.spawn(future)
198	}
199
200	#[cfg(target_arch = "wasm32")]
201	pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
202	where
203		F: Future + 'static,
204		F::Output: 'static,
205	{
206		WasmJoinHandle {
207			future: Box::pin(future),
208		}
209	}
210
211	#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
212	pub fn block_on<F>(&self, future: F) -> F::Output
213	where
214		F: Future,
215	{
216		self.0.pools.block_on(future)
217	}
218
219	#[cfg(target_arch = "wasm32")]
220	pub fn block_on<F>(&self, _future: F) -> F::Output
221	where
222		F: Future,
223	{
224		unimplemented!("block_on not supported in WASM - use async execution instead")
225	}
226}
227
228impl fmt::Debug for SharedRuntime {
229	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230		f.debug_struct("SharedRuntime").finish_non_exhaustive()
231	}
232}
233
234#[cfg(all(test, not(reifydb_single_threaded)))]
235mod tests {
236	use super::*;
237
238	fn test_config() -> SharedRuntimeConfig {
239		SharedRuntimeConfig::default()
240	}
241
242	fn test_pools() -> PoolConfig {
243		PoolConfig {
244			async_threads: 2,
245			system_threads: 2,
246			query_threads: 2,
247		}
248	}
249
250	#[test]
251	fn test_runtime_creation() {
252		let runtime = SharedRuntime::from_config(test_config(), test_pools());
253		let result = runtime.block_on(async { 42 });
254		assert_eq!(result, 42);
255	}
256
257	#[test]
258	fn test_runtime_clone_shares_same_runtime() {
259		let rt1 = SharedRuntime::from_config(test_config(), test_pools());
260		let rt2 = rt1.clone();
261		assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
262	}
263
264	#[test]
265	fn test_spawn() {
266		let runtime = SharedRuntime::from_config(test_config(), test_pools());
267		let handle = runtime.spawn(async { 123 });
268		let result = runtime.block_on(handle).unwrap();
269		assert_eq!(result, 123);
270	}
271
272	#[test]
273	fn test_actor_system_accessible() {
274		let runtime = SharedRuntime::from_config(test_config(), test_pools());
275		let _system = runtime.actor_system();
276	}
277}