#![allow(dead_code)]
pub mod context;
pub mod hash;
pub mod sync;
pub mod actor;
use std::{future::Future, sync::Arc};
#[cfg(not(target_arch = "wasm32"))]
use std::{mem::ManuallyDrop, time::Duration};
use crate::{
actor::system::{ActorSystem, ActorSystemConfig},
context::clock::{Clock, MockClock},
};
/// Settings used to construct a `SharedRuntime`.
///
/// All fields are public; the chainable setter methods on this type offer a
/// builder-style alternative to filling the struct in by hand.
#[derive(Clone)]
pub struct SharedRuntimeConfig {
/// Worker-thread count handed to the tokio runtime builder on native targets
/// (the wasm32 constructor does not read it).
pub async_threads: usize,
/// Forwarded as `pool_threads` of the `ActorSystemConfig`.
pub compute_threads: usize,
/// Forwarded as `max_in_flight` of the `ActorSystemConfig`.
pub compute_max_in_flight: usize,
/// Clock stored in the runtime and exposed through `SharedRuntime::clock`.
pub clock: Clock,
/// Random-number source stored in the runtime and exposed through `SharedRuntime::rng`.
pub rng: context::rng::Rng,
}
impl Default for SharedRuntimeConfig {
fn default() -> Self {
Self {
async_threads: 1,
compute_threads: 1,
compute_max_in_flight: 32,
clock: Clock::Real,
rng: context::rng::Rng::default(),
}
}
}
impl SharedRuntimeConfig {
    /// Sets the number of async worker threads and returns the updated config.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn async_threads(mut self, threads: usize) -> Self {
        self.async_threads = threads;
        self
    }

    /// Sets the number of compute threads and returns the updated config.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn compute_threads(mut self, threads: usize) -> Self {
        self.compute_threads = threads;
        self
    }

    /// Sets the maximum number of in-flight compute jobs and returns the
    /// updated config.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn compute_max_in_flight(mut self, max: usize) -> Self {
        self.compute_max_in_flight = max;
        self
    }

    /// Switches the config to deterministic mode.
    ///
    /// The clock becomes a mock clock built with `MockClock::from_millis(seed)`
    /// and the RNG becomes `Rng::seeded(seed)`; the same `seed` drives both.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn deterministic_testing(mut self, seed: u64) -> Self {
        self.clock = Clock::Mock(MockClock::from_millis(seed));
        self.rng = context::rng::Rng::seeded(seed);
        self
    }

    /// Derives the actor-system settings from the compute-related fields:
    /// `compute_threads` becomes `pool_threads` and `compute_max_in_flight`
    /// becomes `max_in_flight`.
    #[must_use]
    pub fn actor_system_config(&self) -> ActorSystemConfig {
        ActorSystemConfig {
            pool_threads: self.compute_threads,
            max_in_flight: self.compute_max_in_flight,
        }
    }
}
use std::fmt;
#[cfg(target_arch = "wasm32")]
use std::{
pin::Pin,
task::{Context, Poll},
};
#[cfg(target_arch = "wasm32")]
use futures_util::future::LocalBoxFuture;
#[cfg(not(target_arch = "wasm32"))]
use tokio::runtime::{self as tokio_runtime, Runtime};
#[cfg(not(target_arch = "wasm32"))]
use tokio::task::JoinHandle;
/// Data-less placeholder returned by `SharedRuntime::handle` on wasm32 targets.
#[cfg(target_arch = "wasm32")]
#[derive(Clone, Copy, Debug)]
pub struct WasmHandle;
/// Handle returned by `SharedRuntime::spawn` on wasm32 targets.
///
/// It owns the boxed future directly; polling the handle polls that future.
#[cfg(target_arch = "wasm32")]
pub struct WasmJoinHandle<T> {
// The boxed future handed to `spawn`.
future: LocalBoxFuture<'static, T>,
}
#[cfg(target_arch = "wasm32")]
impl<T> Future for WasmJoinHandle<T> {
    type Output = Result<T, WasmJoinError>;

