modular_native/
lib.rs

1#![allow(clippy::missing_safety_doc)]
2
3mod module;
4
5use crate::module::NativeCModule;
6use bytes::Bytes;
7use futures::Sink;
8use modular_rs::core::error::{ModuleError, SubscribeError};
9use modular_rs::core::modules::{Module, ModuleRequest, RegistryError};
10use modular_rs::core::Modular;
11use modular_sys::*;
12use parking_lot::RwLock;
13use std::ffi::{CStr, CString};
14use std::future::Future;
15use std::os::raw::c_char;
16use std::panic::catch_unwind;
17use std::pin::Pin;
18use std::ptr::{drop_in_place, null, null_mut};
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use tokio::runtime::Runtime;
22
23#[macro_export]
24macro_rules! cstr_to_string {
25    ($arg:expr) => {
26        unsafe { cstr_to_str!($arg).map(|i| i.to_string()) }
27    };
28}
29
30#[macro_export]
31macro_rules! cstr_to_str {
32    ($arg:expr) => {
33        if $arg.is_null() {
34            None
35        } else {
36            Some(CStr::from_ptr($arg).to_string_lossy())
37        }
38    };
39}
40
41pub struct NativeModular {
42    tokio_runtime: Arc<Runtime>,
43    modular: Modular,
44}
45
46#[repr(C)]
47pub struct VTable<M> {
48    create_instance: unsafe extern "system" fn(threads: u32) -> *mut M,
49    destroy_instance: unsafe extern "system" fn(modular: *mut M),
50    subscribe: unsafe extern "system" fn(
51        modular: &M,
52        subscribe: CSubscribe,
53        subscription: *mut CSubscriptionRef,
54    ) -> i32,
55    publish: unsafe extern "system" fn(modular: &M, topic: *const c_char, data: CBuf),
56    register_module: unsafe extern "system" fn(
57        modular: &M,
58        name: *const c_char,
59        module: CModule,
60        replace: bool,
61    ) -> i32,
62    remove_module: unsafe extern "system" fn(modular: &M, name: *const c_char),
63    get_module_ref: unsafe extern "system" fn(modular: &M, name: *const c_char) -> CModuleRef,
64}
65
66#[no_mangle]
67pub unsafe extern "system" fn __modular_vtable() -> *const NativeModularVTable {
68    static VTABLE: &VTable<NativeModular> = &VTable {
69        create_instance: __modular_create,
70        destroy_instance: __modular_destroy,
71        subscribe: __modular_events_subscribe,
72        publish: __modular_events_publish,
73        register_module: __modular_register_module,
74        remove_module: __modular_remove_module,
75        get_module_ref: __modular_get_module_ref,
76    };
77
78    VTABLE as *const VTable<_> as _
79}
80
81pub unsafe extern "system" fn __modular_create(threads: u32) -> *mut NativeModular {
82    #[cfg(not(target_family = "wasm"))]
83    let runtime = {
84        tokio::runtime::Builder::new_multi_thread()
85            .enable_all()
86            .worker_threads(threads as usize)
87            .build()
88            .unwrap()
89    };
90
91    #[cfg(target_family = "wasm")]
92    let runtime = {
93        tokio::runtime::Builder::new_current_thread()
94            .enable_all()
95            .build()
96            .unwrap()
97    };
98
99    let modular = Modular::default();
100
101    Box::into_raw(Box::new(NativeModular {
102        tokio_runtime: Arc::new(runtime),
103        modular,
104    }))
105}
106
107pub unsafe extern "system" fn __modular_destroy(modular: *mut NativeModular) {
108    let _ = Box::from_raw(modular);
109}
110
111struct Subscribe {
112    close_flag: Arc<RwLock<Option<()>>>,
113    on_event: OnEvent,
114    subscription: CSubscriptionRef,
115    is_closed: bool,
116}
117
118impl Subscribe {
119    #[allow(clippy::complexity)]
120    fn poll_state(
121        mut self: Pin<&mut Self>,
122    ) -> Poll<Result<(), <Subscribe as Sink<(String, Bytes)>>::Error>> {
123        if self.is_closed {
124            return Poll::Ready(Err(()));
125        }
126
127        if self.close_flag.read().is_none() {
128            self.is_closed = true;
129            return Poll::Ready(Err(()));
130        }
131
132        Poll::Ready(Ok(()))
133    }
134}
135
136impl Sink<(String, Bytes)> for Subscribe {
137    type Error = ();
138
139    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
140        self.poll_state()
141    }
142
143    fn start_send(self: Pin<&mut Self>, item: (String, Bytes)) -> Result<(), Self::Error> {
144        let _lock = self.close_flag.read();
145        if _lock.is_none() {
146            return Err(());
147        }
148
149        let topic = CString::new(item.0).expect("`null` in topic");
150        let data = CBuf {
151            data: item.1.as_ptr(),
152            len: item.1.len(),
153        };
154
155        drop(_lock);
156        unsafe { (self.on_event)(self.subscription, topic.as_ptr(), data) };
157
158        Ok(())
159    }
160
161    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162        self.poll_state()
163    }
164
165    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
166        self.poll_state()
167    }
168}
169
170unsafe impl Send for Subscribe {}
171unsafe impl Sync for Subscribe {}
172
173pub struct Subscription {
174    pub user_data: Obj,
175    pub close_flag: Weak<RwLock<Option<()>>>,
176    pub on_unsubscribe: Option<Cleanup>,
177}
178
179impl Drop for Subscription {
180    fn drop(&mut self) {
181        if let Some(v) = self.close_flag.upgrade() {
182            let mut guard = v.write();
183            if guard.take().is_some() {
184                if let Some(v) = self.on_unsubscribe {
185                    unsafe { v(self.user_data) }
186                }
187            }
188        }
189    }
190}
191
192pub unsafe extern "system" fn __modular_events_subscribe(
193    modular: &NativeModular,
194    subscribe: CSubscribe,
195    subscription: *mut CSubscriptionRef,
196) -> i32 {
197    let Some(topic) = cstr_to_str!(subscribe.topic) else {
198        return 1
199    };
200
201    let flag = Arc::new(RwLock::new(Some(())));
202    let weak_flag = Arc::downgrade(&flag);
203    let subscription_ptr = Box::into_raw(Box::new(Subscription {
204        user_data: subscribe.user_data,
205        close_flag: weak_flag,
206        on_unsubscribe: subscribe.on_unsubscribe,
207    }));
208
209    let subscription_ref = CSubscriptionRef {
210        user_data: subscribe.user_data,
211        subscription_ref: Obj(subscription_ptr.cast()),
212        unsubscribe: __modular_events_unsubscribe,
213    };
214
215    let subscribe = Subscribe {
216        close_flag: flag,
217        on_event: subscribe.on_event,
218        subscription: subscription_ref,
219        is_closed: false,
220    };
221
222    let handle = modular.tokio_runtime.handle();
223    let _guard = handle.enter();
224
225    match modular.modular.subscribe(&topic, subscribe) {
226        Ok(_) => {
227            *subscription = subscription_ref;
228
229            0
230        }
231        Err(err) => {
232            drop_in_place(subscription_ptr);
233
234            match err {
235                SubscribeError::InvalidPattern(_) => -1,
236            }
237        }
238    }
239}
240
241pub unsafe extern "system" fn __modular_events_publish(
242    modular: &NativeModular,
243    topic: *const c_char,
244    buf: CBuf,
245) {
246    let topic = cstr_to_str!(topic).expect("topic must not be null");
247    let bytes = Bytes::copy_from_slice(std::slice::from_raw_parts(buf.data, buf.len));
248
249    modular.modular.publish_event(&topic, bytes)
250}
251
252pub unsafe extern "system" fn __modular_events_unsubscribe(subscription: Obj) {
253    let _ = catch_unwind(|| {
254        let _ = Box::from_raw(subscription.0 as *mut Subscription);
255    })
256    .map_err(|e| eprintln!("failed to drop subscription: {:?}", e));
257}
258
259pub unsafe extern "system" fn __modular_register_module(
260    modular: &NativeModular,
261    name: *const c_char,
262    module: CModule,
263    replace: bool,
264) -> i32 {
265    let name = cstr_to_str!(name).expect("module name can't be null");
266
267    let module = NativeCModule(module);
268
269    let result = if replace {
270        modular.modular.register_or_replace_module(&name, module);
271
272        Ok(())
273    } else {
274        modular.modular.register_module(&name, module)
275    };
276
277    match result {
278        Ok(_) => 0,
279        Err(err) => match err {
280            RegistryError::AlreadyExists => -1,
281        },
282    }
283}
284
285pub unsafe extern "system" fn __modular_remove_module(
286    modular: &NativeModular,
287    name: *const c_char,
288) {
289    if let Some(v) = cstr_to_str!(name) {
290        modular.modular.remove_module(&v)
291    }
292}
293
294pub unsafe extern "system" fn __modular_get_module_ref(
295    modular: &NativeModular,
296    name: *const c_char,
297) -> CModuleRef {
298    static C_MODULE_REF_VTABLE: CModuleRefVTable = CModuleRefVTable {
299        clone,
300        drop,
301        invoke,
302    };
303
304    #[derive(Clone)]
305    pub struct RtModule {
306        runtime: Weak<Runtime>,
307        module: Module<Bytes, Bytes>,
308    }
309
310    let name = cstr_to_str!(name).expect("name can't be empty");
311    let Some(module) = modular.modular.get_module(&name) else {
312        return CModuleRef {
313            ptr: Obj(null_mut()),
314            vtable: C_MODULE_REF_VTABLE,
315        };
316    };
317
318    let module = RtModule {
319        runtime: Arc::downgrade(&modular.tokio_runtime),
320        module,
321    };
322
323    unsafe extern "system" fn clone(ptr: Obj) -> CModuleRef {
324        let v = &*(ptr.0 as *mut RtModule);
325        let new_module = v.clone();
326
327        CModuleRef {
328            ptr: Obj(Box::into_raw(Box::new(new_module)).cast()),
329            vtable: C_MODULE_REF_VTABLE,
330        }
331    }
332
333    unsafe extern "system" fn drop(ptr: Obj) {
334        let _ = Box::from_raw(ptr.0 as *mut RtModule);
335    }
336
337    unsafe extern "system" fn invoke(
338        ptr: Obj,
339        action: *const c_char,
340        data: CBuf,
341        callback: CCallback,
342    ) {
343        let RtModule { runtime, module } = (*(ptr.0 as *mut RtModule)).clone();
344
345        let action = CStr::from_ptr(action).to_string_lossy().to_string();
346        let data = Bytes::copy_from_slice(std::slice::from_raw_parts(data.data, data.len));
347
348        if let Some(v) = runtime.upgrade() {
349            let task = ModuleTask {
350                task: Box::pin(async move {
351                    match module.invoke(ModuleRequest::new(&action, data)).await {
352                        Ok(response) => {
353                            let buf = CBuf {
354                                data: response.data.as_ptr(),
355                                len: response.data.len(),
356                            };
357
358                            (callback.success)(callback.ptr, buf)
359                        }
360                        Err(error) => match error {
361                            ModuleError::UnknownMethod => (callback.unknown_method)(callback.ptr),
362                            ModuleError::Custom(v) => {
363                                let name = v.name.map(|v| CString::new(v).unwrap());
364                                let message = v.message.map(|v| CString::new(v).unwrap());
365
366                                let module_error = CModuleError {
367                                    code: v.code,
368                                    name: name.as_ref().map(|i| i.as_ptr()).unwrap_or(null()),
369                                    message: message.as_ref().map(|i| i.as_ptr()).unwrap_or(null()),
370                                };
371
372                                (callback.error)(callback.ptr, module_error)
373                            }
374                            ModuleError::Destroyed => (callback.destroyed)(callback.ptr),
375                        },
376                    };
377                }),
378                on_drop: Some(Box::new(move || (callback.destroyed)(callback.ptr))),
379            };
380
381            v.spawn(task);
382        } else {
383            (callback.destroyed)(callback.ptr)
384        }
385    }
386
387    CModuleRef {
388        ptr: Obj(Box::into_raw(Box::new(module)).cast()),
389        vtable: C_MODULE_REF_VTABLE,
390    }
391}
392
393struct ModuleTask<D, F>
394where
395    F: Future<Output = ()> + Send + Unpin,
396    D: FnOnce() + Send + Unpin,
397{
398    task: F,
399    on_drop: Option<D>,
400}
401
402impl<D, F> Drop for ModuleTask<D, F>
403where
404    F: Future<Output = ()> + Send + Unpin,
405    D: FnOnce() + Send + Unpin,
406{
407    fn drop(&mut self) {
408        if let Some(f) = self.on_drop.take() {
409            f()
410        }
411    }
412}
413
414impl<D, F> Future for ModuleTask<D, F>
415where
416    F: Future<Output = ()> + Send + Unpin,
417    D: FnOnce() + Send + Unpin,
418{
419    type Output = ();
420
421    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
422        self.on_drop.take();
423
424        Pin::new(&mut self.task).poll(cx)
425    }
426}
427
428#[test]
429fn a() {}