1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
18#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
19#![allow(clippy::tabs_in_doc_comments)]
20#![allow(dead_code)]
21
22pub mod context;
23
24pub mod hash;
25
26pub mod pool;
27
28pub mod sync;
29
30pub mod actor;
31
32#[cfg(not(reifydb_target = "dst"))]
33use std::future::Future;
34use std::sync::Arc;
35
36use crate::{
37 actor::system::ActorSystem,
38 context::clock::{Clock, MockClock},
39 pool::{PoolConfig, Pools},
40};
41
42#[derive(Clone)]
43pub struct SharedRuntimeConfig {
44 pub clock: Clock,
45 pub rng: context::rng::Rng,
46}
47
48impl Default for SharedRuntimeConfig {
49 fn default() -> Self {
50 Self {
51 clock: Clock::Real,
52 rng: context::rng::Rng::default(),
53 }
54 }
55}
56
57impl SharedRuntimeConfig {
58 pub fn seeded(mut self, seed: u64) -> Self {
59 self.clock = Clock::Mock(MockClock::from_millis(seed));
60 self.rng = context::rng::Rng::seeded(seed);
61 self
62 }
63}
64
65use std::fmt;
66#[cfg(target_arch = "wasm32")]
67use std::{
68 pin::Pin,
69 task::{Context, Poll},
70};
71
72#[cfg(target_arch = "wasm32")]
73use futures_util::future::LocalBoxFuture;
74#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
75use tokio::runtime as tokio_runtime;
76#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
77use tokio::task::JoinHandle;
78
/// Unit handle returned by `SharedRuntime::handle` on `wasm32` builds.
///
/// It carries no data; it stands in where native builds return a Tokio handle.
#[cfg(target_arch = "wasm32")]
#[derive(Debug, Clone, Copy)]
pub struct WasmHandle;
82
/// Handle returned by `SharedRuntime::spawn` on `wasm32` builds.
///
/// It owns the boxed future directly; polling the handle is what polls the
/// wrapped future.
#[cfg(target_arch = "wasm32")]
pub struct WasmJoinHandle<T> {
	/// The wrapped future, boxed and pinned (not required to be `Send`).
	future: LocalBoxFuture<'static, T>,
}
87
#[cfg(target_arch = "wasm32")]
impl<T> Future for WasmJoinHandle<T> {
	/// Mirrors the `Result`-shaped output of a native join handle.
	type Output = Result<T, WasmJoinError>;

