Skip to main content

rustbridge_runtime/
runtime.rs

1//! Tokio runtime management
2
3use crate::shutdown::{ShutdownHandle, ShutdownSignal};
4use parking_lot::Mutex;
5use rustbridge_core::{PluginError, PluginResult};
6use std::sync::Arc;
7use tokio::runtime::{Builder, Runtime};
8
/// Settings used to build the Tokio runtime that backs a plugin.
///
/// Construct one with [`RuntimeConfig::new`] / [`Default::default`] and adjust it
/// through the `with_*` builder methods or by assigning the public fields directly.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// How many worker threads the runtime should start.
    ///
    /// `None` leaves the choice to Tokio (its default is the number of CPU cores).
    pub worker_threads: Option<usize>,
    /// Name given to the threads the runtime creates.
    pub thread_name: String,
    /// Whether the runtime's I/O driver is enabled.
    pub enable_io: bool,
    /// Whether the runtime's time driver (timers, sleeps) is enabled.
    pub enable_time: bool,
    /// Upper bound on the number of threads used for blocking tasks.
    pub max_blocking_threads: usize,
}
23
24impl Default for RuntimeConfig {
25    fn default() -> Self {
26        Self {
27            worker_threads: None,
28            thread_name: "rustbridge-worker".to_string(),
29            enable_io: true,
30            enable_time: true,
31            max_blocking_threads: 512,
32        }
33    }
34}
35
36impl RuntimeConfig {
37    /// Create a new runtime configuration
38    pub fn new() -> Self {
39        Self::default()
40    }
41
42    /// Set the number of worker threads
43    pub fn with_worker_threads(mut self, threads: usize) -> Self {
44        self.worker_threads = Some(threads);
45        self
46    }
47
48    /// Set the thread name prefix
49    pub fn with_thread_name(mut self, name: impl Into<String>) -> Self {
50        self.thread_name = name.into();
51        self
52    }
53}
54
55/// Manages the Tokio async runtime for a plugin
56pub struct AsyncRuntime {
57    runtime: Arc<Runtime>,
58    shutdown_handle: ShutdownHandle,
59    config: RuntimeConfig,
60}
61
62impl AsyncRuntime {
63    /// Create a new async runtime with the given configuration
64    pub fn new(config: RuntimeConfig) -> PluginResult<Self> {
65        let mut builder = Builder::new_multi_thread();
66
67        if let Some(threads) = config.worker_threads {
68            builder.worker_threads(threads);
69        }
70
71        builder
72            .thread_name(&config.thread_name)
73            .max_blocking_threads(config.max_blocking_threads);
74
75        if config.enable_io {
76            builder.enable_io();
77        }
78
79        if config.enable_time {
80            builder.enable_time();
81        }
82
83        let runtime = builder
84            .build()
85            .map_err(|e| PluginError::RuntimeError(format!("Failed to create runtime: {}", e)))?;
86
87        Ok(Self {
88            runtime: Arc::new(runtime),
89            shutdown_handle: ShutdownHandle::new(),
90            config,
91        })
92    }
93
94    /// Create a runtime with default configuration
95    pub fn with_defaults() -> PluginResult<Self> {
96        Self::new(RuntimeConfig::default())
97    }
98
99    /// Get the runtime configuration
100    pub fn config(&self) -> &RuntimeConfig {
101        &self.config
102    }
103
104    /// Get a handle to the underlying Tokio runtime
105    pub fn handle(&self) -> tokio::runtime::Handle {
106        self.runtime.handle().clone()
107    }
108
109    /// Get a shutdown signal that can be used to detect shutdown
110    pub fn shutdown_signal(&self) -> ShutdownSignal {
111        self.shutdown_handle.signal()
112    }
113
114    /// Block on a future from a sync context
115    ///
116    /// This is the primary method for bridging sync FFI calls to async handlers.
117    pub fn block_on<F>(&self, future: F) -> F::Output
118    where
119        F: std::future::Future,
120    {
121        self.runtime.block_on(future)
122    }
123
124    /// Spawn a task on the runtime
125    pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
126    where
127        F: std::future::Future + Send + 'static,
128        F::Output: Send + 'static,
129    {
130        self.runtime.spawn(future)
131    }
132
133    /// Spawn a blocking task
134    pub fn spawn_blocking<F, R>(&self, func: F) -> tokio::task::JoinHandle<R>
135    where
136        F: FnOnce() -> R + Send + 'static,
137        R: Send + 'static,
138    {
139        self.runtime.spawn_blocking(func)
140    }
141
142    /// Initiate graceful shutdown
143    ///
144    /// This signals all tasks to stop. The runtime will complete shutdown
145    /// when dropped. We don't unconditionally sleep for the timeout since
146    /// most plugins have no long-running background tasks.
147    pub fn shutdown(&self, _timeout: std::time::Duration) -> PluginResult<()> {
148        tracing::info!("Initiating runtime shutdown");
149
150        // Signal shutdown to all tasks
151        self.shutdown_handle.trigger();
152
153        // Give tasks a brief moment to notice the shutdown signal
154        // This is enough for cooperative tasks to start cleanup
155        self.runtime.block_on(async {
156            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
157        });
158
159        tracing::info!("Runtime shutdown complete");
160        Ok(())
161    }
162
163    /// Check if shutdown has been triggered
164    pub fn is_shutting_down(&self) -> bool {
165        self.shutdown_handle.is_triggered()
166    }
167}
168
169impl Drop for AsyncRuntime {
170    fn drop(&mut self) {
171        // Ensure shutdown is triggered when runtime is dropped
172        self.shutdown_handle.trigger();
173    }
174}
175
176/// Thread-safe wrapper for optional runtime
177#[allow(dead_code)] // Reserved for future use
178pub struct RuntimeHolder {
179    runtime: Mutex<Option<AsyncRuntime>>,
180}
181
182#[allow(dead_code)] // Reserved for future use
183impl RuntimeHolder {
184    /// Create a new empty runtime holder
185    pub fn new() -> Self {
186        Self {
187            runtime: Mutex::new(None),
188        }
189    }
190
191    /// Initialize the runtime
192    pub fn init(&self, config: RuntimeConfig) -> PluginResult<()> {
193        let mut guard = self.runtime.lock();
194        if guard.is_some() {
195            return Err(PluginError::RuntimeError(
196                "Runtime already initialized".to_string(),
197            ));
198        }
199        *guard = Some(AsyncRuntime::new(config)?);
200        Ok(())
201    }
202
203    /// Execute a closure with the runtime
204    pub fn with<F, R>(&self, f: F) -> PluginResult<R>
205    where
206        F: FnOnce(&AsyncRuntime) -> R,
207    {
208        let guard = self.runtime.lock();
209        match guard.as_ref() {
210            Some(rt) => Ok(f(rt)),
211            None => Err(PluginError::RuntimeError(
212                "Runtime not initialized".to_string(),
213            )),
214        }
215    }
216
217    /// Shutdown and remove the runtime
218    pub fn shutdown(&self, timeout: std::time::Duration) -> PluginResult<()> {
219        let mut guard = self.runtime.lock();
220        if let Some(rt) = guard.take() {
221            rt.shutdown(timeout)?;
222        }
223        Ok(())
224    }
225}
226
227impl Default for RuntimeHolder {
228    fn default() -> Self {
229        Self::new()
230    }
231}
232
233#[cfg(test)]
234#[path = "runtime/runtime_tests.rs"]
235mod runtime_tests;