1#![allow(dead_code)]
42
43pub mod clock;
44
45pub mod hash;
46
47pub mod sync;
48
49pub mod actor;
50
51use std::{future::Future, sync::Arc};
52#[cfg(not(target_arch = "wasm32"))]
53use std::{mem::ManuallyDrop, time::Duration};
54
55use crate::{
56 actor::system::{ActorSystem, ActorSystemConfig},
57 clock::{Clock, MockClock},
58};
59
60#[derive(Clone)]
62pub struct SharedRuntimeConfig {
63 pub async_threads: usize,
65 pub compute_threads: usize,
67 pub compute_max_in_flight: usize,
69 pub clock: Clock,
71}
72
73impl Default for SharedRuntimeConfig {
74 fn default() -> Self {
75 Self {
76 async_threads: 1,
77 compute_threads: 1,
78 compute_max_in_flight: 32,
79 clock: Clock::Real,
80 }
81 }
82}
83
84impl SharedRuntimeConfig {
85 pub fn async_threads(mut self, threads: usize) -> Self {
87 self.async_threads = threads;
88 self
89 }
90
91 pub fn compute_threads(mut self, threads: usize) -> Self {
93 self.compute_threads = threads;
94 self
95 }
96
97 pub fn compute_max_in_flight(mut self, max: usize) -> Self {
99 self.compute_max_in_flight = max;
100 self
101 }
102
103 pub fn mock_clock(mut self, initial_millis: u64) -> Self {
105 self.clock = Clock::Mock(MockClock::from_millis(initial_millis));
106 self
107 }
108
109 pub fn clock(mut self, clock: Clock) -> Self {
111 self.clock = clock;
112 self
113 }
114
115 pub fn actor_system_config(&self) -> ActorSystemConfig {
117 ActorSystemConfig {
118 pool_threads: self.compute_threads,
119 max_in_flight: self.compute_max_in_flight,
120 }
121 }
122}
123
124use std::fmt;
126#[cfg(target_arch = "wasm32")]
127use std::{
128 pin::Pin,
129 task::{Context, Poll},
130};
131
132#[cfg(target_arch = "wasm32")]
133use futures_util::future::LocalBoxFuture;
134#[cfg(not(target_arch = "wasm32"))]
135use tokio::runtime::{self as tokio_runtime, Runtime};
136#[cfg(not(target_arch = "wasm32"))]
137use tokio::task::JoinHandle;
138
/// Runtime handle type used on `wasm32` builds.
///
/// It is a unit struct with no data; `SharedRuntime::handle` returns it on wasm.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy)]
pub struct WasmHandle;
143
/// Join handle returned by `SharedRuntime::spawn` on `wasm32` builds.
///
/// The handle owns the boxed future; polling the handle polls that future.
#[cfg(target_arch = "wasm32")]
pub struct WasmJoinHandle<T> {
    /// The wrapped future, boxed and pinned.
    future: LocalBoxFuture<'static, T>,
}
151
/// Polling the handle drives the wrapped future and wraps its output in `Ok`.
///
/// This implementation never yields `Err(WasmJoinError)`.
#[cfg(target_arch = "wasm32")]
impl<T> Future for WasmJoinHandle<T> {
    type Output = Result<T, WasmJoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Poll::map` leaves `Pending` untouched and applies `Ok` to a ready value.
        let inner = self.future.as_mut();
        inner.poll(cx).map(Ok)
    }
}
163
/// Error type in the `Result` produced by awaiting a `WasmJoinHandle`.
#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct WasmJoinError;
168
#[cfg(target_arch = "wasm32")]
impl fmt::Display for WasmJoinError {
    /// Writes the fixed message `WASM task failed`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("WASM task failed")
    }
}
175
176#[cfg(target_arch = "wasm32")]
177use std::error::Error;
178
// Empty impl: `WasmJoinError` relies entirely on the trait's default methods.
#[cfg(target_arch = "wasm32")]
impl Error for WasmJoinError {}
181
182#[cfg(not(target_arch = "wasm32"))]
184struct SharedRuntimeInner {
185 tokio: ManuallyDrop<Runtime>,
186 system: ActorSystem,
187 clock: Clock,
188}
189
190#[cfg(not(target_arch = "wasm32"))]
191impl Drop for SharedRuntimeInner {
192 fn drop(&mut self) {
193 let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
199 rt.shutdown_timeout(Duration::from_secs(5));
200 }
201}
202
/// State behind the `Arc` in `SharedRuntime` on `wasm32` targets.
///
/// Unlike the non-wasm variant it holds no Tokio runtime.
#[cfg(target_arch = "wasm32")]
struct SharedRuntimeInner {
    /// Actor system created from the runtime configuration.
    system: ActorSystem,
    /// Clock taken from the runtime configuration.
    clock: Clock,
}
209
210#[derive(Clone)]
219pub struct SharedRuntime(Arc<SharedRuntimeInner>);
220
221impl SharedRuntime {
222 #[cfg(not(target_arch = "wasm32"))]
228 pub fn from_config(config: SharedRuntimeConfig) -> Self {
229 let tokio = tokio_runtime::Builder::new_multi_thread()
230 .worker_threads(config.async_threads)
231 .thread_name("async")
232 .enable_all()
233 .build()
234 .expect("Failed to create tokio runtime");
235
236 let system = ActorSystem::new(config.actor_system_config());
237
238 Self(Arc::new(SharedRuntimeInner {
239 tokio: ManuallyDrop::new(tokio),
240 system,
241 clock: config.clock,
242 }))
243 }
244
245 #[cfg(target_arch = "wasm32")]
247 pub fn from_config(config: SharedRuntimeConfig) -> Self {
248 let system = ActorSystem::new(config.actor_system_config());
249
250 Self(Arc::new(SharedRuntimeInner {
251 system,
252 clock: config.clock,
253 }))
254 }
255
256 pub fn actor_system(&self) -> ActorSystem {
258 self.0.system.clone()
259 }
260
261 pub fn clock(&self) -> &Clock {
263 &self.0.clock
264 }
265
266 #[cfg(not(target_arch = "wasm32"))]
272 pub fn handle(&self) -> tokio_runtime::Handle {
273 self.0.tokio.handle().clone()
274 }
275
276 #[cfg(target_arch = "wasm32")]
278 pub fn handle(&self) -> WasmHandle {
279 WasmHandle
280 }
281
282 #[cfg(not(target_arch = "wasm32"))]
288 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
289 where
290 F: Future + Send + 'static,
291 F::Output: Send + 'static,
292 {
293 self.0.tokio.spawn(future)
294 }
295
296 #[cfg(target_arch = "wasm32")]
298 pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
299 where
300 F: Future + 'static,
301 F::Output: 'static,
302 {
303 WasmJoinHandle {
304 future: Box::pin(future),
305 }
306 }
307
308 #[cfg(not(target_arch = "wasm32"))]
312 pub fn block_on<F>(&self, future: F) -> F::Output
313 where
314 F: Future,
315 {
316 self.0.tokio.block_on(future)
317 }
318
319 #[cfg(target_arch = "wasm32")]
323 pub fn block_on<F>(&self, _future: F) -> F::Output
324 where
325 F: Future,
326 {
327 unimplemented!("block_on not supported in WASM - use async execution instead")
328 }
329
330 pub fn install<R, F>(&self, f: F) -> R
335 where
336 R: Send,
337 F: FnOnce() -> R + Send,
338 {
339 self.0.system.install(f)
340 }
341}
342
343impl fmt::Debug for SharedRuntime {
344 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345 f.debug_struct("SharedRuntime").finish_non_exhaustive()
346 }
347}
348
#[cfg(all(test, reifydb_target = "native"))]
mod tests {
    use super::*;

    /// Small configuration shared by the tests below.
    fn test_config() -> SharedRuntimeConfig {
        SharedRuntimeConfig::default()
            .async_threads(2)
            .compute_threads(2)
            .compute_max_in_flight(4)
    }

    /// A freshly built runtime can drive a trivial future to completion.
    #[test]
    fn test_runtime_creation() {
        let rt = SharedRuntime::from_config(test_config());
        assert_eq!(rt.block_on(async { 42 }), 42);
    }

    /// Cloning yields a handle to the same inner allocation.
    #[test]
    fn test_runtime_clone_shares_same_runtime() {
        let original = SharedRuntime::from_config(test_config());
        let copy = original.clone();
        assert!(Arc::ptr_eq(&original.0, &copy.0));
    }

    /// A spawned task's output is returned through its join handle.
    #[test]
    fn test_spawn() {
        let rt = SharedRuntime::from_config(test_config());
        let task = rt.spawn(async { 123 });
        let joined = rt.block_on(task);
        assert_eq!(joined.unwrap(), 123);
    }

    /// The actor system obtained from the runtime can run a closure.
    #[test]
    fn test_actor_system_accessible() {
        let rt = SharedRuntime::from_config(test_config());
        let actors = rt.actor_system();
        assert_eq!(actors.install(|| 42), 42);
    }

    /// `SharedRuntime::install` returns the closure's result.
    #[test]
    fn test_install() {
        let rt = SharedRuntime::from_config(test_config());
        assert_eq!(rt.install(|| 42), 42);
    }
}
395
#[cfg(all(test, reifydb_target = "wasm"))]
mod wasm_tests {
    use super::*;

    /// A default-configured runtime exposes an actor system that can run a closure.
    #[test]
    fn test_wasm_runtime_creation() {
        let config = SharedRuntimeConfig::default();
        let rt = SharedRuntime::from_config(config);
        let actors = rt.actor_system();
        assert_eq!(actors.install(|| 42), 42);
    }
}