1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![allow(clippy::tabs_in_doc_comments)]
6
7#![allow(dead_code)]
42
43pub mod context;
44
45pub mod hash;
46
47pub mod pool;
48
49pub mod sync;
50
51pub mod actor;
52
53#[cfg(not(reifydb_target = "dst"))]
54use std::future::Future;
55use std::{sync::Arc, thread::available_parallelism};
56
57use crate::{
58 actor::system::ActorSystem,
59 context::clock::{Clock, MockClock},
60 pool::{PoolConfig, Pools},
61};
62
63#[derive(Clone)]
65pub struct SharedRuntimeConfig {
66 pub async_threads: usize,
68 pub system_threads: usize,
70 pub query_threads: usize,
72 pub clock: Clock,
74 pub rng: context::rng::Rng,
76}
77
78impl Default for SharedRuntimeConfig {
79 fn default() -> Self {
80 let cpus = available_parallelism().map_or(1, |n| n.get());
81 Self {
82 async_threads: 1,
83 system_threads: cpus.min(4),
84 query_threads: cpus,
85 clock: Clock::Real,
86 rng: context::rng::Rng::default(),
87 }
88 }
89}
90
91impl SharedRuntimeConfig {
92 pub fn async_threads(mut self, threads: usize) -> Self {
94 self.async_threads = threads;
95 self
96 }
97
98 pub fn system_threads(mut self, threads: usize) -> Self {
100 self.system_threads = threads;
101 self
102 }
103
104 pub fn query_threads(mut self, threads: usize) -> Self {
106 self.query_threads = threads;
107 self
108 }
109
110 pub fn deterministic_testing(mut self, seed: u64) -> Self {
113 self.clock = Clock::Mock(MockClock::from_millis(seed));
114 self.rng = context::rng::Rng::seeded(seed);
115 self
116 }
117}
118
119use std::fmt;
121#[cfg(target_arch = "wasm32")]
122use std::{
123 pin::Pin,
124 task::{Context, Poll},
125};
126
127#[cfg(target_arch = "wasm32")]
128use futures_util::future::LocalBoxFuture;
129#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
130use tokio::runtime as tokio_runtime;
131#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
132use tokio::task::JoinHandle;
133
134#[cfg(target_arch = "wasm32")]
136#[derive(Clone, Copy, Debug)]
137pub struct WasmHandle;
138
139#[cfg(target_arch = "wasm32")]
143pub struct WasmJoinHandle<T> {
144 future: LocalBoxFuture<'static, T>,
145}
146
147#[cfg(target_arch = "wasm32")]
148impl<T> Future for WasmJoinHandle<T> {
149 type Output = Result<T, WasmJoinError>;
150
151 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152 match self.future.as_mut().poll(cx) {
153 Poll::Ready(v) => Poll::Ready(Ok(v)),
154 Poll::Pending => Poll::Pending,
155 }
156 }
157}
158
159#[cfg(target_arch = "wasm32")]
161#[derive(Debug)]
162pub struct WasmJoinError;
163
164#[cfg(target_arch = "wasm32")]
165impl fmt::Display for WasmJoinError {
166 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167 write!(f, "WASM task failed")
168 }
169}
170
171#[cfg(target_arch = "wasm32")]
172use std::error::Error;
173
174#[cfg(target_arch = "wasm32")]
175impl Error for WasmJoinError {}
176
177#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
179struct SharedRuntimeInner {
180 system: ActorSystem,
181 pools: Pools,
182 clock: Clock,
183 rng: context::rng::Rng,
184}
185
186#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
187impl Drop for SharedRuntimeInner {
188 fn drop(&mut self) {
189 self.system.shutdown();
190 let _ = self.system.join();
191 }
192}
193
194#[cfg(target_arch = "wasm32")]
196struct SharedRuntimeInner {
197 system: ActorSystem,
198 pools: Pools,
199 clock: Clock,
200 rng: context::rng::Rng,
201}
202
203#[cfg(reifydb_target = "dst")]
205struct SharedRuntimeInner {
206 system: ActorSystem,
207 pools: Pools,
208 clock: Clock,
209 rng: context::rng::Rng,
210}
211
212#[derive(Clone)]
221pub struct SharedRuntime(Arc<SharedRuntimeInner>);
222
223impl SharedRuntime {
224 pub fn from_config(config: SharedRuntimeConfig) -> Self {
226 let pools = Pools::new(PoolConfig {
227 system_threads: config.system_threads,
228 query_threads: config.query_threads,
229 async_threads: config.async_threads,
230 });
231 let system = ActorSystem::new(pools.clone(), config.clock.clone());
232
233 Self(Arc::new(SharedRuntimeInner {
234 system,
235 pools,
236 clock: config.clock,
237 rng: config.rng,
238 }))
239 }
240
241 pub fn actor_system(&self) -> ActorSystem {
243 self.0.system.clone()
244 }
245
246 pub fn clock(&self) -> &Clock {
248 &self.0.clock
249 }
250
251 pub fn rng(&self) -> &context::rng::Rng {
253 &self.0.rng
254 }
255
256 pub fn pools(&self) -> Pools {
258 self.0.pools.clone()
259 }
260
261 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
263 pub fn handle(&self) -> tokio_runtime::Handle {
264 self.0.pools.handle()
265 }
266
267 #[cfg(target_arch = "wasm32")]
269 pub fn handle(&self) -> WasmHandle {
270 WasmHandle
271 }
272
273 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
275 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
276 where
277 F: Future + Send + 'static,
278 F::Output: Send + 'static,
279 {
280 self.0.pools.spawn(future)
281 }
282
283 #[cfg(target_arch = "wasm32")]
285 pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
286 where
287 F: Future + 'static,
288 F::Output: 'static,
289 {
290 WasmJoinHandle {
291 future: Box::pin(future),
292 }
293 }
294
295 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
297 pub fn block_on<F>(&self, future: F) -> F::Output
298 where
299 F: Future,
300 {
301 self.0.pools.block_on(future)
302 }
303
304 #[cfg(target_arch = "wasm32")]
306 pub fn block_on<F>(&self, _future: F) -> F::Output
307 where
308 F: Future,
309 {
310 unimplemented!("block_on not supported in WASM - use async execution instead")
311 }
312}
313
314impl fmt::Debug for SharedRuntime {
315 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316 f.debug_struct("SharedRuntime").finish_non_exhaustive()
317 }
318}
319
320#[cfg(all(test, not(reifydb_single_threaded)))]
322mod tests {
323 use super::*;
324
325 fn test_config() -> SharedRuntimeConfig {
326 SharedRuntimeConfig::default().async_threads(2).system_threads(2).query_threads(2)
327 }
328
329 #[test]
330 fn test_runtime_creation() {
331 let runtime = SharedRuntime::from_config(test_config());
332 let result = runtime.block_on(async { 42 });
333 assert_eq!(result, 42);
334 }
335
336 #[test]
337 fn test_runtime_clone_shares_same_runtime() {
338 let rt1 = SharedRuntime::from_config(test_config());
339 let rt2 = rt1.clone();
340 assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
341 }
342
343 #[test]
344 fn test_spawn() {
345 let runtime = SharedRuntime::from_config(test_config());
346 let handle = runtime.spawn(async { 123 });
347 let result = runtime.block_on(handle).unwrap();
348 assert_eq!(result, 123);
349 }
350
351 #[test]
352 fn test_actor_system_accessible() {
353 let runtime = SharedRuntime::from_config(test_config());
354 let _system = runtime.actor_system();
355 }
356}