reifydb_sub_server/runtime.rs
1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Shared tokio runtime for all network subsystems.
5//!
6//! This module provides a centralized tokio runtime that can be shared across
7//! HTTP, WebSocket, and admin subsystems for efficient resource utilization.
8
9use tokio::runtime::{Handle, Runtime};
10
11/// Shared tokio runtime for all network subsystems.
12///
13/// Created once at server startup and passed to all subsystems via Handle clones.
14/// This ensures efficient work-stealing across HTTP, WebSocket, and admin servers.
15///
16/// # Example
17///
18/// ```ignore
19/// let runtime = SharedRuntime::new(num_cpus::get());
20/// let handle = runtime.handle();
21///
22/// // Pass handle to subsystems
23/// let http = HttpSubsystem::new(addr, state, handle.clone());
24/// let ws = WsSubsystem::new(addr, state, handle.clone());
25/// ```
26pub struct SharedRuntime {
27 runtime: Runtime,
28}
29
30impl SharedRuntime {
31 /// Create a new shared runtime with the specified number of worker threads.
32 ///
33 /// # Arguments
34 ///
35 /// * `worker_threads` - Number of tokio worker threads. Typically `num_cpus::get()`.
36 ///
37 /// # Panics
38 ///
39 /// Panics if the tokio runtime cannot be created.
40 pub fn new(worker_threads: usize) -> Self {
41 let runtime = tokio::runtime::Builder::new_multi_thread()
42 .worker_threads(worker_threads)
43 .thread_name("server")
44 .enable_all()
45 .build()
46 .expect("Failed to create tokio runtime");
47
48 Self {
49 runtime,
50 }
51 }
52
53 /// Get a handle for spawning tasks on this runtime.
54 ///
55 /// The handle can be cloned and passed to multiple subsystems.
56 /// Tasks spawned via the handle run on the shared runtime's thread pool.
57 pub fn handle(&self) -> Handle {
58 self.runtime.handle().clone()
59 }
60
61 /// Block the current thread until the future completes.
62 ///
63 /// Used during server startup/shutdown from synchronous context.
64 /// Should not be called from within an async context.
65 pub fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
66 self.runtime.block_on(future)
67 }
68
69 /// Spawn a future on this runtime.
70 ///
71 /// Returns a JoinHandle that can be used to await the result.
72 pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
73 where
74 F: std::future::Future + Send + 'static,
75 F::Output: Send + 'static,
76 {
77 self.runtime.spawn(future)
78 }
79}
80
81impl Default for SharedRuntime {
82 fn default() -> Self {
83 Self::new(num_cpus::get())
84 }
85}
86
87// Re-export num_cpus::get for convenience
88pub use num_cpus::get as get_num_cpus;
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93
94 #[test]
95 fn test_runtime_creation() {
96 let runtime = SharedRuntime::new(2);
97 let handle = runtime.handle();
98
99 // Verify we can spawn and await a task
100 let result = runtime.block_on(async { handle.spawn(async { 42 }).await.unwrap() });
101
102 assert_eq!(result, 42);
103 }
104
105 #[test]
106 fn test_runtime_default() {
107 let runtime = SharedRuntime::default();
108 assert!(runtime.handle().runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread);
109 }
110}