Skip to main content

rustbridge_consumer/
plugin.rs

1//! Native plugin wrapper for calling plugins loaded via FFI.
2
3use crate::error::{ConsumerError, ConsumerResult};
4use crate::ffi_bindings::{
5    FfiBuffer, FfiPluginHandle, PluginCallFn, PluginCallRawFn, PluginFreeBufferFn,
6    PluginGetRejectedCountFn, PluginGetStateFn, PluginSetLogLevelFn, PluginShutdownFn, RbResponse,
7    RbResponseFreeFn,
8};
9use libloading::Library;
10use rustbridge_core::{LifecycleState, LogLevel, PluginError};
11use rustbridge_transport::ResponseEnvelope;
12use serde::{Serialize, de::DeserializeOwned};
13use std::ffi::CString;
14use std::sync::atomic::{AtomicBool, Ordering};
15
16/// A loaded native plugin that can be called via FFI.
17///
18/// The plugin is automatically shut down when dropped.
19pub struct NativePlugin {
20    /// The loaded shared library (must be kept alive).
21    #[allow(dead_code)]
22    library: Library,
23
24    /// Handle to the initialized plugin.
25    handle: FfiPluginHandle,
26
27    /// Whether the plugin has been shut down.
28    shutdown: AtomicBool,
29
30    // Cached function pointers
31    call_fn: PluginCallFn,
32    call_raw_fn: Option<PluginCallRawFn>,
33    shutdown_fn: PluginShutdownFn,
34    get_state_fn: PluginGetStateFn,
35    get_rejected_count_fn: PluginGetRejectedCountFn,
36    set_log_level_fn: PluginSetLogLevelFn,
37    free_buffer_fn: PluginFreeBufferFn,
38    rb_response_free_fn: Option<RbResponseFreeFn>,
39}
40
41impl NativePlugin {
42    /// Create a new NativePlugin from loaded library and handle.
43    ///
44    /// # Safety
45    ///
46    /// The caller must ensure:
47    /// - `library` contains valid FFI exports
48    /// - `handle` is a valid plugin handle from `plugin_init`
49    /// - The function pointers are valid for the library's lifetime
50    #[allow(clippy::too_many_arguments)]
51    pub(crate) unsafe fn new(
52        library: Library,
53        handle: FfiPluginHandle,
54        call_fn: PluginCallFn,
55        call_raw_fn: Option<PluginCallRawFn>,
56        shutdown_fn: PluginShutdownFn,
57        get_state_fn: PluginGetStateFn,
58        get_rejected_count_fn: PluginGetRejectedCountFn,
59        set_log_level_fn: PluginSetLogLevelFn,
60        free_buffer_fn: PluginFreeBufferFn,
61        rb_response_free_fn: Option<RbResponseFreeFn>,
62    ) -> Self {
63        Self {
64            library,
65            handle,
66            shutdown: AtomicBool::new(false),
67            call_fn,
68            call_raw_fn,
69            shutdown_fn,
70            get_state_fn,
71            get_rejected_count_fn,
72            set_log_level_fn,
73            free_buffer_fn,
74            rb_response_free_fn,
75        }
76    }
77
78    /// Make a JSON call to the plugin.
79    ///
80    /// # Arguments
81    ///
82    /// * `type_tag` - Message type identifier (e.g., "echo", "user.create")
83    /// * `request` - JSON request payload as a string
84    ///
85    /// # Returns
86    ///
87    /// The JSON response payload as a string, or an error.
88    ///
89    /// # Example
90    ///
91    /// ```ignore
92    /// let response = plugin.call("echo", r#"{"message": "Hello"}"#)?;
93    /// ```
94    pub fn call(&self, type_tag: &str, request: &str) -> ConsumerResult<String> {
95        self.ensure_active()?;
96
97        let type_tag_cstr =
98            CString::new(type_tag).map_err(|e| ConsumerError::InvalidResponse(e.to_string()))?;
99
100        let request_bytes = request.as_bytes();
101
102        // SAFETY: We validated that the plugin is active and the handle is valid
103        let buffer: FfiBuffer = unsafe {
104            (self.call_fn)(
105                self.handle,
106                type_tag_cstr.as_ptr(),
107                request_bytes.as_ptr(),
108                request_bytes.len(),
109            )
110        };
111
112        // Always free the buffer when we're done
113        let result = self.process_buffer(&buffer);
114
115        // SAFETY: buffer was returned from plugin_call and hasn't been freed
116        unsafe {
117            let mut buffer = buffer;
118            (self.free_buffer_fn)(&mut buffer);
119        }
120
121        result
122    }
123
124    /// Make a typed call to the plugin with automatic serialization.
125    ///
126    /// # Arguments
127    ///
128    /// * `type_tag` - Message type identifier
129    /// * `request` - Request value to serialize to JSON
130    ///
131    /// # Returns
132    ///
133    /// The deserialized response, or an error.
134    ///
135    /// # Example
136    ///
137    /// ```ignore
138    /// #[derive(Serialize)]
139    /// struct EchoRequest { message: String }
140    ///
141    /// #[derive(Deserialize)]
142    /// struct EchoResponse { message: String, length: usize }
143    ///
144    /// let response: EchoResponse = plugin.call_typed("echo", &EchoRequest {
145    ///     message: "Hello".to_string(),
146    /// })?;
147    /// ```
148    pub fn call_typed<Req, Res>(&self, type_tag: &str, request: &Req) -> ConsumerResult<Res>
149    where
150        Req: Serialize,
151        Res: DeserializeOwned,
152    {
153        let request_json = serde_json::to_string(request)?;
154        let response_json = self.call(type_tag, &request_json)?;
155        let response: Res = serde_json::from_str(&response_json)?;
156        Ok(response)
157    }
158
159    /// Make a binary call to the plugin.
160    ///
161    /// This is used for high-performance binary transport.
162    ///
163    /// # Arguments
164    ///
165    /// * `message_id` - Numeric message identifier
166    /// * `request` - Raw request bytes
167    ///
168    /// # Returns
169    ///
170    /// The raw response bytes, or an error.
171    ///
172    /// # Errors
173    ///
174    /// Returns `ConsumerError::MissingSymbol` if binary transport is not available.
175    pub fn call_raw(&self, message_id: u32, request: &[u8]) -> ConsumerResult<Vec<u8>> {
176        self.ensure_active()?;
177
178        let call_raw_fn = self.call_raw_fn.ok_or_else(|| {
179            ConsumerError::MissingSymbol("plugin_call_raw (binary transport not available)".into())
180        })?;
181
182        let rb_response_free_fn = self.rb_response_free_fn.ok_or_else(|| {
183            ConsumerError::MissingSymbol("rb_response_free (binary transport not available)".into())
184        })?;
185
186        // SAFETY: We validated that the plugin is active and the handle is valid
187        let response: RbResponse = unsafe {
188            call_raw_fn(
189                self.handle,
190                message_id,
191                request.as_ptr() as *const std::ffi::c_void,
192                request.len(),
193            )
194        };
195
196        // Extract data before freeing
197        let result = if response.is_error() {
198            // SAFETY: error response data is a null-terminated string
199            let error_msg = if response.data.is_null() {
200                "Unknown error".to_string()
201            } else {
202                let slice = unsafe { response.as_slice() };
203                String::from_utf8_lossy(slice).into_owned()
204            };
205            Err(ConsumerError::CallFailed(PluginError::from_code(
206                response.error_code,
207                error_msg,
208            )))
209        } else {
210            // SAFETY: success response data is valid for len bytes
211            let data = unsafe { response.as_slice().to_vec() };
212            Ok(data)
213        };
214
215        // SAFETY: response was returned from plugin_call_raw and hasn't been freed
216        unsafe {
217            let mut response = response;
218            rb_response_free_fn(&mut response);
219        }
220
221        result
222    }
223
224    /// Get the current lifecycle state of the plugin.
225    pub fn state(&self) -> LifecycleState {
226        // After shutdown, the FFI handle is removed from the manager,
227        // so plugin_get_state would return 255 (unknown/Failed).
228        // Return Stopped directly since we know shutdown completed.
229        if self.shutdown.load(Ordering::SeqCst) {
230            return LifecycleState::Stopped;
231        }
232
233        // SAFETY: handle is valid (not yet shut down)
234        let state_code = unsafe { (self.get_state_fn)(self.handle) };
235        state_from_u8(state_code)
236    }
237
238    /// Get the number of requests rejected due to concurrency limits.
239    pub fn rejected_request_count(&self) -> u64 {
240        // SAFETY: handle is valid
241        unsafe { (self.get_rejected_count_fn)(self.handle) }
242    }
243
244    /// Check if binary transport is available.
245    pub fn has_binary_transport(&self) -> bool {
246        self.call_raw_fn.is_some() && self.rb_response_free_fn.is_some()
247    }
248
249    /// Set the log level for the plugin.
250    pub fn set_log_level(&self, level: LogLevel) {
251        // SAFETY: handle is valid
252        unsafe { (self.set_log_level_fn)(self.handle, level as u8) }
253    }
254
255    /// Shutdown the plugin gracefully.
256    ///
257    /// This is called automatically when the plugin is dropped, but can be
258    /// called explicitly to handle shutdown errors.
259    pub fn shutdown(&self) -> ConsumerResult<()> {
260        // Only shutdown once
261        if self
262            .shutdown
263            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
264            .is_err()
265        {
266            return Ok(());
267        }
268
269        // SAFETY: handle is valid and we're only calling shutdown once
270        let success = unsafe { (self.shutdown_fn)(self.handle) };
271
272        if success {
273            Ok(())
274        } else {
275            Err(ConsumerError::InitFailed(
276                "plugin shutdown returned false".to_string(),
277            ))
278        }
279    }
280
281    /// Ensure the plugin is in Active state.
282    fn ensure_active(&self) -> ConsumerResult<()> {
283        let state = self.state();
284        if state.can_handle_requests() {
285            Ok(())
286        } else {
287            Err(ConsumerError::NotActive(state))
288        }
289    }
290
291    /// Process a buffer response from the plugin.
292    fn process_buffer(&self, buffer: &FfiBuffer) -> ConsumerResult<String> {
293        if buffer.is_error() {
294            // SAFETY: error buffer data is valid
295            let error_msg = if buffer.is_empty() {
296                "Unknown error".to_string()
297            } else {
298                let slice = unsafe { buffer.as_slice() };
299                String::from_utf8_lossy(slice).into_owned()
300            };
301            return Err(ConsumerError::CallFailed(PluginError::from_code(
302                buffer.error_code,
303                error_msg,
304            )));
305        }
306
307        // SAFETY: success buffer data is valid JSON
308        let data = unsafe { buffer.as_slice() };
309
310        // Parse the response envelope
311        let envelope: ResponseEnvelope = serde_json::from_slice(data)
312            .map_err(|e| ConsumerError::InvalidResponse(e.to_string()))?;
313
314        if envelope.is_success() {
315            // Extract payload as JSON string
316            match envelope.payload {
317                Some(payload) => Ok(serde_json::to_string(&payload)?),
318                None => Ok("null".to_string()),
319            }
320        } else {
321            let code = envelope.error_code.unwrap_or(11);
322            let message = envelope.error_message.unwrap_or_default();
323            Err(ConsumerError::CallFailed(PluginError::from_code(
324                code, message,
325            )))
326        }
327    }
328}
329
330impl Drop for NativePlugin {
331    fn drop(&mut self) {
332        // Ignore errors on drop
333        let _ = self.shutdown();
334    }
335}
336
337// NativePlugin is Send because it owns all its data and the library is thread-safe.
338// The FFI functions are designed to be called from any thread.
339unsafe impl Send for NativePlugin {}
340
341// NativePlugin is Sync because the FFI calls are thread-safe.
342// The plugin's internal state uses proper synchronization (Arc<RwLock>, DashMap, etc.)
343// and all function pointers are constant after initialization.
344unsafe impl Sync for NativePlugin {}
345
346/// Convert a u8 state code to LifecycleState.
347fn state_from_u8(code: u8) -> LifecycleState {
348    match code {
349        0 => LifecycleState::Installed,
350        1 => LifecycleState::Starting,
351        2 => LifecycleState::Active,
352        3 => LifecycleState::Stopping,
353        4 => LifecycleState::Stopped,
354        5 => LifecycleState::Failed,
355        _ => LifecycleState::Failed, // Unknown state treated as failed
356    }
357}
358
#[cfg(test)]
mod tests {
    #![allow(non_snake_case)]

    use super::*;

    /// Every documented state code maps to its matching lifecycle state.
    #[test]
    fn state_from_u8___valid_codes___returns_correct_state() {
        let expectations = [
            (0u8, LifecycleState::Installed),
            (1, LifecycleState::Starting),
            (2, LifecycleState::Active),
            (3, LifecycleState::Stopping),
            (4, LifecycleState::Stopped),
            (5, LifecycleState::Failed),
        ];

        for (code, expected) in expectations.iter() {
            assert_eq!(&state_from_u8(*code), expected);
        }
    }

    /// Codes outside the known range fall back to `Failed`.
    #[test]
    fn state_from_u8___invalid_code___returns_failed() {
        for code in [255u8, 100].iter() {
            assert_eq!(state_from_u8(*code), LifecycleState::Failed);
        }
    }
}