rustbridge_runtime/
runtime.rs1use crate::shutdown::{ShutdownHandle, ShutdownSignal};
4use parking_lot::Mutex;
5use rustbridge_core::{PluginError, PluginResult};
6use std::sync::Arc;
7use tokio::runtime::{Builder, Runtime};
8
9#[derive(Debug, Clone)]
11pub struct RuntimeConfig {
12 pub worker_threads: Option<usize>,
14 pub thread_name: String,
16 pub enable_io: bool,
18 pub enable_time: bool,
20 pub max_blocking_threads: usize,
22}
23
24impl Default for RuntimeConfig {
25 fn default() -> Self {
26 Self {
27 worker_threads: None,
28 thread_name: "rustbridge-worker".to_string(),
29 enable_io: true,
30 enable_time: true,
31 max_blocking_threads: 512,
32 }
33 }
34}
35
36impl RuntimeConfig {
37 pub fn new() -> Self {
39 Self::default()
40 }
41
42 pub fn with_worker_threads(mut self, threads: usize) -> Self {
44 self.worker_threads = Some(threads);
45 self
46 }
47
48 pub fn with_thread_name(mut self, name: impl Into<String>) -> Self {
50 self.thread_name = name.into();
51 self
52 }
53}
54
55pub struct AsyncRuntime {
57 runtime: Arc<Runtime>,
58 shutdown_handle: ShutdownHandle,
59 config: RuntimeConfig,
60}
61
62impl AsyncRuntime {
63 pub fn new(config: RuntimeConfig) -> PluginResult<Self> {
65 let mut builder = Builder::new_multi_thread();
66
67 if let Some(threads) = config.worker_threads {
68 builder.worker_threads(threads);
69 }
70
71 builder
72 .thread_name(&config.thread_name)
73 .max_blocking_threads(config.max_blocking_threads);
74
75 if config.enable_io {
76 builder.enable_io();
77 }
78
79 if config.enable_time {
80 builder.enable_time();
81 }
82
83 let runtime = builder
84 .build()
85 .map_err(|e| PluginError::RuntimeError(format!("Failed to create runtime: {}", e)))?;
86
87 Ok(Self {
88 runtime: Arc::new(runtime),
89 shutdown_handle: ShutdownHandle::new(),
90 config,
91 })
92 }
93
94 pub fn with_defaults() -> PluginResult<Self> {
96 Self::new(RuntimeConfig::default())
97 }
98
99 pub fn config(&self) -> &RuntimeConfig {
101 &self.config
102 }
103
104 pub fn handle(&self) -> tokio::runtime::Handle {
106 self.runtime.handle().clone()
107 }
108
109 pub fn shutdown_signal(&self) -> ShutdownSignal {
111 self.shutdown_handle.signal()
112 }
113
114 pub fn block_on<F>(&self, future: F) -> F::Output
118 where
119 F: std::future::Future,
120 {
121 self.runtime.block_on(future)
122 }
123
124 pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
126 where
127 F: std::future::Future + Send + 'static,
128 F::Output: Send + 'static,
129 {
130 self.runtime.spawn(future)
131 }
132
133 pub fn spawn_blocking<F, R>(&self, func: F) -> tokio::task::JoinHandle<R>
135 where
136 F: FnOnce() -> R + Send + 'static,
137 R: Send + 'static,
138 {
139 self.runtime.spawn_blocking(func)
140 }
141
142 pub fn shutdown(&self, _timeout: std::time::Duration) -> PluginResult<()> {
148 tracing::info!("Initiating runtime shutdown");
149
150 self.shutdown_handle.trigger();
152
153 self.runtime.block_on(async {
156 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
157 });
158
159 tracing::info!("Runtime shutdown complete");
160 Ok(())
161 }
162
163 pub fn is_shutting_down(&self) -> bool {
165 self.shutdown_handle.is_triggered()
166 }
167}
168
169impl Drop for AsyncRuntime {
170 fn drop(&mut self) {
171 self.shutdown_handle.trigger();
173 }
174}
175
176#[allow(dead_code)] pub struct RuntimeHolder {
179 runtime: Mutex<Option<AsyncRuntime>>,
180}
181
182#[allow(dead_code)] impl RuntimeHolder {
184 pub fn new() -> Self {
186 Self {
187 runtime: Mutex::new(None),
188 }
189 }
190
191 pub fn init(&self, config: RuntimeConfig) -> PluginResult<()> {
193 let mut guard = self.runtime.lock();
194 if guard.is_some() {
195 return Err(PluginError::RuntimeError(
196 "Runtime already initialized".to_string(),
197 ));
198 }
199 *guard = Some(AsyncRuntime::new(config)?);
200 Ok(())
201 }
202
203 pub fn with<F, R>(&self, f: F) -> PluginResult<R>
205 where
206 F: FnOnce(&AsyncRuntime) -> R,
207 {
208 let guard = self.runtime.lock();
209 match guard.as_ref() {
210 Some(rt) => Ok(f(rt)),
211 None => Err(PluginError::RuntimeError(
212 "Runtime not initialized".to_string(),
213 )),
214 }
215 }
216
217 pub fn shutdown(&self, timeout: std::time::Duration) -> PluginResult<()> {
219 let mut guard = self.runtime.lock();
220 if let Some(rt) = guard.take() {
221 rt.shutdown(timeout)?;
222 }
223 Ok(())
224 }
225}
226
227impl Default for RuntimeHolder {
228 fn default() -> Self {
229 Self::new()
230 }
231}
232
233#[cfg(test)]
234#[path = "runtime/runtime_tests.rs"]
235mod runtime_tests;