reifydb_sub_server/
runtime.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Shared tokio runtime for all network subsystems.
5//!
6//! This module provides a centralized tokio runtime that can be shared across
7//! HTTP, WebSocket, and admin subsystems for efficient resource utilization.
8
9use tokio::runtime::{Handle, Runtime};
10
11/// Shared tokio runtime for all network subsystems.
12///
13/// Created once at server startup and passed to all subsystems via Handle clones.
14/// This ensures efficient work-stealing across HTTP, WebSocket, and admin servers.
15///
16/// # Example
17///
18/// ```ignore
19/// let runtime = SharedRuntime::new(num_cpus::get());
20/// let handle = runtime.handle();
21///
22/// // Pass handle to subsystems
23/// let http = HttpSubsystem::new(addr, state, handle.clone());
24/// let ws = WsSubsystem::new(addr, state, handle.clone());
25/// ```
26pub struct SharedRuntime {
27	runtime: Runtime,
28}
29
30impl SharedRuntime {
31	/// Create a new shared runtime with the specified number of worker threads.
32	///
33	/// # Arguments
34	///
35	/// * `worker_threads` - Number of tokio worker threads. Typically `num_cpus::get()`.
36	///
37	/// # Panics
38	///
39	/// Panics if the tokio runtime cannot be created.
40	pub fn new(worker_threads: usize) -> Self {
41		let runtime = tokio::runtime::Builder::new_multi_thread()
42			.worker_threads(worker_threads)
43			.thread_name("server")
44			.enable_all()
45			.build()
46			.expect("Failed to create tokio runtime");
47
48		Self {
49			runtime,
50		}
51	}
52
53	/// Get a handle for spawning tasks on this runtime.
54	///
55	/// The handle can be cloned and passed to multiple subsystems.
56	/// Tasks spawned via the handle run on the shared runtime's thread pool.
57	pub fn handle(&self) -> Handle {
58		self.runtime.handle().clone()
59	}
60
61	/// Block the current thread until the future completes.
62	///
63	/// Used during server startup/shutdown from synchronous context.
64	/// Should not be called from within an async context.
65	pub fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
66		self.runtime.block_on(future)
67	}
68
69	/// Spawn a future on this runtime.
70	///
71	/// Returns a JoinHandle that can be used to await the result.
72	pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
73	where
74		F: std::future::Future + Send + 'static,
75		F::Output: Send + 'static,
76	{
77		self.runtime.spawn(future)
78	}
79}
80
81impl Default for SharedRuntime {
82	fn default() -> Self {
83		Self::new(num_cpus::get())
84	}
85}
86
87// Re-export num_cpus::get for convenience
88pub use num_cpus::get as get_num_cpus;
89
#[cfg(test)]
mod tests {
	use super::*;

	/// A task spawned through a handle runs on the shared runtime and yields its value.
	#[test]
	fn test_runtime_creation() {
		let shared = SharedRuntime::new(2);
		let spawner = shared.handle();

		// Spawn via the handle, then wait for the task from synchronous code.
		let answer = shared.block_on(async {
			let task = spawner.spawn(async { 42 });
			task.await.unwrap()
		});

		assert_eq!(result_of(answer), 42);

		// Identity helper keeps the assertion on its own line without changing the value.
		fn result_of(value: i32) -> i32 {
			value
		}
	}

	/// The default constructor produces a multi-threaded runtime.
	#[test]
	fn test_runtime_default() {
		let shared = SharedRuntime::default();
		let flavor = shared.handle().runtime_flavor();
		assert!(flavor == tokio::runtime::RuntimeFlavor::MultiThread);
	}
}