n_observer_cffi_impl/
observable_cffi_traits.rs1use std::{
3 ffi::{c_ulong, c_void},
4 sync::Arc,
5};
6
7use async_cffi::{CffiFuture, CffiPointerBuffer, SafePtr};
8
9use futures::future::BoxFuture;
10
11use n_observer::{AnyArc, InnerObserverReceiver, Observable, Publisher};
12
13use crate::CffiPublisher;
14
15use crate::CffiInnerObserverReceiver;
16
17#[repr(C)]
18#[derive(Debug, Clone)]
19pub struct CffiObservable {
20 pub self_ptr: *const c_void,
21 pub cffi_publisher: CffiPublisher,
22 pub cffi_inner_observer_receiver: CffiInnerObserverReceiver,
23 pub get_fut: extern "C" fn(*const c_void) -> *const c_void,
24}
25
26impl Publisher for CffiObservable {
27 fn add_observer(
28 &self,
29 observer: Box<dyn InnerObserverReceiver>,
30 input_index: usize,
31 ) -> BoxFuture<'_, Option<AnyArc>> {
32 self.as_publisher().add_observer(observer, input_index)
33 }
34 fn notify(&self, data: AnyArc) -> BoxFuture<'_, ()> {
35 self.as_publisher().notify(data)
36 }
37}
38
39impl InnerObserverReceiver for CffiObservable {
40 fn update(&self, data: Vec<Option<AnyArc>>) -> BoxFuture<'_, ()> {
41 self.as_inner_observer_receiver().update(data)
42 }
43 fn hold_strong_publisher_ref(
44 &self,
45 publisher: Arc<dyn Publisher + Send + Sync>,
46 ) -> BoxFuture<'_, ()> {
47 self.as_inner_observer_receiver()
48 .hold_strong_publisher_ref(publisher)
49 }
50}
51
52impl Observable<SafePtr> for CffiObservable {
53 fn get(&self) -> BoxFuture<'_, Option<Arc<SafePtr>>> {
54 let get_fn = self.get_fut;
55
56 let self_ptr = SafePtr(self.self_ptr);
57
58 Box::pin(async move {
59 let self_ptr = self_ptr;
60
61 let fut = (get_fn)(self_ptr.0);
62
63 if fut.is_null() {
64 panic!("C function returned null pointer");
65 }
66
67 let fut = unsafe {
68 (fut as *mut CffiFuture)
71 .as_mut()
72 .expect("CffiFuture cannot be null")
73 };
74
75 let ret = fut.await;
76
77 assert!(!ret.is_null());
78
79 let ret = ret as *const *const c_void;
80
81 let ret = *unsafe { ret.as_ref().unwrap() };
82
83 let ret = if ret.is_null() {
84 None
85 } else {
86 Some(Arc::new(SafePtr(ret)))
87 };
88
89 ret
90 })
91 }
92}
93
94unsafe impl Send for CffiObservable {}
95
96unsafe impl Sync for CffiObservable {}
97
98impl CffiObservable {
99 fn as_publisher(&self) -> &dyn Publisher {
100 &self.cffi_publisher
101 }
102 fn as_inner_observer_receiver(&self) -> &dyn InnerObserverReceiver {
103 &self.cffi_inner_observer_receiver
104 }
105 extern "C" fn get_fut_impl(self_ptr: *const c_void) -> *const c_void {
106 let self_ref = unsafe {
107 (self_ptr as *const &(dyn Observable<SafePtr> + Send + Sync))
108 .as_ref()
109 .expect("Self pointer cannot be null")
110 };
111
112 let fut = Box::pin(async move {
113 let ret = self_ref.get().await;
114
115 let ret = ret
116 .map(|ret| {
117 let ret = *ret;
118
119 ret.0
120 })
121 .unwrap_or(std::ptr::null());
122
123 SafePtr(ret)
124 });
125
126 CffiFuture::from_rust_future_boxed(fut).into_raw()
127 }
128}
129
130impl From<&dyn Observable<SafePtr>> for CffiObservable {
131 fn from(inner: &dyn Observable<SafePtr>) -> Self {
132 CffiObservable {
133 self_ptr: Box::into_raw(Box::new(inner)) as *const c_void,
137
138 cffi_publisher: {
139 let publisher = Box::new(inner as &dyn Publisher);
140
141 let ret: CffiPublisher = (*publisher).into();
142
143 Box::leak(publisher); ret
146 },
147
148 cffi_inner_observer_receiver: {
149 let inner_observer_receiver = Box::new(inner as &dyn InnerObserverReceiver);
150
151 let ret: CffiInnerObserverReceiver = (*inner_observer_receiver).into();
152
153 Box::leak(inner_observer_receiver); ret
156 },
157
158 get_fut: CffiObservable::get_fut_impl,
159 }
160 }
161}