Skip to main content

marketdata_core/websocket/aio/
runtime.rs

1//! FFI-safe async runtime wrapper
2//!
3//! This module provides a safe wrapper around Tokio's async runtime for use across FFI boundaries.
4//! Key features:
5//! - Single runtime instance pattern
6//! - Panic boundary for FFI safety
7//! - Handle-based task spawning
8//! - Graceful shutdown support
9
10use crate::errors::MarketDataError;
11use std::future::Future;
12use std::panic::{catch_unwind, AssertUnwindSafe};
13use std::ptr;
14use tokio::runtime::{Handle, Runtime};
15
16/// Macro for catching panics at FFI boundaries (returns pointer)
17///
18/// This prevents panics from crossing FFI boundaries, which would be undefined behavior.
19/// Instead, panics are caught and converted to null pointers.
20macro_rules! ffi_catch_ptr {
21    ($body:expr) => {
22        match catch_unwind(AssertUnwindSafe(|| $body)) {
23            Ok(result) => result,
24            Err(_) => {
25                eprintln!("PANIC: Caught panic at FFI boundary");
26                ptr::null_mut()
27            }
28        }
29    };
30}
31
32/// Macro for catching panics at FFI boundaries (returns void)
33///
34/// This prevents panics from crossing FFI boundaries, which would be undefined behavior.
35/// Panics are caught and logged.
36macro_rules! ffi_catch_void {
37    ($body:expr) => {
38        if let Err(_) = catch_unwind(AssertUnwindSafe(|| $body)) {
39            eprintln!("PANIC: Caught panic at FFI boundary");
40        }
41    };
42}
43
44/// FFI-safe async runtime wrapper
45///
46/// This struct wraps Tokio's Runtime and provides safe methods for:
47/// - Creating/destroying runtime across FFI boundary
48/// - Spawning async tasks
49/// - Blocking on futures
50/// - Graceful shutdown
51pub struct AsyncRuntime {
52    runtime: Runtime,
53}
54
55impl AsyncRuntime {
56    /// Create a new multi-threaded Tokio runtime
57    ///
58    /// # Errors
59    /// Returns RuntimeError if the runtime cannot be created
60    pub fn new() -> Result<Self, MarketDataError> {
61        let runtime = tokio::runtime::Builder::new_multi_thread()
62            .enable_all()
63            .build()
64            .map_err(|e| MarketDataError::RuntimeError {
65                msg: format!("Failed to create runtime: {}", e),
66            })?;
67
68        Ok(Self { runtime })
69    }
70
71    /// Get a handle to the runtime for spawning tasks
72    pub fn handle(&self) -> Handle {
73        self.runtime.handle().clone()
74    }
75
76    /// Block on a future until it completes
77    pub fn block_on<F>(&self, future: F) -> F::Output
78    where
79        F: Future,
80    {
81        self.runtime.block_on(future)
82    }
83
84    /// Spawn a task on the runtime
85    pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
86    where
87        F: Future + Send + 'static,
88        F::Output: Send + 'static,
89    {
90        self.runtime.spawn(future)
91    }
92
93    /// Shutdown the runtime gracefully
94    pub fn shutdown(self) {
95        // Drop the runtime, which triggers graceful shutdown
96        drop(self.runtime);
97    }
98}
99
100// FFI functions for external language bindings
101
102/// Create a new async runtime (FFI-safe)
103///
104/// Returns an opaque pointer to the runtime, or null on error.
105/// The caller must call `destroy_runtime` when done.
106///
107/// # Safety
108/// - Caller must eventually call `destroy_runtime` with the returned pointer
109/// - Returned pointer must not be used after `destroy_runtime` is called
110#[no_mangle]
111pub extern "C" fn create_runtime() -> *mut AsyncRuntime {
112    ffi_catch_ptr!({
113        match AsyncRuntime::new() {
114            Ok(runtime) => Box::into_raw(Box::new(runtime)),
115            Err(e) => {
116                crate::tracing_compat::error!(
117                    target: "fugle_marketdata::runtime",
118                    error = %e,
119                    "failed to create async runtime"
120                );
121                let _ = e;
122                ptr::null_mut()
123            }
124        }
125    })
126}
127
128/// Destroy an async runtime (FFI-safe)
129///
130/// # Safety
131/// - `runtime_ptr` must be a valid pointer from `create_runtime`
132/// - `runtime_ptr` must not be used after this call
133/// - Calling with null pointer is safe (no-op)
134#[no_mangle]
135pub unsafe extern "C" fn destroy_runtime(runtime_ptr: *mut AsyncRuntime) {
136    ffi_catch_void!({
137        if !runtime_ptr.is_null() {
138            unsafe {
139                let runtime = Box::from_raw(runtime_ptr);
140                runtime.shutdown();
141            }
142        }
143    })
144}
145
146/// Check if a runtime pointer is valid (non-null)
147///
148/// # Safety
149/// This only checks if the pointer is non-null, not if it's still valid.
150#[no_mangle]
151pub extern "C" fn runtime_is_valid(runtime_ptr: *const AsyncRuntime) -> bool {
152    !runtime_ptr.is_null()
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158    use std::sync::atomic::{AtomicU32, Ordering};
159    use std::sync::Arc;
160    use std::time::Duration;
161
162    #[test]
163    fn test_runtime_creation() {
164        let runtime = AsyncRuntime::new();
165        assert!(runtime.is_ok());
166    }
167
168    #[test]
169    fn test_runtime_block_on() {
170        let runtime = AsyncRuntime::new().unwrap();
171        let result = runtime.block_on(async { 42 });
172        assert_eq!(result, 42);
173    }
174
175    #[test]
176    fn test_runtime_spawn_and_await() {
177        let runtime = AsyncRuntime::new().unwrap();
178        let handle = runtime.spawn(async { "hello" });
179        let result = runtime.block_on(handle).unwrap();
180        assert_eq!(result, "hello");
181    }
182
183    #[test]
184    fn test_runtime_handle_multiple_tasks() {
185        let runtime = AsyncRuntime::new().unwrap();
186        let counter = Arc::new(AtomicU32::new(0));
187
188        let mut handles = vec![];
189        for _ in 0..10 {
190            let counter_clone = counter.clone();
191            let handle = runtime.spawn(async move {
192                tokio::time::sleep(Duration::from_millis(10)).await;
193                counter_clone.fetch_add(1, Ordering::SeqCst);
194            });
195            handles.push(handle);
196        }
197
198        // Wait for all tasks to complete
199        for handle in handles {
200            runtime.block_on(handle).unwrap();
201        }
202
203        assert_eq!(counter.load(Ordering::SeqCst), 10);
204    }
205
206    #[test]
207    fn test_runtime_shutdown() {
208        let runtime = AsyncRuntime::new().unwrap();
209        runtime.shutdown();
210        // If we reach here without panic, shutdown was graceful
211    }
212
213    #[test]
214    fn test_ffi_create_destroy() {
215        let runtime_ptr = create_runtime();
216        assert!(!runtime_ptr.is_null());
217        assert!(runtime_is_valid(runtime_ptr));
218        // SAFETY: runtime_ptr is valid from create_runtime
219        unsafe { destroy_runtime(runtime_ptr) };
220    }
221
222    #[test]
223    fn test_ffi_destroy_null() {
224        // Should not panic - null pointer is handled safely
225        // SAFETY: destroy_runtime explicitly handles null pointers
226        unsafe { destroy_runtime(ptr::null_mut()) };
227    }
228
229    #[test]
230    #[should_panic(expected = "test panic")]
231    fn test_panic_boundary() {
232        // This test demonstrates that panics within Rust code can be caught
233        // In real FFI scenarios, the ffi_catch macros prevent panics from crossing boundaries
234        std::panic::panic_any("test panic");
235    }
236}