xComLib/
x_api.rs

1// 文件是否存在的声明
2
3use std::{
4    future::Future,
5    sync::{Arc, Mutex},
6    task::{Poll, Waker},
7};
8
9use crate::x_core::{add_request_handler, gen_id};
10use tracing::info;
11use xCommonLib::{
12    base::status::Status,
13    serial::{self, request_message::RequestMessage},
14};
15
16struct CommonApiStatus {
17    is_finish: bool,
18    status: Status,
19    waker: Option<Waker>,
20}
21
22pub struct CommonStatusFuture {
23    shared_state: Arc<Mutex<CommonApiStatus>>,
24}
25
26impl Future for CommonStatusFuture {
27    type Output = Status;
28    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
29        let mut shared_state = self.shared_state.lock().unwrap();
30        if shared_state.is_finish {
31            let err_msg = std::mem::take(&mut shared_state.status.err_msg);
32            return Poll::Ready(Status::new(shared_state.status.err_code, err_msg));
33        }
34        shared_state.waker = Some(cx.waker().clone());
35        Poll::Pending
36    }
37}
38
39fn build_common_status_future() -> CommonStatusFuture {
40    let shared_state = Arc::new(Mutex::new(CommonApiStatus {
41        is_finish: false,
42        status: Status::default(),
43        waker: None,
44    }));
45    let future = CommonStatusFuture { shared_state };
46
47    future
48}
49
50fn add_request_status_handler(request_id: i64, clone_shared_state: Arc<Mutex<CommonApiStatus>>) {
51    add_request_handler(
52        request_id,
53        Box::new(move |buffer| {
54            let waker = {
55                let mut shared_state = clone_shared_state.lock().unwrap();
56                let waker = shared_state.waker.take();
57                if waker.is_none() {
58                    return;
59                }
60                shared_state
61                    .status
62                    .parse_from_bytes_return_num(buffer)
63                    .unwrap();
64                shared_state.is_finish = true;
65                waker
66            };
67
68            waker.unwrap().wake_by_ref();
69        }),
70    );
71}
72
73struct ResultApiState<T> {
74    is_finish: bool,
75    status: Status,
76    value: Option<T>,
77    waker: Option<Waker>,
78}
79
80pub struct ResultApiFuture<T> {
81    shared_state: Arc<Mutex<ResultApiState<T>>>,
82}
83
84//
85impl<T: Send + Sync> Future for ResultApiFuture<T> {
86    type Output = Result<T, Status>;
87    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
88        let mut shared_state = self.shared_state.lock().unwrap();
89        if shared_state.is_finish {
90            if shared_state.status.is_erorr() {
91                let status = std::mem::take(&mut shared_state.status);
92                return Poll::Ready(Err(status));
93            } else {
94                let value = std::mem::take(&mut shared_state.value);
95                return Poll::Ready(Ok(value.unwrap()));
96            }
97        }
98        shared_state.waker = Some(cx.waker().clone());
99        Poll::Pending
100    }
101}
102
103//
104
105pub struct ExistSharedState {
106    is_finish: bool,
107    is_exist: bool,
108    waker: Option<Waker>,
109}
110
111pub struct ExistFuture {
112    shared_state: Arc<Mutex<ExistSharedState>>,
113}
114
115impl Future for ExistFuture {
116    type Output = bool;
117    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
118        //
119        let mut shared_state = self.shared_state.lock().unwrap();
120        if shared_state.is_finish {
121            return Poll::Ready(shared_state.is_exist);
122        }
123        shared_state.waker = Some(cx.waker().clone());
124        Poll::Pending
125    }
126}
127
128pub fn new_exist_future() -> ExistFuture {
129    let shared_state = Arc::new(Mutex::new(ExistSharedState {
130        is_finish: false,
131        is_exist: false,
132        waker: None,
133    }));
134
135    let future = ExistFuture { shared_state };
136
137    future
138}
139
140// 添加文件或者文件夹是否存在
141pub fn add_exist_request(shared_state: Arc<Mutex<ExistSharedState>>) -> i64 {
142    let request_id = gen_id();
143    let clone_shared_state = shared_state.clone();
144    add_request_handler(
145        request_id,
146        Box::new(move |_buffer| {
147            let waker = {
148                let mut shared_state = clone_shared_state.lock().unwrap();
149                //
150                let waker = shared_state.waker.take();
151                if waker.is_none() {
152                    return;
153                }
154                let is_exist = serial::read_bool(_buffer);
155                shared_state.is_exist = is_exist;
156                shared_state.is_finish = true;
157                waker
158            };
159            waker.unwrap().wake_by_ref();
160        }),
161    );
162    request_id
163}
164
165fn build_result_api_future<T>() -> ResultApiFuture<T>
166where
167    T: RequestMessage + Default + 'static,
168{
169    let shared_state = Arc::new(Mutex::new(ResultApiState::<T> {
170        is_finish: false,
171        status: Status::default(),
172        waker: None,
173        value: None,
174    }));
175    let future = ResultApiFuture::<T> { shared_state };
176
177    future
178}
179
180fn add_request_result_handler<T>(request_id: i64, clone_shared_state: Arc<Mutex<ResultApiState<T>>>)
181where
182    T: RequestMessage + Default + 'static,
183{
184    add_request_handler(
185        request_id,
186        Box::new(move |buffer| {
187            let waker = {
188                let mut shared_state = clone_shared_state.lock().unwrap();
189                //
190                let waker = shared_state.waker.take();
191                if waker.is_none() {
192                    return;
193                }
194                //
195                let read_bytes = shared_state
196                    .status
197                    .parse_from_bytes_return_num(buffer)
198                    .unwrap();
199                if read_bytes != buffer.len() as u64 {
200                    let mut value = T::default();
201                    let bytes = &buffer[read_bytes as usize..];
202                    value.parse_from_bytes_return_num(bytes).unwrap();
203                    shared_state.value = Some(value);
204                }
205                shared_state.is_finish = true;
206                waker
207            };
208            waker.unwrap().wake_by_ref();
209        }),
210    );
211}
212
213//
214
215pub mod xport {
216    use super::{
217        add_exist_request, add_request_result_handler, add_request_status_handler,
218        build_common_status_future, build_result_api_future, new_exist_future, CommonStatusFuture,
219        ExistFuture, ResultApiFuture,
220    };
221    use crate::x_core::{
222        gen_id, get_service_id, ADD_CHANNEL_ID_TO_REMOTE_SERVICE, BUILD_CHANNEL_API,
223        EVENT_RESPONSE_FUNC_API, GET_ALL_CONN_ID, GET_ALL_LOCAL_SERVICE, GET_CHANNEL_ID_BY_CONN_ID,
224        GET_XRPC_PORT, HAS_SERVICE_API, LOAD_SERVICE_API, REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID,
225        SEND_MESSAGE_API, SET_CHANNEL_XRPC_PORT, SLEEP_API, SUBSCRIBE, UNLOAD_SERVICE_API,
226        UNSUBSCRIBE,
227    };
228    use core::slice;
229    use std::collections::HashMap;
230    use std::ptr::null;
231    use std::sync::{Mutex, RwLock};
232    use tracing::info;
233    use xCommonLib::base::dll_api::event::CHANNEL_DISCONNECTED;
234    use xCommonLib::base::dll_api::event::LOCAL_SERVICE_OFF;
235    use xCommonLib::base::dll_api::event::LOCAL_SERVICE_ON;
236    use xCommonLib::base::dll_api::event::{CHANNEL_CONNECTED, MESSAGE_IN};
237
238    use lazy_static::lazy_static;
239    use xCommonLib::serial::request_message::RequestMessage;
240    use xCommonLib::service::sys_service_api::{
241        ChannelEvent, ChannelId, ConnIds, LoadServiceRequest, ServiceInfo, ServiceInfos,
242        ServiceKey, UnloadServiceRequest, VerifyInfo,
243    };
244
245    lazy_static! {
246      static ref SUBSCRIBERIDMAP:Mutex<HashMap<u16, i64>> = Mutex::new(HashMap::new());
247      // 订阅的map
248      static ref EVENT_SUBSCRIBER_MAP: RwLock<HashMap<u16, Box<dyn Fn(i64, *const u8,  u32) + Send + Sync>>> = RwLock::new(HashMap::new());
249    }
250
251    pub struct EventContext {
252        publish_id: i64,
253        is_resp: bool,
254    }
255
256    impl EventContext {
257        pub fn new(publish_id: i64) -> Self {
258            EventContext {
259                publish_id,
260                is_resp: false,
261            }
262        }
263
264        pub fn resp<T>(mut self, t: T)
265        where
266            T: RequestMessage,
267        {
268            let buffer = t.serial().unwrap();
269            unsafe {
270                EVENT_RESPONSE_FUNC_API.assume_init_ref()(
271                    self.publish_id,
272                    buffer.as_ptr(),
273                    buffer.len() as u32,
274                );
275            }
276            self.is_resp = true;
277        }
278    }
279
280    impl Drop for EventContext {
281        fn drop(&mut self) {
282            if self.is_resp {
283                return;
284            }
285            unsafe {
286                EVENT_RESPONSE_FUNC_API.assume_init_ref()(self.publish_id, null(), 0);
287            }
288        }
289    }
290
291    /**
292     * 发送消息
293     */
294    pub fn send_message(
295        request_id: i64,
296        receiver_service_key: ServiceKey,
297        channel_id: i64,
298        buffer: Vec<u8>,
299    ) {
300        let receiver_buffer = receiver_service_key.serial().unwrap();
301        unsafe {
302            SEND_MESSAGE_API.assume_init_ref()(
303                get_service_id(),
304                request_id,
305                receiver_buffer.as_ptr(),
306                receiver_buffer.len() as u32,
307                channel_id,
308                buffer.as_ptr(),
309                buffer.len() as u32,
310            );
311        }
312    }
313
314    /**
315     *
316     */
317    pub fn sleep(milliseconds: u64) -> CommonStatusFuture {
318        let future = build_common_status_future();
319
320        let request_id = gen_id();
321
322        let clone_shared_state = future.shared_state.clone();
323        add_request_status_handler(request_id, clone_shared_state);
324        //
325        unsafe {
326            SLEEP_API.assume_init_ref()(get_service_id(), request_id, milliseconds);
327        }
328        future
329    }
330
331    pub fn load_service(param: &LoadServiceRequest) -> CommonStatusFuture {
332        // 要先获取 dxc 的类型,然后再启动
333        let future = build_common_status_future();
334        //
335        let request_id = gen_id();
336        //
337        let clone_shared_state = future.shared_state.clone();
338        add_request_status_handler(request_id, clone_shared_state);
339        let message = param.serial().unwrap();
340        //
341        unsafe {
342            LOAD_SERVICE_API.assume_init_ref()(
343                get_service_id(),
344                request_id,
345                message.as_ptr(),
346                message.len() as u32,
347            );
348        };
349        future
350    }
351
352    pub fn unload_service(param: &UnloadServiceRequest) -> CommonStatusFuture {
353        // 也要先获取 dxc 的类型
354        let future = build_common_status_future();
355        let request_id = gen_id();
356        let clone_shared_state = future.shared_state.clone();
357        add_request_status_handler(request_id, clone_shared_state);
358        let message = param.serial().unwrap();
359        //
360        unsafe {
361            UNLOAD_SERVICE_API.assume_init_ref()(
362                get_service_id(),
363                request_id,
364                message.as_ptr(),
365                message.len() as u32,
366            );
367        }
368        future
369    }
370
371    // 文件是否存在
372    pub fn has_service(service_key: &ServiceKey) -> ExistFuture {
373        let future = new_exist_future();
374
375        let clone_shared_state = future.shared_state.clone();
376
377        let request_id = add_exist_request(clone_shared_state);
378
379        let message = service_key.serial().unwrap();
380        //  调用 Api
381        unsafe {
382            HAS_SERVICE_API.assume_init_ref()(
383                get_service_id(),
384                request_id,
385                message.as_ptr(),
386                message.len() as u32,
387            );
388        };
389        future
390    }
391
392    pub fn build_channel(node_id: i64) -> CommonStatusFuture {
393        let future = build_common_status_future();
394        unsafe {
395            let request_id = gen_id();
396
397            let clone_shared_state = future.shared_state.clone();
398            add_request_status_handler(request_id, clone_shared_state);
399            //
400            BUILD_CHANNEL_API.assume_init_ref()(get_service_id(), request_id, node_id);
401        }
402        future
403    }
404
405    pub fn get_all_local_service(filter_system: bool) -> ResultApiFuture<ServiceInfos> {
406        let future = build_result_api_future::<ServiceInfos>();
407        let request_id = gen_id();
408
409        let clone_shared_state = future.shared_state.clone();
410
411        add_request_result_handler(request_id, clone_shared_state);
412
413        unsafe {
414            GET_ALL_LOCAL_SERVICE.assume_init_ref()(get_service_id(), request_id, filter_system);
415        }
416
417        future
418    }
419
420    pub fn add_channel_id_to_remote_services(
421        channel_id: i64,
422        remote_services: ServiceInfos,
423    ) -> CommonStatusFuture {
424        let future = build_common_status_future();
425        unsafe {
426            let request_id = gen_id();
427
428            let clone_shared_state = future.shared_state.clone();
429            add_request_status_handler(request_id, clone_shared_state);
430            //
431            let message = remote_services.serial().unwrap();
432            //
433            ADD_CHANNEL_ID_TO_REMOTE_SERVICE.assume_init_ref()(
434                get_service_id(),
435                request_id,
436                channel_id,
437                message.as_ptr(),
438                message.len() as u32,
439            );
440        }
441        future
442    }
443
444    pub fn remove_remote_services_all_channel_id(
445        channel_id: i64,
446        remote_service: ServiceInfo,
447    ) -> CommonStatusFuture {
448        let future = build_common_status_future();
449        unsafe {
450            let request_id = gen_id();
451
452            let clone_shared_state = future.shared_state.clone();
453            add_request_status_handler(request_id, clone_shared_state);
454            //
455            let message = remote_service.serial().unwrap();
456            //
457            REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID.assume_init_ref()(
458                get_service_id(),
459                request_id,
460                channel_id,
461                message.as_ptr(),
462                message.len() as u32,
463            );
464        }
465        future
466    }
467
468    pub fn set_channel_xrpc_port(channel_id: i64, xprc_port: i32) -> CommonStatusFuture {
469        let future = build_common_status_future();
470        unsafe {
471            let request_id = gen_id();
472
473            let clone_shared_state = future.shared_state.clone();
474            add_request_status_handler(request_id, clone_shared_state);
475            //
476            SET_CHANNEL_XRPC_PORT.assume_init_ref()(
477                get_service_id(),
478                request_id,
479                channel_id,
480                xprc_port,
481            );
482        }
483        future
484    }
485    //
486    pub fn get_xrpc_port() -> Option<i32> {
487        let request_id = gen_id();
488
489        unsafe {
490            let port = GET_XRPC_PORT.assume_init_ref()(get_service_id(), request_id);
491            Some(port)
492        }
493    }
494    //
495
496    pub fn get_all_conn_id() -> ResultApiFuture<ConnIds> {
497        let future = build_result_api_future::<ConnIds>();
498        //
499        let request_id = gen_id();
500
501        let clone_shared_state = future.shared_state.clone();
502        add_request_result_handler(request_id, clone_shared_state);
503        unsafe {
504            GET_ALL_CONN_ID.assume_init_ref()(get_service_id(), request_id);
505        }
506        future
507    }
508    //
509    pub fn get_channel_id_by_conn_id(conn_id: i64) -> ResultApiFuture<ChannelId> {
510        let future = build_result_api_future::<ChannelId>();
511        let request_id = gen_id();
512        let clone_shared_state = future.shared_state.clone();
513        add_request_result_handler(request_id, clone_shared_state);
514        unsafe {
515            GET_CHANNEL_ID_BY_CONN_ID.assume_init_ref()(get_service_id(), request_id, conn_id);
516        }
517
518        future
519    }
520    //
521    pub extern "C" fn event_receiver(
522        event_id: u16,
523        publish_id: i64,
524        buffer: *const u8,
525        buffer_len: u32,
526    ) -> bool {
527        let event_subscriber_map = EVENT_SUBSCRIBER_MAP.read().unwrap();
528        let hanlder = event_subscriber_map.get(&event_id);
529        if hanlder.is_none() {
530            return false;
531        }
532        let hanlder = hanlder.unwrap();
533        hanlder(publish_id, buffer, buffer_len);
534        true
535    }
536    //
537    pub fn subscribe_channel_connected(
538        handler: Box<dyn Fn(EventContext, ChannelEvent) + Send + Sync>,
539    ) {
540        //
541        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
542
543        //
544        event_subscriber_map.insert(
545            CHANNEL_CONNECTED,
546            Box::new(move |publish_id, buffer, buffer_len| {
547                let vec_buffer =
548                    unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
549                let mut channel_event = ChannelEvent::default();
550                channel_event.parse_from_bytes(vec_buffer).unwrap();
551
552                let ctx = EventContext::new(publish_id);
553                handler(ctx, channel_event);
554            }),
555        );
556        unsafe {
557            let subscriber_id = SUBSCRIBE.assume_init_ref()(CHANNEL_CONNECTED, 0, event_receiver);
558            let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
559            subscriber_map.insert(CHANNEL_CONNECTED, subscriber_id);
560        }
561    }
562    //
563    pub fn subscribe_channel_disconnected(
564        handler: Box<dyn Fn(EventContext, ChannelEvent) + Send + Sync>,
565    ) {
566        //
567        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
568        event_subscriber_map.insert(
569            CHANNEL_DISCONNECTED,
570            Box::new(move |publish_id, buffer, buffer_len| {
571                let vec_buffer =
572                    unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
573                let mut channel_event = ChannelEvent::default();
574                channel_event.parse_from_bytes(vec_buffer).unwrap();
575
576                let ctx = EventContext::new(publish_id);
577                handler(ctx, channel_event);
578            }),
579        );
580        unsafe {
581            let subscriber_id =
582                SUBSCRIBE.assume_init_ref()(CHANNEL_DISCONNECTED, 0, event_receiver);
583            let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
584            subscriber_map.insert(CHANNEL_DISCONNECTED, subscriber_id);
585        }
586    }
587    //
588    pub fn subscribe_local_service_on(
589        handler: Box<dyn Fn(EventContext, ServiceInfo) + Send + Sync>,
590    ) {
591        //
592        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
593        event_subscriber_map.insert(
594            LOCAL_SERVICE_ON,
595            Box::new(move |publish_id, buffer, buffer_len| {
596  
597                let vec_buffer =
598                    unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
599                let mut service_infos = ServiceInfo::default();
600                service_infos.parse_from_bytes(vec_buffer).unwrap();
601
602                let ctx = EventContext::new(publish_id);
603
604                handler(ctx, service_infos);
605            }),
606        );
607        unsafe {
608            let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_ON, 0, event_receiver);
609            let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
610            subscriber_map.insert(LOCAL_SERVICE_ON, subscriber_id);
611        }
612    }
613    //
614    pub fn subscribe_local_service_off(
615        handler: Box<dyn Fn(EventContext, ServiceInfo) + Send + Sync>,
616    ) {
617        //
618        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
619        event_subscriber_map.insert(
620            LOCAL_SERVICE_OFF,
621            Box::new(move |publish_id, buffer, buffer_len| {
622                //
623                let vec_buffer =
624                    unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
625                let mut service_infos = ServiceInfo::default();
626                service_infos.parse_from_bytes(vec_buffer).unwrap();
627
628                let ctx = EventContext::new(publish_id);
629                handler(ctx, service_infos);
630            }),
631        );
632        unsafe {
633            let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_OFF, 0, event_receiver);
634            let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
635            subscriber_map.insert(LOCAL_SERVICE_OFF, subscriber_id);
636        }
637    }
638
639    //
640    pub fn subscribe_message_in(
641        handler: Box<dyn Fn(EventContext, VerifyInfo) -> bool + Send + Sync>,
642    ) {
643        //
644        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
645        event_subscriber_map.insert(
646            MESSAGE_IN,
647            Box::new(move |publish_id, buffer, buffer_len| {
648                let vec_buffer =
649                    unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
650                let mut verify_info = VerifyInfo::default();
651                verify_info.parse_from_bytes(vec_buffer).unwrap();
652                let ctx = EventContext::new(publish_id);
653                handler(ctx, verify_info);
654            }),
655        );
656        unsafe {
657            let subscriber_id = SUBSCRIBE.assume_init_ref()(MESSAGE_IN, 0, event_receiver);
658            let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
659            subscriber_map.insert(MESSAGE_IN, subscriber_id);
660        }
661    }
662
663    pub fn unsubscribe_all() {
664        unsafe {
665            let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
666
667            for (event_id, subscriber_id) in subscriber_map.iter() {
668                UNSUBSCRIBE.assume_init_ref()(*event_id, *subscriber_id);
669            }
670
671            subscriber_map.clear();
672        }
673    }
674}