reifydb_sub_server/runtime.rs
1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Shared tokio runtime for all network subsystems.
5//!
6//! This module provides a centralized tokio runtime that can be shared across
7//! HTTP, WebSocket, and admin subsystems for efficient resource utilization.
8
9use std::sync::Arc;
10
11use tokio::runtime::{Handle, Runtime};
12
13struct SharedRuntimeInner {
14 runtime: Runtime,
15}
16
17/// Shared tokio runtime for all network subsystems.
18///
19/// Created once at server startup and passed to all subsystems via cloning.
20/// This ensures efficient work-stealing across HTTP, WebSocket, and admin servers.
21///
22/// # Example
23///
24/// ```ignore
25/// let runtime = SharedRuntime::new(num_cpus::get());
26///
27/// // Pass cloned runtime to subsystems
28/// let http = HttpSubsystem::new(addr, state, runtime.clone());
29/// let ws = WsSubsystem::new(addr, state, runtime.clone());
30/// ```
31#[derive(Clone)]
32pub struct SharedRuntime(Arc<SharedRuntimeInner>);
33
34impl SharedRuntime {
35 /// Create a new shared runtime with the specified number of worker threads.
36 ///
37 /// # Arguments
38 ///
39 /// * `worker_threads` - Number of tokio worker threads. Typically `num_cpus::get()`.
40 ///
41 /// # Panics
42 ///
43 /// Panics if the tokio runtime cannot be created.
44 pub fn new(worker_threads: usize) -> Self {
45 let runtime = tokio::runtime::Builder::new_multi_thread()
46 .worker_threads(worker_threads)
47 .thread_name("server")
48 .enable_all()
49 .build()
50 .expect("Failed to create tokio runtime");
51
52 Self(Arc::new(SharedRuntimeInner {
53 runtime,
54 }))
55 }
56
57 /// Get a handle for spawning tasks on this runtime.
58 ///
59 /// The handle can be cloned and passed to multiple subsystems.
60 /// Tasks spawned via the handle run on the shared runtime's thread pool.
61 pub fn handle(&self) -> Handle {
62 self.0.runtime.handle().clone()
63 }
64
65 /// Block the current thread until the future completes.
66 ///
67 /// Used during server startup/shutdown from synchronous context.
68 /// Should not be called from within an async context.
69 pub fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
70 self.0.runtime.block_on(future)
71 }
72
73 /// Spawn a future on this runtime.
74 ///
75 /// Returns a JoinHandle that can be used to await the result.
76 pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
77 where
78 F: std::future::Future + Send + 'static,
79 F::Output: Send + 'static,
80 {
81 self.0.runtime.spawn(future)
82 }
83}
84
85impl Default for SharedRuntime {
86 fn default() -> Self {
87 Self::new(num_cpus::get())
88 }
89}
90
91// Re-export num_cpus::get for convenience
92pub use num_cpus::get as get_num_cpus;
93
#[cfg(test)]
mod tests {
    use super::*;

    /// A task spawned through a cloned handle completes on the runtime.
    #[test]
    fn test_runtime_creation() {
        let runtime = SharedRuntime::new(2);
        let handle = runtime.handle();

        // Verify we can spawn and await a task
        let result = runtime.block_on(async { handle.spawn(async { 42 }).await.unwrap() });

        assert_eq!(result, 42);
    }

    /// The default constructor yields a multi-threaded runtime.
    #[test]
    fn test_runtime_default() {
        let runtime = SharedRuntime::default();
        // `assert_eq!` reports both flavors on failure, unlike `assert!(a == b)`.
        assert_eq!(
            runtime.handle().runtime_flavor(),
            tokio::runtime::RuntimeFlavor::MultiThread
        );
    }

    /// `SharedRuntime::spawn` schedules the future and its `JoinHandle`
    /// resolves to the future's output.
    #[test]
    fn test_runtime_spawn() {
        let runtime = SharedRuntime::new(1);
        let task = runtime.spawn(async { 7 });

        assert_eq!(runtime.block_on(task).unwrap(), 7);
    }
}