Skip to main content

rustbridge_runtime/
bridge.rs

1//! Bridge between sync FFI calls and async handlers
2
3use crate::{AsyncRuntime, ShutdownSignal};
4use rustbridge_core::{PluginError, PluginResult};
5use std::future::Future;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9/// Bridge for executing async operations from sync FFI context
10pub struct AsyncBridge {
11    runtime: Arc<AsyncRuntime>,
12    request_counter: AtomicU64,
13}
14
15impl AsyncBridge {
16    /// Create a new async bridge
17    pub fn new(runtime: Arc<AsyncRuntime>) -> Self {
18        Self {
19            runtime,
20            request_counter: AtomicU64::new(0),
21        }
22    }
23
24    /// Get the next request ID
25    pub fn next_request_id(&self) -> u64 {
26        self.request_counter.fetch_add(1, Ordering::SeqCst)
27    }
28
29    /// Execute an async operation synchronously (blocking)
30    ///
31    /// This is the primary method for handling sync FFI calls.
32    pub fn call_sync<F, T>(&self, future: F) -> PluginResult<T>
33    where
34        F: Future<Output = PluginResult<T>>,
35    {
36        if self.runtime.is_shutting_down() {
37            return Err(PluginError::RuntimeError(
38                "Runtime is shutting down".to_string(),
39            ));
40        }
41        self.runtime.block_on(future)
42    }
43
44    /// Execute an async operation with timeout
45    pub fn call_sync_timeout<F, T>(
46        &self,
47        future: F,
48        timeout: std::time::Duration,
49    ) -> PluginResult<T>
50    where
51        F: Future<Output = PluginResult<T>>,
52    {
53        self.runtime.block_on(async move {
54            match tokio::time::timeout(timeout, future).await {
55                Ok(result) => result,
56                Err(_) => Err(PluginError::Timeout),
57            }
58        })
59    }
60
61    /// Spawn an async task and return a handle
62    pub fn spawn<F, T>(&self, future: F) -> tokio::task::JoinHandle<T>
63    where
64        F: Future<Output = T> + Send + 'static,
65        T: Send + 'static,
66    {
67        self.runtime.spawn(future)
68    }
69
70    /// Get a shutdown signal
71    pub fn shutdown_signal(&self) -> ShutdownSignal {
72        self.runtime.shutdown_signal()
73    }
74
75    /// Check if the runtime is shutting down
76    pub fn is_shutting_down(&self) -> bool {
77        self.runtime.is_shutting_down()
78    }
79}
80
81/// Type alias for async completion callback used in FFI
82#[allow(dead_code)] // Reserved for future async API
83pub type CompletionCallback = extern "C" fn(
84    context: *mut std::ffi::c_void,
85    request_id: u64,
86    data: *const u8,
87    len: usize,
88    error_code: u32,
89);
90
91/// Pending async request tracker
92#[allow(dead_code)] // Reserved for future async API
93pub struct PendingRequest {
94    pub request_id: u64,
95    pub callback: CompletionCallback,
96    pub context: *mut std::ffi::c_void,
97    pub cancel_handle: Option<tokio::task::JoinHandle<()>>,
98}
99
100// Safety: The context pointer is provided by the host and assumed to be thread-safe
101unsafe impl Send for PendingRequest {}
102unsafe impl Sync for PendingRequest {}
103
104#[allow(dead_code)] // Reserved for future async API
105impl PendingRequest {
106    /// Create a new pending request
107    pub fn new(
108        request_id: u64,
109        callback: CompletionCallback,
110        context: *mut std::ffi::c_void,
111    ) -> Self {
112        Self {
113            request_id,
114            callback,
115            context,
116            cancel_handle: None,
117        }
118    }
119
120    /// Complete the request with success
121    ///
122    /// # Safety
123    /// The callback and context must be valid for the duration of this call.
124    pub unsafe fn complete_success(&self, data: &[u8]) {
125        (self.callback)(self.context, self.request_id, data.as_ptr(), data.len(), 0);
126    }
127
128    /// Complete the request with error
129    ///
130    /// # Safety
131    /// The callback and context must be valid for the duration of this call.
132    pub unsafe fn complete_error(&self, error_code: u32, message: &str) {
133        let msg_bytes = message.as_bytes();
134        (self.callback)(
135            self.context,
136            self.request_id,
137            msg_bytes.as_ptr(),
138            msg_bytes.len(),
139            error_code,
140        );
141    }
142}
143
144#[cfg(test)]
145#[path = "bridge/bridge_tests.rs"]
146mod bridge_tests;