1use dashmap::DashMap;
4use once_cell::sync::OnceCell;
5use parking_lot::RwLock;
6use rustbridge_core::{
7 LifecycleState, Plugin, PluginConfig, PluginContext, PluginError, PluginResult,
8};
9use rustbridge_logging::LogCallbackManager;
10use rustbridge_runtime::{AsyncBridge, AsyncRuntime, RuntimeConfig};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13
14static HANDLE_MANAGER: OnceCell<PluginHandleManager> = OnceCell::new();
16
17pub struct PluginHandleManager {
19 handles: DashMap<u64, Arc<PluginHandle>>,
20 next_id: AtomicU64,
21}
22
23impl PluginHandleManager {
24 pub fn new() -> Self {
26 Self {
27 handles: DashMap::new(),
28 next_id: AtomicU64::new(1),
29 }
30 }
31
32 pub fn global() -> &'static PluginHandleManager {
34 HANDLE_MANAGER.get_or_init(PluginHandleManager::new)
35 }
36
37 pub fn register(&self, handle: PluginHandle) -> u64 {
39 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
40 self.handles.insert(id, Arc::new(handle));
41 id
42 }
43
44 pub fn get(&self, id: u64) -> Option<Arc<PluginHandle>> {
46 self.handles.get(&id).map(|r| r.clone())
47 }
48
49 pub fn remove(&self, id: u64) -> Option<Arc<PluginHandle>> {
51 self.handles.remove(&id).map(|(_, v)| v)
52 }
53}
54
55impl Default for PluginHandleManager {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61pub struct PluginHandle {
63 plugin: Box<dyn Plugin>,
65 context: PluginContext,
67 runtime: Arc<AsyncRuntime>,
69 bridge: AsyncBridge,
71 id: RwLock<Option<u64>>,
73 request_limiter: Option<Arc<tokio::sync::Semaphore>>,
75 rejected_requests: AtomicU64,
77}
78
79impl PluginHandle {
80 pub fn new(plugin: Box<dyn Plugin>, config: PluginConfig) -> PluginResult<Self> {
82 let runtime_config = RuntimeConfig {
84 worker_threads: config.worker_threads,
85 ..Default::default()
86 };
87
88 let runtime = Arc::new(AsyncRuntime::new(runtime_config)?);
90
91 let bridge = AsyncBridge::new(runtime.clone());
93
94 let request_limiter = if config.max_concurrent_ops > 0 {
96 Some(Arc::new(tokio::sync::Semaphore::new(
97 config.max_concurrent_ops,
98 )))
99 } else {
100 None };
102
103 let context = PluginContext::new(config);
105
106 Ok(Self {
107 plugin,
108 context,
109 runtime,
110 bridge,
111 id: RwLock::new(None),
112 request_limiter,
113 rejected_requests: AtomicU64::new(0),
114 })
115 }
116
117 pub fn id(&self) -> Option<u64> {
119 *self.id.read()
120 }
121
122 pub(crate) fn set_id(&self, id: u64) {
124 *self.id.write() = Some(id);
125 }
126
127 pub fn state(&self) -> LifecycleState {
129 self.context.state()
130 }
131
132 pub fn start(&self) -> PluginResult<()> {
134 self.context.transition_to(LifecycleState::Starting)?;
136
137 let result = self.bridge.call_sync(self.plugin.on_start(&self.context));
139
140 match result {
141 Ok(()) => {
142 self.context.transition_to(LifecycleState::Active)?;
144 tracing::info!("Plugin started successfully");
145 Ok(())
146 }
147 Err(e) => {
148 self.context.set_state(LifecycleState::Failed);
150 tracing::error!("Plugin failed to start: {}", e);
151 Err(e)
152 }
153 }
154 }
155
156 pub fn call(&self, type_tag: &str, request: &[u8]) -> PluginResult<Vec<u8>> {
158 if !self.context.state().can_handle_requests() {
160 return Err(PluginError::InvalidState {
161 expected: "Active".to_string(),
162 actual: self.context.state().to_string(),
163 });
164 }
165
166 let _permit = if let Some(sem) = &self.request_limiter {
168 match sem.try_acquire() {
169 Ok(permit) => Some(permit),
170 Err(_) => {
171 self.rejected_requests.fetch_add(1, Ordering::Relaxed);
172 return Err(PluginError::TooManyRequests);
173 }
174 }
175 } else {
176 None
177 };
178
179 self.bridge
182 .call_sync(self.plugin.handle_request(&self.context, type_tag, request))
183 }
184
185 pub fn shutdown(&self, timeout_ms: u64) -> PluginResult<()> {
187 let current_state = self.context.state();
188
189 if current_state != LifecycleState::Active {
191 if current_state.is_terminal() {
192 return Ok(()); }
194 return Err(PluginError::InvalidState {
195 expected: "Active".to_string(),
196 actual: current_state.to_string(),
197 });
198 }
199
200 self.context.transition_to(LifecycleState::Stopping)?;
202
203 let timeout = std::time::Duration::from_millis(timeout_ms);
205 let result = self
206 .bridge
207 .call_sync_timeout(self.plugin.on_stop(&self.context), timeout);
208
209 let runtime_timeout = std::time::Duration::from_millis(timeout_ms / 2);
211 let _ = self.runtime.shutdown(runtime_timeout);
212
213 match result {
214 Ok(()) => {
215 self.context.transition_to(LifecycleState::Stopped)?;
216 tracing::info!("Plugin shutdown complete");
217 Ok(())
218 }
219 Err(PluginError::Timeout) => {
220 self.context.set_state(LifecycleState::Stopped);
221 tracing::warn!("Plugin shutdown timed out");
222 Ok(()) }
224 Err(e) => {
225 self.context.set_state(LifecycleState::Failed);
226 tracing::error!("Plugin shutdown failed: {}", e);
227 Err(e)
228 }
229 }
230 }
231
232 pub fn set_log_level(&self, level: rustbridge_core::LogLevel) {
234 LogCallbackManager::global().set_level(level);
236
237 if let Err(e) = rustbridge_logging::ReloadHandle::global().reload_level(level) {
239 tracing::warn!("Failed to reload tracing filter: {}", e);
240 }
241 }
242
243 pub fn mark_failed(&self) {
252 tracing::error!("Marking plugin as failed due to panic or unrecoverable error");
253 self.context.set_state(LifecycleState::Failed);
254 }
255
256 pub fn rejected_request_count(&self) -> u64 {
258 self.rejected_requests.load(Ordering::Relaxed)
259 }
260}
261
262unsafe impl Send for PluginHandle {}
264unsafe impl Sync for PluginHandle {}
265
266#[cfg(test)]
267#[path = "handle/handle_tests.rs"]
268mod handle_tests;