1#![allow(dead_code)]
42
43pub mod clock;
44
45pub mod hash;
46
47pub mod sync;
48
49pub mod actor;
50
51use std::{future::Future, mem::ManuallyDrop, sync::Arc, time::Duration};
52
53use crate::{
54 actor::system::{ActorSystem, ActorSystemConfig},
55 clock::{Clock, MockClock},
56};
57
58#[derive(Clone)]
60pub struct SharedRuntimeConfig {
61 pub async_threads: usize,
63 pub compute_threads: usize,
65 pub compute_max_in_flight: usize,
67 pub clock: Clock,
69}
70
71impl Default for SharedRuntimeConfig {
72 fn default() -> Self {
73 Self {
74 async_threads: 1,
75 compute_threads: 1,
76 compute_max_in_flight: 32,
77 clock: Clock::Real,
78 }
79 }
80}
81
82impl SharedRuntimeConfig {
83 pub fn async_threads(mut self, threads: usize) -> Self {
85 self.async_threads = threads;
86 self
87 }
88
89 pub fn compute_threads(mut self, threads: usize) -> Self {
91 self.compute_threads = threads;
92 self
93 }
94
95 pub fn compute_max_in_flight(mut self, max: usize) -> Self {
97 self.compute_max_in_flight = max;
98 self
99 }
100
101 pub fn mock_clock(mut self, initial_millis: u64) -> Self {
103 self.clock = Clock::Mock(MockClock::from_millis(initial_millis));
104 self
105 }
106
107 pub fn clock(mut self, clock: Clock) -> Self {
109 self.clock = clock;
110 self
111 }
112
113 pub fn actor_system_config(&self) -> ActorSystemConfig {
115 ActorSystemConfig {
116 pool_threads: self.compute_threads,
117 max_in_flight: self.compute_max_in_flight,
118 }
119 }
120}
121
122#[cfg(target_arch = "wasm32")]
124use std::{pin::Pin, task::Poll};
125
126#[cfg(target_arch = "wasm32")]
127use futures_util::future::LocalBoxFuture;
128#[cfg(not(target_arch = "wasm32"))]
129use tokio::runtime::Runtime;
130
131#[cfg(target_arch = "wasm32")]
133#[derive(Clone, Copy, Debug)]
134pub struct WasmHandle;
135
136#[cfg(target_arch = "wasm32")]
140pub struct WasmJoinHandle<T> {
141 future: LocalBoxFuture<'static, T>,
142}
143
144#[cfg(target_arch = "wasm32")]
145impl<T> Future for WasmJoinHandle<T> {
146 type Output = Result<T, WasmJoinError>;
147
148 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
149 match self.future.as_mut().poll(cx) {
150 Poll::Ready(v) => Poll::Ready(Ok(v)),
151 Poll::Pending => Poll::Pending,
152 }
153 }
154}
155
156#[cfg(target_arch = "wasm32")]
158#[derive(Debug)]
159pub struct WasmJoinError;
160
161#[cfg(target_arch = "wasm32")]
162impl std::fmt::Display for WasmJoinError {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 write!(f, "WASM task failed")
165 }
166}
167
168#[cfg(target_arch = "wasm32")]
169impl std::error::Error for WasmJoinError {}
170
171#[cfg(not(target_arch = "wasm32"))]
173struct SharedRuntimeInner {
174 tokio: ManuallyDrop<Runtime>,
175 system: ActorSystem,
176 clock: Clock,
177}
178
179#[cfg(not(target_arch = "wasm32"))]
180impl Drop for SharedRuntimeInner {
181 fn drop(&mut self) {
182 let rt = unsafe { ManuallyDrop::take(&mut self.tokio) };
188 rt.shutdown_timeout(Duration::from_secs(5));
189 }
190}
191
192#[cfg(target_arch = "wasm32")]
194struct SharedRuntimeInner {
195 system: ActorSystem,
196 clock: Clock,
197}
198
199#[derive(Clone)]
208pub struct SharedRuntime(Arc<SharedRuntimeInner>);
209
210impl SharedRuntime {
211 #[cfg(not(target_arch = "wasm32"))]
217 pub fn from_config(config: SharedRuntimeConfig) -> Self {
218 let tokio = tokio::runtime::Builder::new_multi_thread()
219 .worker_threads(config.async_threads)
220 .thread_name("async")
221 .enable_all()
222 .build()
223 .expect("Failed to create tokio runtime");
224
225 let system = ActorSystem::new(config.actor_system_config());
226
227 Self(Arc::new(SharedRuntimeInner {
228 tokio: ManuallyDrop::new(tokio),
229 system,
230 clock: config.clock,
231 }))
232 }
233
234 #[cfg(target_arch = "wasm32")]
236 pub fn from_config(config: SharedRuntimeConfig) -> Self {
237 let system = ActorSystem::new(config.actor_system_config());
238
239 Self(Arc::new(SharedRuntimeInner {
240 system,
241 clock: config.clock,
242 }))
243 }
244
245 pub fn actor_system(&self) -> ActorSystem {
247 self.0.system.clone()
248 }
249
250 pub fn clock(&self) -> &Clock {
252 &self.0.clock
253 }
254
255 #[cfg(not(target_arch = "wasm32"))]
261 pub fn handle(&self) -> tokio::runtime::Handle {
262 self.0.tokio.handle().clone()
263 }
264
265 #[cfg(target_arch = "wasm32")]
267 pub fn handle(&self) -> WasmHandle {
268 WasmHandle
269 }
270
271 #[cfg(not(target_arch = "wasm32"))]
277 pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
278 where
279 F: Future + Send + 'static,
280 F::Output: Send + 'static,
281 {
282 self.0.tokio.spawn(future)
283 }
284
285 #[cfg(target_arch = "wasm32")]
287 pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
288 where
289 F: Future + 'static,
290 F::Output: 'static,
291 {
292 WasmJoinHandle {
293 future: Box::pin(future),
294 }
295 }
296
297 #[cfg(not(target_arch = "wasm32"))]
301 pub fn block_on<F>(&self, future: F) -> F::Output
302 where
303 F: Future,
304 {
305 self.0.tokio.block_on(future)
306 }
307
308 #[cfg(target_arch = "wasm32")]
312 pub fn block_on<F>(&self, _future: F) -> F::Output
313 where
314 F: Future,
315 {
316 unimplemented!("block_on not supported in WASM - use async execution instead")
317 }
318
319 pub fn install<R, F>(&self, f: F) -> R
324 where
325 R: Send,
326 F: FnOnce() -> R + Send,
327 {
328 self.0.system.install(f)
329 }
330}
331
332impl std::fmt::Debug for SharedRuntime {
333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334 f.debug_struct("SharedRuntime").finish_non_exhaustive()
335 }
336}
337
338#[cfg(all(test, reifydb_target = "native"))]
340mod tests {
341 use super::*;
342
343 fn test_config() -> SharedRuntimeConfig {
344 SharedRuntimeConfig::default().async_threads(2).compute_threads(2).compute_max_in_flight(4)
345 }
346
347 #[test]
348 fn test_runtime_creation() {
349 let runtime = SharedRuntime::from_config(test_config());
350 let result = runtime.block_on(async { 42 });
351 assert_eq!(result, 42);
352 }
353
354 #[test]
355 fn test_runtime_clone_shares_same_runtime() {
356 let rt1 = SharedRuntime::from_config(test_config());
357 let rt2 = rt1.clone();
358 assert!(Arc::ptr_eq(&rt1.0, &rt2.0));
359 }
360
361 #[test]
362 fn test_spawn() {
363 let runtime = SharedRuntime::from_config(test_config());
364 let handle = runtime.spawn(async { 123 });
365 let result = runtime.block_on(handle).unwrap();
366 assert_eq!(result, 123);
367 }
368
369 #[test]
370 fn test_actor_system_accessible() {
371 let runtime = SharedRuntime::from_config(test_config());
372 let system = runtime.actor_system();
373 let result = system.install(|| 42);
374 assert_eq!(result, 42);
375 }
376
377 #[test]
378 fn test_install() {
379 let runtime = SharedRuntime::from_config(test_config());
380 let result = runtime.install(|| 42);
381 assert_eq!(result, 42);
382 }
383}
384
385#[cfg(all(test, reifydb_target = "wasm"))]
386mod wasm_tests {
387 use super::*;
388
389 #[test]
390 fn test_wasm_runtime_creation() {
391 let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
392 let system = runtime.actor_system();
393 let result = system.install(|| 42);
394 assert_eq!(result, 42);
395 }
396}