1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![allow(clippy::tabs_in_doc_comments)]
6#![allow(dead_code)]
7
8pub mod context;
9
10pub mod hash;
11
12pub mod pool;
13
14pub mod sync;
15
16pub mod actor;
17
18#[cfg(not(reifydb_target = "dst"))]
19use std::future::Future;
20use std::{sync::Arc, thread::available_parallelism};
21
22use crate::{
23 actor::system::ActorSystem,
24 context::clock::{Clock, MockClock},
25 pool::{PoolConfig, Pools},
26};
27
/// Settings consumed by `SharedRuntime::from_config`.
///
/// The three thread counts are copied unchanged into the `PoolConfig` used to
/// build the runtime's pools; the clock and RNG are stored in the runtime.
#[derive(Clone)]
pub struct SharedRuntimeConfig {
	/// Forwarded to `PoolConfig::async_threads`. `Default` sets this to 1.
	pub async_threads: usize,
	/// Forwarded to `PoolConfig::system_threads`. `Default` uses the detected
	/// parallelism, capped at 4.
	pub system_threads: usize,
	/// Forwarded to `PoolConfig::query_threads`. `Default` uses the detected
	/// parallelism.
	pub query_threads: usize,
	/// Clock handed to the actor system and exposed via `SharedRuntime::clock`.
	pub clock: Clock,
	/// Random number generator exposed via `SharedRuntime::rng`.
	pub rng: context::rng::Rng,
}
42
43impl Default for SharedRuntimeConfig {
44 fn default() -> Self {
45 let cpus = available_parallelism().map_or(1, |n| n.get());
46 Self {
47 async_threads: 1,
48 system_threads: cpus.min(4),
49 query_threads: cpus,
50 clock: Clock::Real,
51 rng: context::rng::Rng::default(),
52 }
53 }
54}
55
56impl SharedRuntimeConfig {
57 pub fn async_threads(mut self, threads: usize) -> Self {
59 self.async_threads = threads;
60 self
61 }
62
63 pub fn system_threads(mut self, threads: usize) -> Self {
65 self.system_threads = threads;
66 self
67 }
68
69 pub fn query_threads(mut self, threads: usize) -> Self {
71 self.query_threads = threads;
72 self
73 }
74
75 pub fn seeded(mut self, seed: u64) -> Self {
78 self.clock = Clock::Mock(MockClock::from_millis(seed));
79 self.rng = context::rng::Rng::seeded(seed);
80 self
81 }
82}
83
84use std::fmt;
86#[cfg(target_arch = "wasm32")]
87use std::{
88 pin::Pin,
89 task::{Context, Poll},
90};
91
92#[cfg(target_arch = "wasm32")]
93use futures_util::future::LocalBoxFuture;
94#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
95use tokio::runtime as tokio_runtime;
96#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
97use tokio::task::JoinHandle;
98
#[cfg(target_arch = "wasm32")]
/// Data-less placeholder returned by `SharedRuntime::handle` on `wasm32`
/// builds, where the native build returns a tokio runtime handle instead.
#[derive(Clone, Copy, Debug)]
pub struct WasmHandle;
103
#[cfg(target_arch = "wasm32")]
/// Handle returned by `SharedRuntime::spawn` on `wasm32` builds.
///
/// It owns the boxed future directly; the future is driven only when this
/// handle itself is polled (see the `Future` impl for this type).
pub struct WasmJoinHandle<T> {
	/// The wrapped future, boxed and pinned; not required to be `Send`.
	future: LocalBoxFuture<'static, T>,
}
111
#[cfg(target_arch = "wasm32")]
impl<T> Future for WasmJoinHandle<T> {
	type Output = Result<T, WasmJoinError>;

