1use std::{
4 future::Future,
5 sync::{Arc, Mutex},
6 task::{Poll, Waker},
7};
8
9use crate::x_core::{add_request_handler, gen_id};
10use tracing::info;
11use xCommonLib::{
12 base::status::Status,
13 serial::{self, request_message::RequestMessage},
14};
15
16struct CommonApiStatus {
17 is_finish: bool,
18 status: Status,
19 waker: Option<Waker>,
20}
21
22pub struct CommonStatusFuture {
23 shared_state: Arc<Mutex<CommonApiStatus>>,
24}
25
26impl Future for CommonStatusFuture {
27 type Output = Status;
28 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
29 let mut shared_state = self.shared_state.lock().unwrap();
30 if shared_state.is_finish {
31 let err_msg = std::mem::take(&mut shared_state.status.err_msg);
32 return Poll::Ready(Status::new(shared_state.status.err_code, err_msg));
33 }
34 shared_state.waker = Some(cx.waker().clone());
35 Poll::Pending
36 }
37}
38
39fn build_common_status_future() -> CommonStatusFuture {
40 let shared_state = Arc::new(Mutex::new(CommonApiStatus {
41 is_finish: false,
42 status: Status::default(),
43 waker: None,
44 }));
45 let future = CommonStatusFuture { shared_state };
46
47 future
48}
49
50fn add_request_status_handler(request_id: i64, clone_shared_state: Arc<Mutex<CommonApiStatus>>) {
51 add_request_handler(
52 request_id,
53 Box::new(move |buffer| {
54 let waker = {
55 let mut shared_state = clone_shared_state.lock().unwrap();
56 let waker = shared_state.waker.take();
57 if waker.is_none() {
58 return;
59 }
60 shared_state
61 .status
62 .parse_from_bytes_return_num(buffer)
63 .unwrap();
64 shared_state.is_finish = true;
65 waker
66 };
67
68 waker.unwrap().wake_by_ref();
69 }),
70 );
71}
72
73struct ResultApiState<T> {
74 is_finish: bool,
75 status: Status,
76 value: Option<T>,
77 waker: Option<Waker>,
78}
79
80pub struct ResultApiFuture<T> {
81 shared_state: Arc<Mutex<ResultApiState<T>>>,
82}
83
84impl<T: Send + Sync> Future for ResultApiFuture<T> {
86 type Output = Result<T, Status>;
87 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
88 let mut shared_state = self.shared_state.lock().unwrap();
89 if shared_state.is_finish {
90 if shared_state.status.is_erorr() {
91 let status = std::mem::take(&mut shared_state.status);
92 return Poll::Ready(Err(status));
93 } else {
94 let value = std::mem::take(&mut shared_state.value);
95 return Poll::Ready(Ok(value.unwrap()));
96 }
97 }
98 shared_state.waker = Some(cx.waker().clone());
99 Poll::Pending
100 }
101}
102
103pub struct ExistSharedState {
106 is_finish: bool,
107 is_exist: bool,
108 waker: Option<Waker>,
109}
110
111pub struct ExistFuture {
112 shared_state: Arc<Mutex<ExistSharedState>>,
113}
114
115impl Future for ExistFuture {
116 type Output = bool;
117 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
118 let mut shared_state = self.shared_state.lock().unwrap();
120 if shared_state.is_finish {
121 return Poll::Ready(shared_state.is_exist);
122 }
123 shared_state.waker = Some(cx.waker().clone());
124 Poll::Pending
125 }
126}
127
128pub fn new_exist_future() -> ExistFuture {
129 let shared_state = Arc::new(Mutex::new(ExistSharedState {
130 is_finish: false,
131 is_exist: false,
132 waker: None,
133 }));
134
135 let future = ExistFuture { shared_state };
136
137 future
138}
139
140pub fn add_exist_request(shared_state: Arc<Mutex<ExistSharedState>>) -> i64 {
142 let request_id = gen_id();
143 let clone_shared_state = shared_state.clone();
144 add_request_handler(
145 request_id,
146 Box::new(move |_buffer| {
147 let waker = {
148 let mut shared_state = clone_shared_state.lock().unwrap();
149 let waker = shared_state.waker.take();
151 if waker.is_none() {
152 return;
153 }
154 let is_exist = serial::read_bool(_buffer);
155 shared_state.is_exist = is_exist;
156 shared_state.is_finish = true;
157 waker
158 };
159 waker.unwrap().wake_by_ref();
160 }),
161 );
162 request_id
163}
164
165fn build_result_api_future<T>() -> ResultApiFuture<T>
166where
167 T: RequestMessage + Default + 'static,
168{
169 let shared_state = Arc::new(Mutex::new(ResultApiState::<T> {
170 is_finish: false,
171 status: Status::default(),
172 waker: None,
173 value: None,
174 }));
175 let future = ResultApiFuture::<T> { shared_state };
176
177 future
178}
179
180fn add_request_result_handler<T>(request_id: i64, clone_shared_state: Arc<Mutex<ResultApiState<T>>>)
181where
182 T: RequestMessage + Default + 'static,
183{
184 add_request_handler(
185 request_id,
186 Box::new(move |buffer| {
187 let waker = {
188 let mut shared_state = clone_shared_state.lock().unwrap();
189 let waker = shared_state.waker.take();
191 if waker.is_none() {
192 return;
193 }
194 let read_bytes = shared_state
196 .status
197 .parse_from_bytes_return_num(buffer)
198 .unwrap();
199 if read_bytes != buffer.len() as u64 {
200 let mut value = T::default();
201 let bytes = &buffer[read_bytes as usize..];
202 value.parse_from_bytes_return_num(bytes).unwrap();
203 shared_state.value = Some(value);
204 }
205 shared_state.is_finish = true;
206 waker
207 };
208 waker.unwrap().wake_by_ref();
209 }),
210 );
211}
212
213pub mod xport {
216 use super::{
217 add_exist_request, add_request_result_handler, add_request_status_handler,
218 build_common_status_future, build_result_api_future, new_exist_future, CommonStatusFuture,
219 ExistFuture, ResultApiFuture,
220 };
221 use crate::x_core::{
222 gen_id, get_service_id, ADD_CHANNEL_ID_TO_REMOTE_SERVICE, BUILD_CHANNEL_API,
223 EVENT_RESPONSE_FUNC_API, GET_ALL_CONN_ID, GET_ALL_LOCAL_SERVICE, GET_CHANNEL_ID_BY_CONN_ID,
224 GET_XRPC_PORT, HAS_SERVICE_API, LOAD_SERVICE_API, REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID,
225 SEND_MESSAGE_API, SET_CHANNEL_XRPC_PORT, SLEEP_API, SUBSCRIBE, UNLOAD_SERVICE_API,
226 UNSUBSCRIBE,
227 };
228 use core::slice;
229 use std::collections::HashMap;
230 use std::ptr::null;
231 use std::sync::{Mutex, RwLock};
232 use tracing::info;
233 use xCommonLib::base::dll_api::event::CHANNEL_DISCONNECTED;
234 use xCommonLib::base::dll_api::event::LOCAL_SERVICE_OFF;
235 use xCommonLib::base::dll_api::event::LOCAL_SERVICE_ON;
236 use xCommonLib::base::dll_api::event::{CHANNEL_CONNECTED, MESSAGE_IN};
237
238 use lazy_static::lazy_static;
239 use xCommonLib::serial::request_message::RequestMessage;
240 use xCommonLib::service::sys_service_api::{
241 ChannelEvent, ChannelId, ConnIds, LoadServiceRequest, ServiceInfo, ServiceInfos,
242 ServiceKey, UnloadServiceRequest, VerifyInfo,
243 };
244
245 lazy_static! {
246 static ref SUBSCRIBERIDMAP:Mutex<HashMap<u16, i64>> = Mutex::new(HashMap::new());
247 static ref EVENT_SUBSCRIBER_MAP: RwLock<HashMap<u16, Box<dyn Fn(i64, *const u8, u32) + Send + Sync>>> = RwLock::new(HashMap::new());
249 }
250
251 pub struct EventContext {
252 publish_id: i64,
253 is_resp: bool,
254 }
255
256 impl EventContext {
257 pub fn new(publish_id: i64) -> Self {
258 EventContext {
259 publish_id,
260 is_resp: false,
261 }
262 }
263
264 pub fn resp<T>(mut self, t: T)
265 where
266 T: RequestMessage,
267 {
268 let buffer = t.serial().unwrap();
269 unsafe {
270 EVENT_RESPONSE_FUNC_API.assume_init_ref()(
271 self.publish_id,
272 buffer.as_ptr(),
273 buffer.len() as u32,
274 );
275 }
276 self.is_resp = true;
277 }
278 }
279
280 impl Drop for EventContext {
281 fn drop(&mut self) {
282 if self.is_resp {
283 return;
284 }
285 unsafe {
286 EVENT_RESPONSE_FUNC_API.assume_init_ref()(self.publish_id, null(), 0);
287 }
288 }
289 }
290
291 pub fn send_message(
295 request_id: i64,
296 receiver_service_key: ServiceKey,
297 channel_id: i64,
298 buffer: Vec<u8>,
299 ) {
300 let receiver_buffer = receiver_service_key.serial().unwrap();
301 unsafe {
302 SEND_MESSAGE_API.assume_init_ref()(
303 get_service_id(),
304 request_id,
305 receiver_buffer.as_ptr(),
306 receiver_buffer.len() as u32,
307 channel_id,
308 buffer.as_ptr(),
309 buffer.len() as u32,
310 );
311 }
312 }
313
314 pub fn sleep(milliseconds: u64) -> CommonStatusFuture {
318 let future = build_common_status_future();
319
320 let request_id = gen_id();
321
322 let clone_shared_state = future.shared_state.clone();
323 add_request_status_handler(request_id, clone_shared_state);
324 unsafe {
326 SLEEP_API.assume_init_ref()(get_service_id(), request_id, milliseconds);
327 }
328 future
329 }
330
331 pub fn load_service(param: &LoadServiceRequest) -> CommonStatusFuture {
332 let future = build_common_status_future();
334 let request_id = gen_id();
336 let clone_shared_state = future.shared_state.clone();
338 add_request_status_handler(request_id, clone_shared_state);
339 let message = param.serial().unwrap();
340 unsafe {
342 LOAD_SERVICE_API.assume_init_ref()(
343 get_service_id(),
344 request_id,
345 message.as_ptr(),
346 message.len() as u32,
347 );
348 };
349 future
350 }
351
352 pub fn unload_service(param: &UnloadServiceRequest) -> CommonStatusFuture {
353 let future = build_common_status_future();
355 let request_id = gen_id();
356 let clone_shared_state = future.shared_state.clone();
357 add_request_status_handler(request_id, clone_shared_state);
358 let message = param.serial().unwrap();
359 unsafe {
361 UNLOAD_SERVICE_API.assume_init_ref()(
362 get_service_id(),
363 request_id,
364 message.as_ptr(),
365 message.len() as u32,
366 );
367 }
368 future
369 }
370
371 pub fn has_service(service_key: &ServiceKey) -> ExistFuture {
373 let future = new_exist_future();
374
375 let clone_shared_state = future.shared_state.clone();
376
377 let request_id = add_exist_request(clone_shared_state);
378
379 let message = service_key.serial().unwrap();
380 unsafe {
382 HAS_SERVICE_API.assume_init_ref()(
383 get_service_id(),
384 request_id,
385 message.as_ptr(),
386 message.len() as u32,
387 );
388 };
389 future
390 }
391
392 pub fn build_channel(node_id: i64) -> CommonStatusFuture {
393 let future = build_common_status_future();
394 unsafe {
395 let request_id = gen_id();
396
397 let clone_shared_state = future.shared_state.clone();
398 add_request_status_handler(request_id, clone_shared_state);
399 BUILD_CHANNEL_API.assume_init_ref()(get_service_id(), request_id, node_id);
401 }
402 future
403 }
404
405 pub fn get_all_local_service(filter_system: bool) -> ResultApiFuture<ServiceInfos> {
406 let future = build_result_api_future::<ServiceInfos>();
407 let request_id = gen_id();
408
409 let clone_shared_state = future.shared_state.clone();
410
411 add_request_result_handler(request_id, clone_shared_state);
412
413 unsafe {
414 GET_ALL_LOCAL_SERVICE.assume_init_ref()(get_service_id(), request_id, filter_system);
415 }
416
417 future
418 }
419
420 pub fn add_channel_id_to_remote_services(
421 channel_id: i64,
422 remote_services: ServiceInfos,
423 ) -> CommonStatusFuture {
424 let future = build_common_status_future();
425 unsafe {
426 let request_id = gen_id();
427
428 let clone_shared_state = future.shared_state.clone();
429 add_request_status_handler(request_id, clone_shared_state);
430 let message = remote_services.serial().unwrap();
432 ADD_CHANNEL_ID_TO_REMOTE_SERVICE.assume_init_ref()(
434 get_service_id(),
435 request_id,
436 channel_id,
437 message.as_ptr(),
438 message.len() as u32,
439 );
440 }
441 future
442 }
443
444 pub fn remove_remote_services_all_channel_id(
445 channel_id: i64,
446 remote_service: ServiceInfo,
447 ) -> CommonStatusFuture {
448 let future = build_common_status_future();
449 unsafe {
450 let request_id = gen_id();
451
452 let clone_shared_state = future.shared_state.clone();
453 add_request_status_handler(request_id, clone_shared_state);
454 let message = remote_service.serial().unwrap();
456 REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID.assume_init_ref()(
458 get_service_id(),
459 request_id,
460 channel_id,
461 message.as_ptr(),
462 message.len() as u32,
463 );
464 }
465 future
466 }
467
468 pub fn set_channel_xrpc_port(channel_id: i64, xprc_port: i32) -> CommonStatusFuture {
469 let future = build_common_status_future();
470 unsafe {
471 let request_id = gen_id();
472
473 let clone_shared_state = future.shared_state.clone();
474 add_request_status_handler(request_id, clone_shared_state);
475 SET_CHANNEL_XRPC_PORT.assume_init_ref()(
477 get_service_id(),
478 request_id,
479 channel_id,
480 xprc_port,
481 );
482 }
483 future
484 }
485 pub fn get_xrpc_port() -> Option<i32> {
487 let request_id = gen_id();
488
489 unsafe {
490 let port = GET_XRPC_PORT.assume_init_ref()(get_service_id(), request_id);
491 Some(port)
492 }
493 }
494 pub fn get_all_conn_id() -> ResultApiFuture<ConnIds> {
497 let future = build_result_api_future::<ConnIds>();
498 let request_id = gen_id();
500
501 let clone_shared_state = future.shared_state.clone();
502 add_request_result_handler(request_id, clone_shared_state);
503 unsafe {
504 GET_ALL_CONN_ID.assume_init_ref()(get_service_id(), request_id);
505 }
506 future
507 }
508 pub fn get_channel_id_by_conn_id(conn_id: i64) -> ResultApiFuture<ChannelId> {
510 let future = build_result_api_future::<ChannelId>();
511 let request_id = gen_id();
512 let clone_shared_state = future.shared_state.clone();
513 add_request_result_handler(request_id, clone_shared_state);
514 unsafe {
515 GET_CHANNEL_ID_BY_CONN_ID.assume_init_ref()(get_service_id(), request_id, conn_id);
516 }
517
518 future
519 }
520 pub extern "C" fn event_receiver(
522 event_id: u16,
523 publish_id: i64,
524 buffer: *const u8,
525 buffer_len: u32,
526 ) -> bool {
527 let event_subscriber_map = EVENT_SUBSCRIBER_MAP.read().unwrap();
528 let hanlder = event_subscriber_map.get(&event_id);
529 if hanlder.is_none() {
530 return false;
531 }
532 let hanlder = hanlder.unwrap();
533 hanlder(publish_id, buffer, buffer_len);
534 true
535 }
536 pub fn subscribe_channel_connected(
538 handler: Box<dyn Fn(EventContext, ChannelEvent) + Send + Sync>,
539 ) {
540 let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
542
543 event_subscriber_map.insert(
545 CHANNEL_CONNECTED,
546 Box::new(move |publish_id, buffer, buffer_len| {
547 let vec_buffer =
548 unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
549 let mut channel_event = ChannelEvent::default();
550 channel_event.parse_from_bytes(vec_buffer).unwrap();
551
552 let ctx = EventContext::new(publish_id);
553 handler(ctx, channel_event);
554 }),
555 );
556 unsafe {
557 let subscriber_id = SUBSCRIBE.assume_init_ref()(CHANNEL_CONNECTED, 0, event_receiver);
558 let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
559 subscriber_map.insert(CHANNEL_CONNECTED, subscriber_id);
560 }
561 }
562 pub fn subscribe_channel_disconnected(
564 handler: Box<dyn Fn(EventContext, ChannelEvent) + Send + Sync>,
565 ) {
566 let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
568 event_subscriber_map.insert(
569 CHANNEL_DISCONNECTED,
570 Box::new(move |publish_id, buffer, buffer_len| {
571 let vec_buffer =
572 unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
573 let mut channel_event = ChannelEvent::default();
574 channel_event.parse_from_bytes(vec_buffer).unwrap();
575
576 let ctx = EventContext::new(publish_id);
577 handler(ctx, channel_event);
578 }),
579 );
580 unsafe {
581 let subscriber_id =
582 SUBSCRIBE.assume_init_ref()(CHANNEL_DISCONNECTED, 0, event_receiver);
583 let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
584 subscriber_map.insert(CHANNEL_DISCONNECTED, subscriber_id);
585 }
586 }
587 pub fn subscribe_local_service_on(
589 handler: Box<dyn Fn(EventContext, ServiceInfo) + Send + Sync>,
590 ) {
591 let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
593 event_subscriber_map.insert(
594 LOCAL_SERVICE_ON,
595 Box::new(move |publish_id, buffer, buffer_len| {
596
597 let vec_buffer =
598 unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
599 let mut service_infos = ServiceInfo::default();
600 service_infos.parse_from_bytes(vec_buffer).unwrap();
601
602 let ctx = EventContext::new(publish_id);
603
604 handler(ctx, service_infos);
605 }),
606 );
607 unsafe {
608 let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_ON, 0, event_receiver);
609 let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
610 subscriber_map.insert(LOCAL_SERVICE_ON, subscriber_id);
611 }
612 }
613 pub fn subscribe_local_service_off(
615 handler: Box<dyn Fn(EventContext, ServiceInfo) + Send + Sync>,
616 ) {
617 let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
619 event_subscriber_map.insert(
620 LOCAL_SERVICE_OFF,
621 Box::new(move |publish_id, buffer, buffer_len| {
622 let vec_buffer =
624 unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
625 let mut service_infos = ServiceInfo::default();
626 service_infos.parse_from_bytes(vec_buffer).unwrap();
627
628 let ctx = EventContext::new(publish_id);
629 handler(ctx, service_infos);
630 }),
631 );
632 unsafe {
633 let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_OFF, 0, event_receiver);
634 let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
635 subscriber_map.insert(LOCAL_SERVICE_OFF, subscriber_id);
636 }
637 }
638
639 pub fn subscribe_message_in(
641 handler: Box<dyn Fn(EventContext, VerifyInfo) -> bool + Send + Sync>,
642 ) {
643 let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
645 event_subscriber_map.insert(
646 MESSAGE_IN,
647 Box::new(move |publish_id, buffer, buffer_len| {
648 let vec_buffer =
649 unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
650 let mut verify_info = VerifyInfo::default();
651 verify_info.parse_from_bytes(vec_buffer).unwrap();
652 let ctx = EventContext::new(publish_id);
653 handler(ctx, verify_info);
654 }),
655 );
656 unsafe {
657 let subscriber_id = SUBSCRIBE.assume_init_ref()(MESSAGE_IN, 0, event_receiver);
658 let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
659 subscriber_map.insert(MESSAGE_IN, subscriber_id);
660 }
661 }
662
663 pub fn unsubscribe_all() {
664 unsafe {
665 let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
666
667 for (event_id, subscriber_id) in subscriber_map.iter() {
668 UNSUBSCRIBE.assume_init_ref()(*event_id, *subscriber_id);
669 }
670
671 subscriber_map.clear();
672 }
673 }
674}