1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![allow(clippy::tabs_in_doc_comments)]
6
7#![allow(dead_code)]
41
42pub mod context;
43
44pub mod hash;
45
46pub mod sync;
47
48pub mod actor;
49
50use std::{future::Future, sync::Arc, thread::available_parallelism};
51#[cfg(not(target_arch = "wasm32"))]
52use std::{mem::ManuallyDrop, time::Duration};
53
54use crate::{
55 actor::system::ActorSystem,
56 context::clock::{Clock, MockClock},
57};
58
59#[derive(Clone)]
61pub struct SharedRuntimeConfig {
62 pub async_threads: usize,
64 pub compute_threads: usize,
66 pub clock: Clock,
68 pub rng: context::rng::Rng,
70}
71
72impl Default for SharedRuntimeConfig {
73 fn default() -> Self {
74 Self {
75 async_threads: 1,
76 compute_threads: available_parallelism().map_or(1, |n| n.get()),
77 clock: Clock::Real,
78 rng: context::rng::Rng::default(),
79 }
80 }
81}
82
83impl SharedRuntimeConfig {
84 pub fn async_threads(mut self, threads: usize) -> Self {
86 self.async_threads = threads;
87 self
88 }
89
90 pub fn compute_threads(mut self, threads: usize) -> Self {
92 self.compute_threads = threads;
93 self
94 }
95
96 pub fn deterministic_testing(mut self, seed: u64) -> Self {
99 self.clock = Clock::Mock(MockClock::from_millis(seed));
100 self.rng = context::rng::Rng::seeded(seed);
101 self
102 }
103}
104
105use std::fmt;
107#[cfg(target_arch = "wasm32")]
108use std::{
109 pin::Pin,
110 task::{Context, Poll},
111};
112
113#[cfg(target_arch = "wasm32")]
114use futures_util::future::LocalBoxFuture;
115#[cfg(not(target_arch = "wasm32"))]
116use tokio::runtime::{self as tokio_runtime, Runtime};
117#[cfg(not(target_arch = "wasm32"))]
118use tokio::task::JoinHandle;
119
120#[cfg(target_arch = "wasm32")]
122#[derive(Clone, Copy, Debug)]
123pub struct WasmHandle;
124
125#[cfg(target_arch = "wasm32")]
129pub struct WasmJoinHandle<T> {
130 future: LocalBoxFuture<'static, T>,
131}
132
133#[cfg(target_arch = "wasm32")]
134impl<T> Future for WasmJoinHandle<T> {
135 type Output = Result<T, WasmJoinError>;
136
137 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138 match self.future.as_mut().poll(cx) {
139 Poll::Ready(v) => Poll::Ready(Ok(v)),
140 Poll::Pending => Poll::Pending,
141 }
142 }
143}
144
145#[cfg(target_arch = "wasm32")]
147#[derive(Debug)]
148pub struct WasmJoinError;
149
150#[cfg(target_arch = "wasm32")]
151impl fmt::Display for WasmJoinError {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 write!(f, "WASM task failed")
154 }
155}
156
157#[cfg(target_arch = "wasm32")]
158use std::error::Error;
159
160#[cfg(target_arch = "wasm32")]
161impl Error for WasmJoinError {}
162
163#[cfg(not(target_arch = "wasm32"))]
165struct SharedRuntimeInner {
166 tokio: ManuallyDrop<Runtime>,
167 system: ActorSystem,
168 clock: Clock,
169 rng: context::rng::Rng,
170}
171
172#[cfg(not(target_arch = "wasm32"))]
173impl Drop for SharedRuntimeInner {
174 fn drop(&mut self) {
175 self.system.shutdown();
178 let _ = self.system.join();
179
180 let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
186 rt.shutdown_timeout(Duration::from_secs(5));
187 }
188}
189
190#[cfg(target_arch = "wasm32")]
192struct SharedRuntimeInner {
193 system: ActorSystem,
194 clock: Clock,
195 rng: context::rng::Rng,
196}
197
198#[derive(Clone)]
207pub struct SharedRuntime(Arc<SharedRuntimeInner>);
208
209impl SharedRuntime {
210 #[cfg(not(target_arch = "wasm32"))]
216 pub fn from_config(config: SharedRuntimeConfig) -> Self {
217 let tokio = tokio_runtime::Builder::new_multi_thread()
218 .worker_threads(config.async_threads)
219 .thread_name("async")
220 .enable_all()
221 .build()
222 .expect("Failed to create tokio runtime");
223
224 let system = ActorSystem::with_clock(config.compute_threads, config.clock.clone());
225
226 Self(Arc::new(SharedRuntimeInner {
227 tokio: ManuallyDrop::new(tokio),
228 system,
229 clock: config.clock,
230 rng: config.rng,
231 }))
232 }
233
234 #[cfg(target_arch = "wasm32")]
236 pub fn from_config(config: SharedRuntimeConfig) -> Self {
237 let system = ActorSystem::with_clock(config.compute_threads, config.clock.clone());
238
239 Self(Arc::new(SharedRuntimeInner {
240 system,
241 clock: config.clock,
242 rng: config.rng,
243 }))
244 }
245
246 pub fn actor_system(&self) -> ActorSystem {
248 self.0.system.clone()
249 }
250
251 pub fn clock(&self) -> &Clock {
253 &self.0.clock
254 }
255
256 pub fn rng(&self) -> &context::rng::Rng {
258 &self.0.rng
259 }
260
261 #[cfg(not(target_arch = "wasm32"))]
267 pub fn handle(&self) -> tokio_runtime::Handle {
268 self.0.tokio.handle().clone()
269 }
270
271 #[cfg(target_arch = "wasm32")]
273 pub fn handle(&self) -> WasmHandle {
274 WasmHandle
275 }
276
277 #[cfg(not(target_arch = "wasm32"))]
283 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
284 where
285 F: Future + Send + 'static,
286 F::Output: Send + 'static,
287 {
288 self.0.tokio.spawn(future)
289 }
290
291 #[cfg(target_arch = "wasm32")]
293 pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
294 where
295 F: Future + 'static,
296 F::Output: 'static,
297 {
298 WasmJoinHandle {
299 future: Box::pin(future),
300 }
301 }
302
303 #[cfg(not(target_arch = "wasm32"))]
307 pub fn block_on<F>(&self, future: F) -> F::Output
308 where
309 F: Future,
310 {
311 self.0.tokio.block_on(future)
312 }
313
314 #[cfg(target_arch = "wasm32")]
318 pub fn block_on<F>(&self, _future: F) -> F::Output
319 where
320 F: Future,
321 {
322 unimplemented!("block_on not supported in WASM - use async execution instead")
323 }
324}
325
326impl fmt::Debug for SharedRuntime {
327 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
328 f.debug_struct("SharedRuntime").finish_non_exhaustive()
329 }
330}
331
332#[cfg(all(test, not(reifydb_single_threaded)))]
334mod tests {
335 use super::*;
336
337 fn test_config() -> SharedRuntimeConfig {
338 SharedRuntimeConfig::default().async_threads(2).compute_threads(2)
339 }
340
341 #[test]
342 fn test_runtime_creation() {
343 let runtime = SharedRuntime::from_config(test_config());
344 let result = runtime.block_on(async { 42 });
345 assert_eq!(result, 42);
346 }
347
348 #[test]
349 fn test_runtime_clone_shares_same_runtime() {
350 let rt1 = SharedRuntime::from_config(test_config());
351 let rt2 = rt1.clone();
352 assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
353 }
354
355 #[test]
356 fn test_spawn() {
357 let runtime = SharedRuntime::from_config(test_config());
358 let handle = runtime.spawn(async { 123 });
359 let result = runtime.block_on(handle).unwrap();
360 assert_eq!(result, 123);
361 }
362
363 #[test]
364 fn test_actor_system_accessible() {
365 let runtime = SharedRuntime::from_config(test_config());
366 let _system = runtime.actor_system();
367 }
368}