	/// Polls the wrapped future and wraps a ready value in `Ok`.
	///
	/// This implementation never produces `Err(WasmJoinError)`.
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		let inner = self.future.as_mut();
		// `Poll::map` leaves `Pending` untouched and transforms only `Ready`.
		inner.poll(cx).map(Ok)
	}
}
99
/// Error half of `WasmJoinHandle`'s output type on `wasm32` builds.
///
/// NOTE(review): the `Future` impl for `WasmJoinHandle` in this file only ever
/// yields `Ok`, so nothing here constructs this error.
#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct WasmJoinError;
103
#[cfg(target_arch = "wasm32")]
impl fmt::Display for WasmJoinError {
	/// Writes the fixed message `WASM task failed`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		// No interpolation is needed, so the literal is written directly.
		f.write_str("WASM task failed")
	}
}
110
111#[cfg(target_arch = "wasm32")]
112use std::error::Error;
113
// Marker impl: `WasmJoinError` relies entirely on the default `Error` methods,
// using its `Debug` and `Display` impls for reporting.
#[cfg(target_arch = "wasm32")]
impl Error for WasmJoinError {}
116
117#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
118struct SharedRuntimeInner {
119 system: ActorSystem,
120 pools: Pools,
121 clock: Clock,
122 rng: context::rng::Rng,
123}
124
125#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
126impl Drop for SharedRuntimeInner {
127 fn drop(&mut self) {
128 self.system.shutdown();
129 let _ = self.system.join();
130 }
131}
132
133#[cfg(target_arch = "wasm32")]
134struct SharedRuntimeInner {
135 system: ActorSystem,
136 pools: Pools,
137 clock: Clock,
138 rng: context::rng::Rng,
139}
140
141#[cfg(reifydb_target = "dst")]
142struct SharedRuntimeInner {
143 system: ActorSystem,
144 pools: Pools,
145 clock: Clock,
146 rng: context::rng::Rng,
147}
148
149#[derive(Clone)]
150pub struct SharedRuntime(Arc<SharedRuntimeInner>);
151
152impl SharedRuntime {
153 pub fn from_config(config: SharedRuntimeConfig, pools: PoolConfig) -> Self {
154 let pools = Pools::new(pools);
155 let system = ActorSystem::new(pools.clone(), config.clock.clone());
156
157 Self(Arc::new(SharedRuntimeInner {
158 system,
159 pools,
160 clock: config.clock,
161 rng: config.rng,
162 }))
163 }
164
165 pub fn actor_system(&self) -> ActorSystem {
166 self.0.system.clone()
167 }
168
169 pub fn clock(&self) -> &Clock {
170 &self.0.clock
171 }
172
173 pub fn rng(&self) -> &context::rng::Rng {
174 &self.0.rng
175 }
176
177 pub fn pools(&self) -> Pools {
178 self.0.pools.clone()
179 }
180
181 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
182 pub fn handle(&self) -> tokio_runtime::Handle {
183 self.0.pools.handle()
184 }
185
186 #[cfg(target_arch = "wasm32")]
187 pub fn handle(&self) -> WasmHandle {
188 WasmHandle
189 }
190
191 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
192 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
193 where
194 F: Future + Send + 'static,
195 F::Output: Send + 'static,
196 {
197 self.0.pools.spawn(future)
198 }
199
200 #[cfg(target_arch = "wasm32")]
201 pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
202 where
203 F: Future + 'static,
204 F::Output: 'static,
205 {
206 WasmJoinHandle {
207 future: Box::pin(future),
208 }
209 }
210
211 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
212 pub fn block_on<F>(&self, future: F) -> F::Output
213 where
214 F: Future,
215 {
216 self.0.pools.block_on(future)
217 }
218
219 #[cfg(target_arch = "wasm32")]
220 pub fn block_on<F>(&self, _future: F) -> F::Output
221 where
222 F: Future,
223 {
224 unimplemented!("block_on not supported in WASM - use async execution instead")
225 }
226}
227
228impl fmt::Debug for SharedRuntime {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 f.debug_struct("SharedRuntime").finish_non_exhaustive()
231 }
232}
233
#[cfg(all(test, not(reifydb_single_threaded)))]
mod tests {
	use super::*;

	/// Default runtime configuration (`Clock::Real`, default `Rng`).
	fn test_config() -> SharedRuntimeConfig {
		Default::default()
	}

	/// Pool configuration with each of the three thread counts set to 2.
	fn test_pools() -> PoolConfig {
		PoolConfig {
			async_threads: 2,
			system_threads: 2,
			query_threads: 2,
		}
	}

	/// Builds a fresh runtime from the test configuration and test pools.
	fn test_runtime() -> SharedRuntime {
		SharedRuntime::from_config(test_config(), test_pools())
	}

	#[test]
	fn test_runtime_creation() {
		let runtime = test_runtime();
		assert_eq!(runtime.block_on(async { 42 }), 42);
	}

	#[test]
	fn test_runtime_clone_shares_same_runtime() {
		let original = test_runtime();
		let cloned = original.clone();
		// Both handles must point at the same shared allocation.
		assert!(Arc::ptr_eq(&original.0, &cloned.0));
	}

	#[test]
	fn test_spawn() {
		let runtime = test_runtime();
		let task = runtime.spawn(async { 123 });
		let joined = runtime.block_on(task);
		assert_eq!(joined.unwrap(), 123);
	}

	#[test]
	fn test_actor_system_accessible() {
		// The runtime stays bound so it outlives the cloned actor system.
		let runtime = test_runtime();
		let _system = runtime.actor_system();
	}
}