reifydb_sub_server/
runtime.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Shared tokio runtime for all network subsystems.
5//!
6//! This module provides a centralized tokio runtime that can be shared across
7//! HTTP, WebSocket, and admin subsystems for efficient resource utilization.
8
9use std::sync::Arc;
10
11use tokio::runtime::{Handle, Runtime};
12
13struct SharedRuntimeInner {
14	runtime: Runtime,
15}
16
17/// Shared tokio runtime for all network subsystems.
18///
19/// Created once at server startup and passed to all subsystems via cloning.
20/// This ensures efficient work-stealing across HTTP, WebSocket, and admin servers.
21///
22/// # Example
23///
24/// ```ignore
25/// let runtime = SharedRuntime::new(num_cpus::get());
26///
27/// // Pass cloned runtime to subsystems
28/// let http = HttpSubsystem::new(addr, state, runtime.clone());
29/// let ws = WsSubsystem::new(addr, state, runtime.clone());
30/// ```
31#[derive(Clone)]
32pub struct SharedRuntime(Arc<SharedRuntimeInner>);
33
34impl SharedRuntime {
35	/// Create a new shared runtime with the specified number of worker threads.
36	///
37	/// # Arguments
38	///
39	/// * `worker_threads` - Number of tokio worker threads. Typically `num_cpus::get()`.
40	///
41	/// # Panics
42	///
43	/// Panics if the tokio runtime cannot be created.
44	pub fn new(worker_threads: usize) -> Self {
45		let runtime = tokio::runtime::Builder::new_multi_thread()
46			.worker_threads(worker_threads)
47			.thread_name("server")
48			.enable_all()
49			.build()
50			.expect("Failed to create tokio runtime");
51
52		Self(Arc::new(SharedRuntimeInner {
53			runtime,
54		}))
55	}
56
57	/// Get a handle for spawning tasks on this runtime.
58	///
59	/// The handle can be cloned and passed to multiple subsystems.
60	/// Tasks spawned via the handle run on the shared runtime's thread pool.
61	pub fn handle(&self) -> Handle {
62		self.0.runtime.handle().clone()
63	}
64
65	/// Block the current thread until the future completes.
66	///
67	/// Used during server startup/shutdown from synchronous context.
68	/// Should not be called from within an async context.
69	pub fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
70		self.0.runtime.block_on(future)
71	}
72
73	/// Spawn a future on this runtime.
74	///
75	/// Returns a JoinHandle that can be used to await the result.
76	pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
77	where
78		F: std::future::Future + Send + 'static,
79		F::Output: Send + 'static,
80	{
81		self.0.runtime.spawn(future)
82	}
83}
84
85impl Default for SharedRuntime {
86	fn default() -> Self {
87		Self::new(num_cpus::get())
88	}
89}
90
91// Re-export num_cpus::get for convenience
92pub use num_cpus::get as get_num_cpus;
93
94#[cfg(test)]
95mod tests {
96	use super::*;
97
98	#[test]
99	fn test_runtime_creation() {
100		let runtime = SharedRuntime::new(2);
101		let handle = runtime.handle();
102
103		// Verify we can spawn and await a task
104		let result = runtime.block_on(async { handle.spawn(async { 42 }).await.unwrap() });
105
106		assert_eq!(result, 42);
107	}
108
109	#[test]
110	fn test_runtime_default() {
111		let runtime = SharedRuntime::default();
112		assert!(runtime.handle().runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread);
113	}
114}