1#![allow(dead_code)]
42
43pub mod context;
44
45pub mod hash;
46
47pub mod sync;
48
49pub mod actor;
50
51use std::{future::Future, sync::Arc};
52#[cfg(not(target_arch = "wasm32"))]
53use std::{mem::ManuallyDrop, time::Duration};
54
55use crate::{
56 actor::system::{ActorSystem, ActorSystemConfig},
57 context::clock::{Clock, MockClock},
58};
59
60#[derive(Clone)]
62pub struct SharedRuntimeConfig {
63 pub async_threads: usize,
65 pub compute_threads: usize,
67 pub compute_max_in_flight: usize,
69 pub clock: Clock,
71 pub rng: context::rng::Rng,
73}
74
75impl Default for SharedRuntimeConfig {
76 fn default() -> Self {
77 Self {
78 async_threads: 1,
79 compute_threads: 1,
80 compute_max_in_flight: 32,
81 clock: Clock::Real,
82 rng: context::rng::Rng::default(),
83 }
84 }
85}
86
87impl SharedRuntimeConfig {
88 pub fn async_threads(mut self, threads: usize) -> Self {
90 self.async_threads = threads;
91 self
92 }
93
94 pub fn compute_threads(mut self, threads: usize) -> Self {
96 self.compute_threads = threads;
97 self
98 }
99
100 pub fn compute_max_in_flight(mut self, max: usize) -> Self {
102 self.compute_max_in_flight = max;
103 self
104 }
105
106 pub fn deterministic_testing(mut self, seed: u64) -> Self {
109 self.clock = Clock::Mock(MockClock::from_millis(seed));
110 self.rng = context::rng::Rng::seeded(seed);
111 self
112 }
113
114 pub fn actor_system_config(&self) -> ActorSystemConfig {
116 ActorSystemConfig {
117 pool_threads: self.compute_threads,
118 max_in_flight: self.compute_max_in_flight,
119 }
120 }
121}
122
123use std::fmt;
125#[cfg(target_arch = "wasm32")]
126use std::{
127 pin::Pin,
128 task::{Context, Poll},
129};
130
131#[cfg(target_arch = "wasm32")]
132use futures_util::future::LocalBoxFuture;
133#[cfg(not(target_arch = "wasm32"))]
134use tokio::runtime::{self as tokio_runtime, Runtime};
135#[cfg(not(target_arch = "wasm32"))]
136use tokio::task::JoinHandle;
137
138#[cfg(target_arch = "wasm32")]
140#[derive(Clone, Copy, Debug)]
141pub struct WasmHandle;
142
143#[cfg(target_arch = "wasm32")]
147pub struct WasmJoinHandle<T> {
148 future: LocalBoxFuture<'static, T>,
149}
150
151#[cfg(target_arch = "wasm32")]
152impl<T> Future for WasmJoinHandle<T> {
153 type Output = Result<T, WasmJoinError>;
154
155 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156 match self.future.as_mut().poll(cx) {
157 Poll::Ready(v) => Poll::Ready(Ok(v)),
158 Poll::Pending => Poll::Pending,
159 }
160 }
161}
162
163#[cfg(target_arch = "wasm32")]
165#[derive(Debug)]
166pub struct WasmJoinError;
167
168#[cfg(target_arch = "wasm32")]
169impl fmt::Display for WasmJoinError {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 write!(f, "WASM task failed")
172 }
173}
174
175#[cfg(target_arch = "wasm32")]
176use std::error::Error;
177
178#[cfg(target_arch = "wasm32")]
179impl Error for WasmJoinError {}
180
181#[cfg(not(target_arch = "wasm32"))]
183struct SharedRuntimeInner {
184 tokio: ManuallyDrop<Runtime>,
185 system: ActorSystem,
186 clock: Clock,
187 rng: context::rng::Rng,
188}
189
190#[cfg(not(target_arch = "wasm32"))]
191impl Drop for SharedRuntimeInner {
192 fn drop(&mut self) {
193 let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
199 rt.shutdown_timeout(Duration::from_secs(5));
200 }
201}
202
203#[cfg(target_arch = "wasm32")]
205struct SharedRuntimeInner {
206 system: ActorSystem,
207 clock: Clock,
208 rng: context::rng::Rng,
209}
210
211#[derive(Clone)]
220pub struct SharedRuntime(Arc<SharedRuntimeInner>);
221
222impl SharedRuntime {
223 #[cfg(not(target_arch = "wasm32"))]
229 pub fn from_config(config: SharedRuntimeConfig) -> Self {
230 let tokio = tokio_runtime::Builder::new_multi_thread()
231 .worker_threads(config.async_threads)
232 .thread_name("async")
233 .enable_all()
234 .build()
235 .expect("Failed to create tokio runtime");
236
237 let system = ActorSystem::new(config.actor_system_config());
238
239 Self(Arc::new(SharedRuntimeInner {
240 tokio: ManuallyDrop::new(tokio),
241 system,
242 clock: config.clock,
243 rng: config.rng,
244 }))
245 }
246
247 #[cfg(target_arch = "wasm32")]
249 pub fn from_config(config: SharedRuntimeConfig) -> Self {
250 let system = ActorSystem::new(config.actor_system_config());
251
252 Self(Arc::new(SharedRuntimeInner {
253 system,
254 clock: config.clock,
255 rng: config.rng,
256 }))
257 }
258
259 pub fn actor_system(&self) -> ActorSystem {
261 self.0.system.clone()
262 }
263
264 pub fn clock(&self) -> &Clock {
266 &self.0.clock
267 }
268
269 pub fn rng(&self) -> &context::rng::Rng {
271 &self.0.rng
272 }
273
274 #[cfg(not(target_arch = "wasm32"))]
280 pub fn handle(&self) -> tokio_runtime::Handle {
281 self.0.tokio.handle().clone()
282 }
283
284 #[cfg(target_arch = "wasm32")]
286 pub fn handle(&self) -> WasmHandle {
287 WasmHandle
288 }
289
290 #[cfg(not(target_arch = "wasm32"))]
296 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
297 where
298 F: Future + Send + 'static,
299 F::Output: Send + 'static,
300 {
301 self.0.tokio.spawn(future)
302 }
303
304 #[cfg(target_arch = "wasm32")]
306 pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
307 where
308 F: Future + 'static,
309 F::Output: 'static,
310 {
311 WasmJoinHandle {
312 future: Box::pin(future),
313 }
314 }
315
316 #[cfg(not(target_arch = "wasm32"))]
320 pub fn block_on<F>(&self, future: F) -> F::Output
321 where
322 F: Future,
323 {
324 self.0.tokio.block_on(future)
325 }
326
327 #[cfg(target_arch = "wasm32")]
331 pub fn block_on<F>(&self, _future: F) -> F::Output
332 where
333 F: Future,
334 {
335 unimplemented!("block_on not supported in WASM - use async execution instead")
336 }
337
338 pub fn install<R, F>(&self, f: F) -> R
343 where
344 R: Send,
345 F: FnOnce() -> R + Send,
346 {
347 self.0.system.install(f)
348 }
349}
350
351impl fmt::Debug for SharedRuntime {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 f.debug_struct("SharedRuntime").finish_non_exhaustive()
354 }
355}
356
357#[cfg(all(test, not(reifydb_single_threaded)))]
359mod tests {
360 use super::*;
361
362 fn test_config() -> SharedRuntimeConfig {
363 SharedRuntimeConfig::default().async_threads(2).compute_threads(2).compute_max_in_flight(4)
364 }
365
366 #[test]
367 fn test_runtime_creation() {
368 let runtime = SharedRuntime::from_config(test_config());
369 let result = runtime.block_on(async { 42 });
370 assert_eq!(result, 42);
371 }
372
373 #[test]
374 fn test_runtime_clone_shares_same_runtime() {
375 let rt1 = SharedRuntime::from_config(test_config());
376 let rt2 = rt1.clone();
377 assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
378 }
379
380 #[test]
381 fn test_spawn() {
382 let runtime = SharedRuntime::from_config(test_config());
383 let handle = runtime.spawn(async { 123 });
384 let result = runtime.block_on(handle).unwrap();
385 assert_eq!(result, 123);
386 }
387
388 #[test]
389 fn test_actor_system_accessible() {
390 let runtime = SharedRuntime::from_config(test_config());
391 let system = runtime.actor_system();
392 let result = system.install(|| 42);
393 assert_eq!(result, 42);
394 }
395
396 #[test]
397 fn test_install() {
398 let runtime = SharedRuntime::from_config(test_config());
399 let result = runtime.install(|| 42);
400 assert_eq!(result, 42);
401 }
402}
403
404#[cfg(all(test, reifydb_single_threaded))]
405mod wasm_tests {
406 use super::*;
407
408 #[test]
409 fn test_wasm_runtime_creation() {
410 let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
411 let system = runtime.actor_system();
412 let result = system.install(|| 42);
413 assert_eq!(result, 42);
414 }
415}