    /// Polls the wrapped future and wraps a finished value in `Ok`; a pending
    /// result is passed through unchanged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.future.as_mut().poll(cx).map(Ok)
    }
}
/// Error type in the `Result` produced by polling a `WasmJoinHandle`.
///
/// NOTE(review): nothing visible in this file constructs this error; the
/// handle's `poll` only ever yields `Ok`.
#[cfg(target_arch = "wasm32")]
#[derive(Debug)]
pub struct WasmJoinError;
#[cfg(target_arch = "wasm32")]
impl fmt::Display for WasmJoinError {
    /// Writes the fixed, human-readable failure message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("WASM task failed")
    }
}
#[cfg(target_arch = "wasm32")]
use std::error::Error;
// Marker impl: every `Error` method keeps its default; `Debug` and `Display`
// are provided separately for this type.
#[cfg(target_arch = "wasm32")]
impl Error for WasmJoinError {}
/// State shared by all clones of a `SharedRuntime` on native targets.
#[cfg(not(target_arch = "wasm32"))]
struct SharedRuntimeInner {
// Wrapped in `ManuallyDrop` so the `Drop` impl can take ownership of the
// runtime and shut it down explicitly.
tokio: ManuallyDrop<Runtime>,
// Actor system built from the config's `actor_system_config()`.
system: ActorSystem,
// Clock taken from the config.
clock: Clock,
// Random-number source taken from the config.
rng: context::rng::Rng,
}
#[cfg(not(target_arch = "wasm32"))]
impl Drop for SharedRuntimeInner {
    /// Takes the tokio runtime out of its wrapper and shuts it down with a
    /// five-second timeout.
    fn drop(&mut self) {
        let grace_period = Duration::from_secs(5);
        // SAFETY: `drop` runs once for this value and `self.tokio` is not
        // touched again after this statement, so moving the runtime out of its
        // `ManuallyDrop` wrapper cannot cause a double drop or a use after move.
        let runtime = unsafe { ManuallyDrop::take(&mut self.tokio) };
        // NOTE(review): `shutdown_timeout` presumably blocks the dropping thread
        // for up to the timeout - confirm the last `SharedRuntime` clone is never
        // dropped from inside an async task.
        runtime.shutdown_timeout(grace_period);
    }
}
/// State shared by all clones of a `SharedRuntime` on wasm32 targets; unlike
/// the native variant it holds no tokio runtime.
#[cfg(target_arch = "wasm32")]
struct SharedRuntimeInner {
// Actor system built from the config's `actor_system_config()`.
system: ActorSystem,
// Clock taken from the config.
clock: Clock,
// Random-number source taken from the config.
rng: context::rng::Rng,
}
/// Cloneable handle to the runtime state; the state lives behind an `Arc`, so
/// clones share the same `SharedRuntimeInner`.
#[derive(Clone)]
pub struct SharedRuntime(Arc<SharedRuntimeInner>);
impl SharedRuntime {
#[cfg(not(target_arch = "wasm32"))]
pub fn from_config(config: SharedRuntimeConfig) -> Self {
let tokio = tokio_runtime::Builder::new_multi_thread()
.worker_threads(config.async_threads)
.thread_name("async")
.enable_all()
.build()
.expect("Failed to create tokio runtime");
let system = ActorSystem::new(config.actor_system_config());
Self(Arc::new(SharedRuntimeInner {
tokio: ManuallyDrop::new(tokio),
system,
clock: config.clock,
rng: config.rng,
}))
}
#[cfg(target_arch = "wasm32")]
pub fn from_config(config: SharedRuntimeConfig) -> Self {
let system = ActorSystem::new(config.actor_system_config());
Self(Arc::new(SharedRuntimeInner {
system,
clock: config.clock,
rng: config.rng,
}))
}
pub fn actor_system(&self) -> ActorSystem {
self.0.system.clone()
}
pub fn clock(&self) -> &Clock {
&self.0.clock
}
pub fn rng(&self) -> &context::rng::Rng {
&self.0.rng
}
#[cfg(not(target_arch = "wasm32"))]
pub fn handle(&self) -> tokio_runtime::Handle {
self.0.tokio.handle().clone()
}
#[cfg(target_arch = "wasm32")]
pub fn handle(&self) -> WasmHandle {
WasmHandle
}
#[cfg(not(target_arch = "wasm32"))]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.0.tokio.spawn(future)
}
#[cfg(target_arch = "wasm32")]
pub fn spawn<F>(&self, future: F) -> WasmJoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
WasmJoinHandle {
future: Box::pin(future),
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn block_on<F>(&self, future: F) -> F::Output
where
F: Future,
{
self.0.tokio.block_on(future)
}
#[cfg(target_arch = "wasm32")]
pub fn block_on<F>(&self, _future: F) -> F::Output
where
F: Future,
{
unimplemented!("block_on not supported in WASM - use async execution instead")
}
pub fn install<R, F>(&self, f: F) -> R
where
R: Send,
F: FnOnce() -> R + Send,
{
self.0.system.install(f)
}
}
impl fmt::Debug for SharedRuntime {
    /// Prints only the type name; the inner state is deliberately elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("SharedRuntime");
        repr.finish_non_exhaustive()
    }
}
// NOTE(review): these tests are gated on `reifydb_single_threaded`, while the
// runtime code itself is gated on `target_arch = "wasm32"`; confirm the two
// conditions always agree.
#[cfg(all(test, not(reifydb_single_threaded)))]
mod tests {
    use super::*;

    /// Small two-thread configuration shared by the tests in this module.
    fn test_config() -> SharedRuntimeConfig {
        let base = SharedRuntimeConfig::default();
        base.async_threads(2).compute_threads(2).compute_max_in_flight(4)
    }

    /// A freshly built runtime can drive a trivial future to completion.
    #[test]
    fn test_runtime_creation() {
        let rt = SharedRuntime::from_config(test_config());
        let value = rt.block_on(async { 42 });
        assert_eq!(value, 42);
    }

    /// Cloning the handle must not create a second inner runtime.
    #[test]
    fn test_runtime_clone_shares_same_runtime() {
        let original = SharedRuntime::from_config(test_config());
        let copy = original.clone();
        assert!(Arc::ptr_eq(&original.0, &copy.0));
    }

    /// A spawned task's output is observable through its join handle.
    #[test]
    fn test_spawn() {
        let rt = SharedRuntime::from_config(test_config());
        let task = rt.spawn(async { 123 });
        let value = rt.block_on(task).unwrap();
        assert_eq!(value, 123);
    }

    /// The actor system obtained from the runtime can execute a closure.
    #[test]
    fn test_actor_system_accessible() {
        let rt = SharedRuntime::from_config(test_config());
        let actors = rt.actor_system();
        let value = actors.install(|| 42);
        assert_eq!(value, 42);
    }

    /// `install` on the runtime itself returns the closure's result.
    #[test]
    fn test_install() {
        let rt = SharedRuntime::from_config(test_config());
        let value = rt.install(|| 42);
        assert_eq!(value, 42);
    }
}
#[cfg(all(test, reifydb_single_threaded))]
mod wasm_tests {
    use super::*;

    /// With the default config, the runtime's actor system can execute a closure.
    #[test]
    fn test_wasm_runtime_creation() {
        let config = SharedRuntimeConfig::default();
        let rt = SharedRuntime::from_config(config);
        let actors = rt.actor_system();
        let value = actors.install(|| 42);
        assert_eq!(value, 42);
    }
}