marketdata_core/websocket/aio/
runtime.rs1use crate::errors::MarketDataError;
11use std::future::Future;
12use std::panic::{catch_unwind, AssertUnwindSafe};
13use std::ptr;
14use tokio::runtime::{Handle, Runtime};
15
/// Evaluates `$body` behind a panic guard and yields the raw pointer it produces.
///
/// The body is run inside `catch_unwind(AssertUnwindSafe(..))`. When it completes
/// normally its value is returned unchanged. When it panics, a notice is written
/// to stderr and the expansion evaluates to `ptr::null_mut()` instead, so the
/// unwind does not leave the enclosing function.
///
/// `catch_unwind`, `AssertUnwindSafe` and `ptr` are written unqualified and are
/// therefore resolved at the call site; they must be in scope there.
macro_rules! ffi_catch_ptr {
    ($body:expr) => {
        match catch_unwind(AssertUnwindSafe(|| $body)) {
            Ok(result) => result,
            Err(_) => {
                // `eprintln!` panics when stderr cannot be written, and a panic
                // raised here would escape the guard. Write best-effort instead
                // and ignore any I/O error.
                let _ = ::std::io::Write::write_all(
                    &mut ::std::io::stderr(),
                    b"PANIC: Caught panic at FFI boundary\n",
                );
                ptr::null_mut()
            }
        }
    };
}
31
/// Evaluates `$body` behind a panic guard, discarding whatever it returns.
///
/// The body is run inside `catch_unwind(AssertUnwindSafe(..))`. A panic is
/// swallowed after a notice is written to stderr, so the unwind does not leave
/// the enclosing function.
///
/// `catch_unwind` and `AssertUnwindSafe` are written unqualified and are
/// therefore resolved at the call site; they must be in scope there.
macro_rules! ffi_catch_void {
    ($body:expr) => {
        if catch_unwind(AssertUnwindSafe(|| $body)).is_err() {
            // `eprintln!` panics when stderr cannot be written, and a panic
            // raised here would escape the guard. Write best-effort instead
            // and ignore any I/O error.
            let _ = ::std::io::Write::write_all(
                &mut ::std::io::stderr(),
                b"PANIC: Caught panic at FFI boundary\n",
            );
        }
    };
}
43
44pub struct AsyncRuntime {
52 runtime: Runtime,
53}
54
55impl AsyncRuntime {
56 pub fn new() -> Result<Self, MarketDataError> {
61 let runtime = tokio::runtime::Builder::new_multi_thread()
62 .enable_all()
63 .build()
64 .map_err(|e| MarketDataError::RuntimeError {
65 msg: format!("Failed to create runtime: {}", e),
66 })?;
67
68 Ok(Self { runtime })
69 }
70
71 pub fn handle(&self) -> Handle {
73 self.runtime.handle().clone()
74 }
75
76 pub fn block_on<F>(&self, future: F) -> F::Output
78 where
79 F: Future,
80 {
81 self.runtime.block_on(future)
82 }
83
84 pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
86 where
87 F: Future + Send + 'static,
88 F::Output: Send + 'static,
89 {
90 self.runtime.spawn(future)
91 }
92
93 pub fn shutdown(self) {
95 drop(self.runtime);
97 }
98}
99
100#[no_mangle]
111pub extern "C" fn create_runtime() -> *mut AsyncRuntime {
112 ffi_catch_ptr!({
113 match AsyncRuntime::new() {
114 Ok(runtime) => Box::into_raw(Box::new(runtime)),
115 Err(e) => {
116 crate::tracing_compat::error!(
117 target: "fugle_marketdata::runtime",
118 error = %e,
119 "failed to create async runtime"
120 );
121 let _ = e;
122 ptr::null_mut()
123 }
124 }
125 })
126}
127
128#[no_mangle]
135pub unsafe extern "C" fn destroy_runtime(runtime_ptr: *mut AsyncRuntime) {
136 ffi_catch_void!({
137 if !runtime_ptr.is_null() {
138 unsafe {
139 let runtime = Box::from_raw(runtime_ptr);
140 runtime.shutdown();
141 }
142 }
143 })
144}
145
146#[no_mangle]
151pub extern "C" fn runtime_is_valid(runtime_ptr: *const AsyncRuntime) -> bool {
152 !runtime_ptr.is_null()
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Constructing a runtime succeeds.
    #[test]
    fn test_runtime_creation() {
        let runtime = AsyncRuntime::new();
        assert!(runtime.is_ok());
    }

    /// `block_on` returns the future's output.
    #[test]
    fn test_runtime_block_on() {
        let runtime = AsyncRuntime::new().unwrap();
        let result = runtime.block_on(async { 42 });
        assert_eq!(result, 42);
    }

    /// A spawned task's output can be retrieved through its join handle.
    #[test]
    fn test_runtime_spawn_and_await() {
        let runtime = AsyncRuntime::new().unwrap();
        let handle = runtime.spawn(async { "hello" });
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, "hello");
    }

    /// Ten spawned tasks all run to completion.
    #[test]
    fn test_runtime_handle_multiple_tasks() {
        let runtime = AsyncRuntime::new().unwrap();
        let counter = Arc::new(AtomicU32::new(0));

        let mut handles = vec![];
        for _ in 0..10 {
            let counter_clone = counter.clone();
            let handle = runtime.spawn(async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                counter_clone.fetch_add(1, Ordering::SeqCst);
            });
            handles.push(handle);
        }

        // Join every task before reading the counter.
        for handle in handles {
            runtime.block_on(handle).unwrap();
        }

        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    /// Shutting down a fresh runtime completes without panicking.
    #[test]
    fn test_runtime_shutdown() {
        let runtime = AsyncRuntime::new().unwrap();
        runtime.shutdown();
    }

    /// A runtime created through the FFI entry point can be destroyed through it.
    #[test]
    fn test_ffi_create_destroy() {
        let runtime_ptr = create_runtime();
        assert!(!runtime_ptr.is_null());
        assert!(runtime_is_valid(runtime_ptr));
        unsafe { destroy_runtime(runtime_ptr) };
    }

    /// Destroying a null pointer is a no-op.
    #[test]
    fn test_ffi_destroy_null() {
        unsafe { destroy_runtime(ptr::null_mut()) };
    }

    /// An unguarded panic propagates with its payload.
    #[test]
    #[should_panic(expected = "test panic")]
    fn test_panic_boundary() {
        std::panic::panic_any("test panic");
    }

    /// `ffi_catch_ptr!` turns a panic in its body into a null pointer.
    #[test]
    fn test_ffi_catch_ptr_returns_null_on_panic() {
        let trigger = true;
        let result: *mut AsyncRuntime = ffi_catch_ptr!({
            if trigger {
                panic!("test panic");
            }
            // Non-null sentinel (never dereferenced) so a null result can only
            // come from the panic path.
            ptr::NonNull::<AsyncRuntime>::dangling().as_ptr()
        });
        assert!(result.is_null());
    }

    /// `ffi_catch_void!` swallows a panic raised in its body.
    #[test]
    fn test_ffi_catch_void_swallows_panic() {
        let trigger = true;
        ffi_catch_void!({
            if trigger {
                panic!("test panic");
            }
        });
        // Reaching this point means the panic did not propagate.
        assert!(trigger);
    }
}