Skip to main content

rustbridge_consumer/
plugin.rs

1//! Native plugin wrapper for calling plugins loaded via FFI.
2
3use crate::error::{ConsumerError, ConsumerResult};
4use crate::ffi_bindings::{
5    FfiBuffer, FfiPluginHandle, PluginCallFn, PluginCallRawFn, PluginFreeBufferFn,
6    PluginGetRejectedCountFn, PluginGetStateFn, PluginSetLogLevelFn, PluginShutdownFn, RbResponse,
7    RbResponseFreeFn,
8};
9use libloading::Library;
10use rustbridge_core::{LifecycleState, LogLevel, PluginError};
11use rustbridge_transport::ResponseEnvelope;
12use serde::{Serialize, de::DeserializeOwned};
13use std::ffi::CString;
14use std::sync::atomic::{AtomicBool, Ordering};
15
16/// A loaded native plugin that can be called via FFI.
17///
18/// The plugin is automatically shut down when dropped.
19pub struct NativePlugin {
20    /// The loaded shared library (must be kept alive).
21    #[allow(dead_code)]
22    library: Library,
23
24    /// Handle to the initialized plugin.
25    handle: FfiPluginHandle,
26
27    /// Whether the plugin has been shut down.
28    shutdown: AtomicBool,
29
30    // Cached function pointers
31    call_fn: PluginCallFn,
32    call_raw_fn: Option<PluginCallRawFn>,
33    shutdown_fn: PluginShutdownFn,
34    get_state_fn: PluginGetStateFn,
35    get_rejected_count_fn: PluginGetRejectedCountFn,
36    set_log_level_fn: PluginSetLogLevelFn,
37    free_buffer_fn: PluginFreeBufferFn,
38    rb_response_free_fn: Option<RbResponseFreeFn>,
39}
40
41impl NativePlugin {
42    /// Create a new NativePlugin from loaded library and handle.
43    ///
44    /// # Safety
45    ///
46    /// The caller must ensure:
47    /// - `library` contains valid FFI exports
48    /// - `handle` is a valid plugin handle from `plugin_init`
49    /// - The function pointers are valid for the library's lifetime
50    #[allow(clippy::too_many_arguments)]
51    pub(crate) unsafe fn new(
52        library: Library,
53        handle: FfiPluginHandle,
54        call_fn: PluginCallFn,
55        call_raw_fn: Option<PluginCallRawFn>,
56        shutdown_fn: PluginShutdownFn,
57        get_state_fn: PluginGetStateFn,
58        get_rejected_count_fn: PluginGetRejectedCountFn,
59        set_log_level_fn: PluginSetLogLevelFn,
60        free_buffer_fn: PluginFreeBufferFn,
61        rb_response_free_fn: Option<RbResponseFreeFn>,
62    ) -> Self {
63        Self {
64            library,
65            handle,
66            shutdown: AtomicBool::new(false),
67            call_fn,
68            call_raw_fn,
69            shutdown_fn,
70            get_state_fn,
71            get_rejected_count_fn,
72            set_log_level_fn,
73            free_buffer_fn,
74            rb_response_free_fn,
75        }
76    }
77
78    /// Make a JSON call to the plugin.
79    ///
80    /// # Arguments
81    ///
82    /// * `type_tag` - Message type identifier (e.g., "echo", "user.create")
83    /// * `request` - JSON request payload as a string
84    ///
85    /// # Returns
86    ///
87    /// The JSON response payload as a string, or an error.
88    ///
89    /// # Example
90    ///
91    /// ```ignore
92    /// let response = plugin.call("echo", r#"{"message": "Hello"}"#)?;
93    /// ```
94    pub fn call(&self, type_tag: &str, request: &str) -> ConsumerResult<String> {
95        self.ensure_active()?;
96
97        let type_tag_cstr =
98            CString::new(type_tag).map_err(|e| ConsumerError::InvalidResponse(e.to_string()))?;
99
100        let request_bytes = request.as_bytes();
101
102        // SAFETY: We validated that the plugin is active and the handle is valid
103        let buffer: FfiBuffer = unsafe {
104            (self.call_fn)(
105                self.handle,
106                type_tag_cstr.as_ptr(),
107                request_bytes.as_ptr(),
108                request_bytes.len(),
109            )
110        };
111
112        // Always free the buffer when we're done
113        let result = self.process_buffer(&buffer);
114
115        // SAFETY: buffer was returned from plugin_call and hasn't been freed
116        unsafe {
117            let mut buffer = buffer;
118            (self.free_buffer_fn)(&mut buffer);
119        }
120
121        result
122    }
123
124    /// Make a typed call to the plugin with automatic serialization.
125    ///
126    /// # Arguments
127    ///
128    /// * `type_tag` - Message type identifier
129    /// * `request` - Request value to serialize to JSON
130    ///
131    /// # Returns
132    ///
133    /// The deserialized response, or an error.
134    ///
135    /// # Example
136    ///
137    /// ```ignore
138    /// #[derive(Serialize)]
139    /// struct EchoRequest { message: String }
140    ///
141    /// #[derive(Deserialize)]
142    /// struct EchoResponse { message: String, length: usize }
143    ///
144    /// let response: EchoResponse = plugin.call_typed("echo", &EchoRequest {
145    ///     message: "Hello".to_string(),
146    /// })?;
147    /// ```
148    pub fn call_typed<Req, Res>(&self, type_tag: &str, request: &Req) -> ConsumerResult<Res>
149    where
150        Req: Serialize,
151        Res: DeserializeOwned,
152    {
153        let request_json = serde_json::to_string(request)?;
154        let response_json = self.call(type_tag, &request_json)?;
155        let response: Res = serde_json::from_str(&response_json)?;
156        Ok(response)
157    }
158
159    /// Make a binary call to the plugin.
160    ///
161    /// This is used for high-performance binary transport.
162    ///
163    /// # Arguments
164    ///
165    /// * `message_id` - Numeric message identifier
166    /// * `request` - Raw request bytes
167    ///
168    /// # Returns
169    ///
170    /// The raw response bytes, or an error.
171    ///
172    /// # Errors
173    ///
174    /// Returns `ConsumerError::MissingSymbol` if binary transport is not available.
175    pub fn call_raw(&self, message_id: u32, request: &[u8]) -> ConsumerResult<Vec<u8>> {
176        self.ensure_active()?;
177
178        let call_raw_fn = self.call_raw_fn.ok_or_else(|| {
179            ConsumerError::MissingSymbol("plugin_call_raw (binary transport not available)".into())
180        })?;
181
182        let rb_response_free_fn = self.rb_response_free_fn.ok_or_else(|| {
183            ConsumerError::MissingSymbol("rb_response_free (binary transport not available)".into())
184        })?;
185
186        // SAFETY: We validated that the plugin is active and the handle is valid
187        let response: RbResponse = unsafe {
188            call_raw_fn(
189                self.handle,
190                message_id,
191                request.as_ptr() as *const std::ffi::c_void,
192                request.len(),
193            )
194        };
195
196        // Extract data before freeing
197        let result = if response.is_error() {
198            // SAFETY: error response data is a null-terminated string
199            let error_msg = if response.data.is_null() {
200                "Unknown error".to_string()
201            } else {
202                let slice = unsafe { response.as_slice() };
203                String::from_utf8_lossy(slice).into_owned()
204            };
205            Err(ConsumerError::CallFailed(PluginError::from_code(
206                response.error_code,
207                error_msg,
208            )))
209        } else {
210            // SAFETY: success response data is valid for len bytes
211            let data = unsafe { response.as_slice().to_vec() };
212            Ok(data)
213        };
214
215        // SAFETY: response was returned from plugin_call_raw and hasn't been freed
216        unsafe {
217            let mut response = response;
218            rb_response_free_fn(&mut response);
219        }
220
221        result
222    }
223
224    /// Get the current lifecycle state of the plugin.
225    pub fn state(&self) -> LifecycleState {
226        // SAFETY: handle is valid
227        let state_code = unsafe { (self.get_state_fn)(self.handle) };
228        state_from_u8(state_code)
229    }
230
231    /// Get the number of requests rejected due to concurrency limits.
232    pub fn rejected_request_count(&self) -> u64 {
233        // SAFETY: handle is valid
234        unsafe { (self.get_rejected_count_fn)(self.handle) }
235    }
236
237    /// Check if binary transport is available.
238    pub fn has_binary_transport(&self) -> bool {
239        self.call_raw_fn.is_some() && self.rb_response_free_fn.is_some()
240    }
241
242    /// Set the log level for the plugin.
243    pub fn set_log_level(&self, level: LogLevel) {
244        // SAFETY: handle is valid
245        unsafe { (self.set_log_level_fn)(self.handle, level as u8) }
246    }
247
248    /// Shutdown the plugin gracefully.
249    ///
250    /// This is called automatically when the plugin is dropped, but can be
251    /// called explicitly to handle shutdown errors.
252    pub fn shutdown(&self) -> ConsumerResult<()> {
253        // Only shutdown once
254        if self
255            .shutdown
256            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
257            .is_err()
258        {
259            return Ok(());
260        }
261
262        // SAFETY: handle is valid and we're only calling shutdown once
263        let success = unsafe { (self.shutdown_fn)(self.handle) };
264
265        if success {
266            Ok(())
267        } else {
268            Err(ConsumerError::InitFailed(
269                "plugin shutdown returned false".to_string(),
270            ))
271        }
272    }
273
274    /// Ensure the plugin is in Active state.
275    fn ensure_active(&self) -> ConsumerResult<()> {
276        let state = self.state();
277        if state.can_handle_requests() {
278            Ok(())
279        } else {
280            Err(ConsumerError::NotActive(state))
281        }
282    }
283
284    /// Process a buffer response from the plugin.
285    fn process_buffer(&self, buffer: &FfiBuffer) -> ConsumerResult<String> {
286        if buffer.is_error() {
287            // SAFETY: error buffer data is valid
288            let error_msg = if buffer.is_empty() {
289                "Unknown error".to_string()
290            } else {
291                let slice = unsafe { buffer.as_slice() };
292                String::from_utf8_lossy(slice).into_owned()
293            };
294            return Err(ConsumerError::CallFailed(PluginError::from_code(
295                buffer.error_code,
296                error_msg,
297            )));
298        }
299
300        // SAFETY: success buffer data is valid JSON
301        let data = unsafe { buffer.as_slice() };
302
303        // Parse the response envelope
304        let envelope: ResponseEnvelope = serde_json::from_slice(data)
305            .map_err(|e| ConsumerError::InvalidResponse(e.to_string()))?;
306
307        if envelope.is_success() {
308            // Extract payload as JSON string
309            match envelope.payload {
310                Some(payload) => Ok(serde_json::to_string(&payload)?),
311                None => Ok("null".to_string()),
312            }
313        } else {
314            let code = envelope.error_code.unwrap_or(11);
315            let message = envelope.error_message.unwrap_or_default();
316            Err(ConsumerError::CallFailed(PluginError::from_code(
317                code, message,
318            )))
319        }
320    }
321}
322
impl Drop for NativePlugin {
    /// Shut the plugin down when the wrapper goes out of scope.
    fn drop(&mut self) {
        // `drop` cannot report failure, so the shutdown result is discarded.
        // Call `shutdown()` explicitly beforehand to observe errors; in that
        // case this call finds the shutdown flag already set and does nothing.
        let _ = self.shutdown();
    }
}
329
// SAFETY: NativePlugin owns its library, handle and function pointers, so moving
// it to another thread moves everything it needs.
// NOTE(review): soundness also requires that the loaded library and its FFI
// functions may be called from any thread. That is a property of the plugin
// ABI, not something this file can enforce — confirm against the plugin side.
unsafe impl Send for NativePlugin {}
333
// SAFETY: every method takes `&self`, the only state mutated on this side is the
// `AtomicBool` shutdown flag, and the function pointers are not reassigned after
// `new`.
// NOTE(review): soundness also requires the FFI calls themselves to be safe under
// concurrent use, i.e. the plugin must synchronize its internal state (the
// original note cites Arc<RwLock>, DashMap, etc.). That is not visible from this
// file — confirm against the plugin side.
unsafe impl Sync for NativePlugin {}
338
339/// Convert a u8 state code to LifecycleState.
340fn state_from_u8(code: u8) -> LifecycleState {
341    match code {
342        0 => LifecycleState::Installed,
343        1 => LifecycleState::Starting,
344        2 => LifecycleState::Active,
345        3 => LifecycleState::Stopping,
346        4 => LifecycleState::Stopped,
347        5 => LifecycleState::Failed,
348        _ => LifecycleState::Failed, // Unknown state treated as failed
349    }
350}
351
#[cfg(test)]
mod tests {
    #![allow(non_snake_case)]

    use super::*;

    /// Every defined state code maps to its matching `LifecycleState`.
    #[test]
    fn state_from_u8___valid_codes___returns_correct_state() {
        let expected = [
            (0, LifecycleState::Installed),
            (1, LifecycleState::Starting),
            (2, LifecycleState::Active),
            (3, LifecycleState::Stopping),
            (4, LifecycleState::Stopped),
            (5, LifecycleState::Failed),
        ];

        for (code, state) in expected {
            assert_eq!(state_from_u8(code), state);
        }
    }

    /// Codes outside the defined range are reported as `Failed`.
    #[test]
    fn state_from_u8___invalid_code___returns_failed() {
        for code in [255, 100] {
            assert_eq!(state_from_u8(code), LifecycleState::Failed);
        }
    }
}