	/// Polls the wrapped future once and wraps a ready value in `Ok`.
	///
	/// This implementation never yields `Err`; the `Result` output mirrors the
	/// shape of the native join handle's output.
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		let wrapped = self.future.as_mut();
		// `Poll::map` leaves `Pending` untouched and transforms only `Ready`.
		wrapped.poll(cx).map(Ok)
	}
}
123
#[cfg(target_arch = "wasm32")]
/// Error half of `WasmJoinHandle`'s output on `wasm32` builds.
///
/// Carries no detail. NOTE(review): the `Future` impl for `WasmJoinHandle` in
/// this file never produces it — confirm whether other code constructs it.
#[derive(Debug)]
pub struct WasmJoinError;
128
#[cfg(target_arch = "wasm32")]
impl fmt::Display for WasmJoinError {
	/// Writes the fixed, human-readable failure message.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		// The message has no arguments, so it is written directly.
		f.write_str("WASM task failed")
	}
}
135
136#[cfg(target_arch = "wasm32")]
137use std::error::Error;
138
#[cfg(target_arch = "wasm32")]
// Empty impl: the trait's default methods are kept, so no underlying source
// error is reported. `Debug` and `Display` are provided above.
impl Error for WasmJoinError {}
141
142#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
144struct SharedRuntimeInner {
145 system: ActorSystem,
146 pools: Pools,
147 clock: Clock,
148 rng: context::rng::Rng,
149}
150
151#[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
152impl Drop for SharedRuntimeInner {
153 fn drop(&mut self) {
154 self.system.shutdown();
155 let _ = self.system.join();
156 }
157}
158
159#[cfg(target_arch = "wasm32")]
161struct SharedRuntimeInner {
162 system: ActorSystem,
163 pools: Pools,
164 clock: Clock,
165 rng: context::rng::Rng,
166}
167
168#[cfg(reifydb_target = "dst")]
170struct SharedRuntimeInner {
171 system: ActorSystem,
172 pools: Pools,
173 clock: Clock,
174 rng: context::rng::Rng,
175}
176
/// Cloneable handle to the shared runtime state (actor system, thread pools,
/// clock and RNG).
///
/// The state lives behind an `Arc`, so cloning copies only the pointer and
/// every clone refers to the same `SharedRuntimeInner`.
#[derive(Clone)]
pub struct SharedRuntime(Arc<SharedRuntimeInner>);
187
188impl SharedRuntime {
189 pub fn from_config(config: SharedRuntimeConfig) -> Self {
191 let pools = Pools::new(PoolConfig {
192 system_threads: config.system_threads,
193 query_threads: config.query_threads,
194 async_threads: config.async_threads,
195 });
196 let system = ActorSystem::new(pools.clone(), config.clock.clone());
197
198 Self(Arc::new(SharedRuntimeInner {
199 system,
200 pools,
201 clock: config.clock,
202 rng: config.rng,
203 }))
204 }
205
206 pub fn actor_system(&self) -> ActorSystem {
208 self.0.system.clone()
209 }
210
211 pub fn clock(&self) -> &Clock {
213 &self.0.clock
214 }
215
216 pub fn rng(&self) -> &context::rng::Rng {
218 &self.0.rng
219 }
220
221 pub fn pools(&self) -> Pools {
223 self.0.pools.clone()
224 }
225
226 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
228 pub fn handle(&self) -> tokio_runtime::Handle {
229 self.0.pools.handle()
230 }
231
232 #[cfg(target_arch = "wasm32")]
234 pub fn handle(&self) -> WasmHandle {
235 WasmHandle
236 }
237
238 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
240 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
241 where
242 F: Future + Send + 'static,
243 F::Output: Send + 'static,
244 {
245 self.0.pools.spawn(future)
246 }
247
248 #[cfg(target_arch = "wasm32")]
250 pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
251 where
252 F: Future + 'static,
253 F::Output: 'static,
254 {
255 WasmJoinHandle {
256 future: Box::pin(future),
257 }
258 }
259
260 #[cfg(all(not(target_arch = "wasm32"), not(reifydb_target = "dst")))]
262 pub fn block_on<F>(&self, future: F) -> F::Output
263 where
264 F: Future,
265 {
266 self.0.pools.block_on(future)
267 }
268
269 #[cfg(target_arch = "wasm32")]
271 pub fn block_on<F>(&self, _future: F) -> F::Output
272 where
273 F: Future,
274 {
275 unimplemented!("block_on not supported in WASM - use async execution instead")
276 }
277}
278
279impl fmt::Debug for SharedRuntime {
280 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
281 f.debug_struct("SharedRuntime").finish_non_exhaustive()
282 }
283}
284
#[cfg(all(test, not(reifydb_single_threaded)))]
mod tests {
	use super::*;

	/// Thread count applied to every pool in these tests.
	const THREADS: usize = 2;

	/// Default configuration with every pool pinned to `THREADS` threads, so
	/// pool sizes do not vary with the host's CPU count.
	fn test_config() -> SharedRuntimeConfig {
		let defaults = SharedRuntimeConfig::default();
		defaults.async_threads(THREADS).system_threads(THREADS).query_threads(THREADS)
	}

	/// A freshly built runtime can drive a trivial future to completion.
	#[test]
	fn test_runtime_creation() {
		let runtime = SharedRuntime::from_config(test_config());
		assert_eq!(runtime.block_on(async { 42 }), 42);
	}

	/// Cloning the handle shares the same inner allocation.
	#[test]
	fn test_runtime_clone_shares_same_runtime() {
		let original = SharedRuntime::from_config(test_config());
		let copy = original.clone();
		assert!(Arc::ptr_eq(&original.0, &copy.0));
	}

	/// A spawned task's output is delivered through its join handle.
	#[test]
	fn test_spawn() {
		let runtime = SharedRuntime::from_config(test_config());
		let joined = runtime.block_on(runtime.spawn(async { 123 }));
		assert_eq!(joined.unwrap(), 123);
	}

	/// The actor system accessor can be called on a fresh runtime.
	#[test]
	fn test_actor_system_accessible() {
		let runtime = SharedRuntime::from_config(test_config());
		let _system = runtime.actor_system();
	}
}