Skip to main content

rustbridge_ffi/
handle.rs

1//! Plugin handle management
2
3use dashmap::DashMap;
4use once_cell::sync::OnceCell;
5use parking_lot::RwLock;
6use rustbridge_core::{
7    LifecycleState, Plugin, PluginConfig, PluginContext, PluginError, PluginResult,
8};
9use rustbridge_logging::LogCallbackManager;
10use rustbridge_runtime::{AsyncBridge, AsyncRuntime, RuntimeConfig};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13
14/// Global handle manager
15static HANDLE_MANAGER: OnceCell<PluginHandleManager> = OnceCell::new();
16
17/// Manages plugin handles
18pub struct PluginHandleManager {
19    handles: DashMap<u64, Arc<PluginHandle>>,
20    next_id: AtomicU64,
21}
22
23impl PluginHandleManager {
24    /// Create a new handle manager
25    pub fn new() -> Self {
26        Self {
27            handles: DashMap::new(),
28            next_id: AtomicU64::new(1),
29        }
30    }
31
32    /// Get the global handle manager
33    pub fn global() -> &'static PluginHandleManager {
34        HANDLE_MANAGER.get_or_init(PluginHandleManager::new)
35    }
36
37    /// Register a new handle
38    pub fn register(&self, handle: PluginHandle) -> u64 {
39        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
40        self.handles.insert(id, Arc::new(handle));
41        id
42    }
43
44    /// Get a handle by ID
45    pub fn get(&self, id: u64) -> Option<Arc<PluginHandle>> {
46        self.handles.get(&id).map(|r| r.clone())
47    }
48
49    /// Remove a handle
50    pub fn remove(&self, id: u64) -> Option<Arc<PluginHandle>> {
51        self.handles.remove(&id).map(|(_, v)| v)
52    }
53}
54
55impl Default for PluginHandleManager {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61/// Handle to an initialized plugin instance
62pub struct PluginHandle {
63    /// The plugin implementation
64    plugin: Box<dyn Plugin>,
65    /// Plugin context (state, config)
66    context: PluginContext,
67    /// Async runtime
68    runtime: Arc<AsyncRuntime>,
69    /// Async bridge for FFI calls
70    bridge: AsyncBridge,
71    /// Handle ID (set after registration)
72    id: RwLock<Option<u64>>,
73    /// Optional request limiter (None = unlimited)
74    request_limiter: Option<Arc<tokio::sync::Semaphore>>,
75    /// Counter for requests rejected due to concurrency limit
76    rejected_requests: AtomicU64,
77}
78
79impl PluginHandle {
80    /// Create a new plugin handle
81    pub fn new(plugin: Box<dyn Plugin>, config: PluginConfig) -> PluginResult<Self> {
82        // Create runtime configuration from plugin config
83        let runtime_config = RuntimeConfig {
84            worker_threads: config.worker_threads,
85            ..Default::default()
86        };
87
88        // Create the async runtime
89        let runtime = Arc::new(AsyncRuntime::new(runtime_config)?);
90
91        // Create the async bridge
92        let bridge = AsyncBridge::new(runtime.clone());
93
94        // Create request limiter based on max_concurrent_ops
95        let request_limiter = if config.max_concurrent_ops > 0 {
96            Some(Arc::new(tokio::sync::Semaphore::new(
97                config.max_concurrent_ops,
98            )))
99        } else {
100            None // 0 means unlimited
101        };
102
103        // Create plugin context
104        let context = PluginContext::new(config);
105
106        Ok(Self {
107            plugin,
108            context,
109            runtime,
110            bridge,
111            id: RwLock::new(None),
112            request_limiter,
113            rejected_requests: AtomicU64::new(0),
114        })
115    }
116
117    /// Get the handle ID
118    pub fn id(&self) -> Option<u64> {
119        *self.id.read()
120    }
121
122    /// Set the handle ID (called after registration)
123    pub(crate) fn set_id(&self, id: u64) {
124        *self.id.write() = Some(id);
125    }
126
127    /// Get the current lifecycle state
128    pub fn state(&self) -> LifecycleState {
129        self.context.state()
130    }
131
132    /// Start the plugin
133    pub fn start(&self) -> PluginResult<()> {
134        // Transition to Starting state
135        self.context.transition_to(LifecycleState::Starting)?;
136
137        // Call plugin's on_start
138        let result = self.bridge.call_sync(self.plugin.on_start(&self.context));
139
140        match result {
141            Ok(()) => {
142                // Transition to Active
143                self.context.transition_to(LifecycleState::Active)?;
144                tracing::info!("Plugin started successfully");
145                Ok(())
146            }
147            Err(e) => {
148                // Transition to Failed
149                self.context.set_state(LifecycleState::Failed);
150                tracing::error!("Plugin failed to start: {}", e);
151                Err(e)
152            }
153        }
154    }
155
156    /// Handle a request
157    pub fn call(&self, type_tag: &str, request: &[u8]) -> PluginResult<Vec<u8>> {
158        // Check state
159        if !self.context.state().can_handle_requests() {
160            return Err(PluginError::InvalidState {
161                expected: "Active".to_string(),
162                actual: self.context.state().to_string(),
163            });
164        }
165
166        // Try to acquire permit (immediate, non-blocking)
167        let _permit = if let Some(sem) = &self.request_limiter {
168            match sem.try_acquire() {
169                Ok(permit) => Some(permit),
170                Err(_) => {
171                    self.rejected_requests.fetch_add(1, Ordering::Relaxed);
172                    return Err(PluginError::TooManyRequests);
173                }
174            }
175        } else {
176            None
177        };
178
179        // Call the plugin handler
180        // Permit is automatically released when dropped
181        self.bridge
182            .call_sync(self.plugin.handle_request(&self.context, type_tag, request))
183    }
184
185    /// Shutdown the plugin
186    pub fn shutdown(&self, timeout_ms: u64) -> PluginResult<()> {
187        let current_state = self.context.state();
188
189        // Can only shutdown from Active state
190        if current_state != LifecycleState::Active {
191            if current_state.is_terminal() {
192                return Ok(()); // Already stopped/failed
193            }
194            return Err(PluginError::InvalidState {
195                expected: "Active".to_string(),
196                actual: current_state.to_string(),
197            });
198        }
199
200        // Transition to Stopping
201        self.context.transition_to(LifecycleState::Stopping)?;
202
203        // Call plugin's on_stop with timeout
204        let timeout = std::time::Duration::from_millis(timeout_ms);
205        let result = self
206            .bridge
207            .call_sync_timeout(self.plugin.on_stop(&self.context), timeout);
208
209        // Shutdown runtime
210        let runtime_timeout = std::time::Duration::from_millis(timeout_ms / 2);
211        let _ = self.runtime.shutdown(runtime_timeout);
212
213        match result {
214            Ok(()) => {
215                self.context.transition_to(LifecycleState::Stopped)?;
216                tracing::info!("Plugin shutdown complete");
217                Ok(())
218            }
219            Err(PluginError::Timeout) => {
220                self.context.set_state(LifecycleState::Stopped);
221                tracing::warn!("Plugin shutdown timed out");
222                Ok(()) // Consider timeout as successful shutdown
223            }
224            Err(e) => {
225                self.context.set_state(LifecycleState::Failed);
226                tracing::error!("Plugin shutdown failed: {}", e);
227                Err(e)
228            }
229        }
230    }
231
232    /// Set the log level
233    pub fn set_log_level(&self, level: rustbridge_core::LogLevel) {
234        // Update the log callback manager's level
235        LogCallbackManager::global().set_level(level);
236
237        // Reload the tracing subscriber's filter
238        if let Err(e) = rustbridge_logging::ReloadHandle::global().reload_level(level) {
239            tracing::warn!("Failed to reload tracing filter: {}", e);
240        }
241    }
242
243    /// Mark the plugin as failed
244    ///
245    /// This is called when a panic is caught at the FFI boundary or when
246    /// an unrecoverable error occurs. It immediately transitions the plugin
247    /// to the Failed state.
248    ///
249    /// After calling this method, the plugin will reject all further requests.
250    /// The host should call plugin_shutdown to clean up resources.
251    pub fn mark_failed(&self) {
252        tracing::error!("Marking plugin as failed due to panic or unrecoverable error");
253        self.context.set_state(LifecycleState::Failed);
254    }
255
256    /// Get the count of requests rejected due to concurrency limits
257    pub fn rejected_request_count(&self) -> u64 {
258        self.rejected_requests.load(Ordering::Relaxed)
259    }
260}
261
262// Plugin handles are thread-safe
263unsafe impl Send for PluginHandle {}
264unsafe impl Sync for PluginHandle {}
265
266#[cfg(test)]
267#[path = "handle/handle_tests.rs"]
268mod handle_tests;