use std::{
future::Future,
sync::{Arc, Mutex},
task::{Poll, Waker},
};
use crate::x_core::{add_request_handler, gen_id};
use xCommonLib::{
base::status::Status,
serial::{self, request_message::RequestMessage},
};
struct CommonApiStatus {
is_finish: bool,
status: Status,
waker: Option<Waker>,
}
pub struct CommonStatusFuture {
shared_state: Arc<Mutex<CommonApiStatus>>,
}
impl Future for CommonStatusFuture {
type Output = Status;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.is_finish {
let err_msg = std::mem::take(&mut shared_state.status.err_msg);
return Poll::Ready(Status::new(shared_state.status.err_code, err_msg));
}
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
fn build_common_status_future() -> CommonStatusFuture {
let shared_state = Arc::new(Mutex::new(CommonApiStatus {
is_finish: false,
status: Status::default(),
waker: None,
}));
let future = CommonStatusFuture { shared_state };
future
}
fn add_request_status_handler(request_id: i64, clone_shared_state: Arc<Mutex<CommonApiStatus>>) {
add_request_handler(
request_id,
Box::new(move |buffer| {
let waker = {
let mut shared_state = clone_shared_state.lock().unwrap();
let waker = shared_state.waker.take();
if waker.is_none() {
return;
}
shared_state
.status
.parse_from_bytes_return_num(buffer)
.unwrap();
shared_state.is_finish = true;
waker
};
waker.unwrap().wake_by_ref();
}),
);
}
struct ResultApiState<T> {
is_finish: bool,
status: Status,
value: Option<T>,
waker: Option<Waker>,
}
pub struct ResultApiFuture<T> {
shared_state: Arc<Mutex<ResultApiState<T>>>,
}
impl<T: Send + Sync> Future for ResultApiFuture<T> {
type Output = Result<T, Status>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.is_finish {
if shared_state.status.is_erorr() {
let status = std::mem::take(&mut shared_state.status);
return Poll::Ready(Err(status));
} else {
let value = std::mem::take(&mut shared_state.value);
return Poll::Ready(Ok(value.unwrap()));
}
}
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
pub struct ExistSharedState {
is_finish: bool,
is_exist: bool,
waker: Option<Waker>,
}
pub struct ExistFuture {
shared_state: Arc<Mutex<ExistSharedState>>,
}
impl Future for ExistFuture {
type Output = bool;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.is_finish {
return Poll::Ready(shared_state.is_exist);
}
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
pub fn new_exist_future() -> ExistFuture {
let shared_state = Arc::new(Mutex::new(ExistSharedState {
is_finish: false,
is_exist: false,
waker: None,
}));
let future = ExistFuture { shared_state };
future
}
pub fn add_exist_request(shared_state: Arc<Mutex<ExistSharedState>>) -> i64 {
let request_id = gen_id();
let clone_shared_state = shared_state.clone();
add_request_handler(
request_id,
Box::new(move |_buffer| {
let waker = {
let mut shared_state = clone_shared_state.lock().unwrap();
let waker = shared_state.waker.take();
if waker.is_none() {
return;
}
let is_exist = serial::read_bool(_buffer);
shared_state.is_exist = is_exist;
shared_state.is_finish = true;
waker
};
waker.unwrap().wake_by_ref();
}),
);
request_id
}
fn build_result_api_future<T>() -> ResultApiFuture<T>
where
T: RequestMessage + Default + 'static,
{
let shared_state = Arc::new(Mutex::new(ResultApiState::<T> {
is_finish: false,
status: Status::default(),
waker: None,
value: None,
}));
let future = ResultApiFuture::<T> { shared_state };
future
}
fn add_request_result_handler<T>(request_id: i64, clone_shared_state: Arc<Mutex<ResultApiState<T>>>)
where
T: RequestMessage + Default + 'static,
{
add_request_handler(
request_id,
Box::new(move |buffer| {
let waker = {
let mut shared_state = clone_shared_state.lock().unwrap();
let waker = shared_state.waker.take();
if waker.is_none() {
return;
}
let read_bytes = shared_state
.status
.parse_from_bytes_return_num(buffer)
.unwrap();
if read_bytes != buffer.len() as u64 {
let mut value = T::default();
let bytes = &buffer[read_bytes as usize..];
value.parse_from_bytes_return_num(bytes).unwrap();
shared_state.value = Some(value);
}
shared_state.is_finish = true;
waker
};
waker.unwrap().wake_by_ref();
}),
);
}
pub mod xport {
use super::{
add_exist_request, add_request_result_handler, add_request_status_handler,
build_common_status_future, build_result_api_future, new_exist_future, CommonStatusFuture,
ExistFuture, ResultApiFuture,
};
use crate::x_core::{
gen_id, get_service_id, ADD_CHANNEL_ID_TO_REMOTE_SERVICE, BUILD_CHANNEL_API,
GET_ALL_CONN_ID, GET_ALL_LOCAL_SERVICE, GET_CHANNEL_ID_BY_CONN_ID, GET_XRPC_PORT,
HAS_SERVICE_API, LOAD_SERVICE_API, REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID, SEND_MESSAGE_API,
SET_CHANNEL_XRPC_PORT, SLEEP_API, SUBSCRIBE, UNLOAD_SERVICE_API, UNSUBSCRIBE,
};
use core::slice;
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};
use xCommonLib::base::dll_api::event::CHANNEL_CONNECTED;
use xCommonLib::base::dll_api::event::CHANNEL_DISCONNECTED;
use xCommonLib::base::dll_api::event::LOCAL_SERVICE_OFF;
use xCommonLib::base::dll_api::event::LOCAL_SERVICE_ON;
use lazy_static::lazy_static;
use xCommonLib::serial::request_message::RequestMessage;
use xCommonLib::service::sys_service_api::{
ChannelEvent, ChannelId, ConnIds, LoadServiceRequest, ServiceInfo, ServiceInfos,
ServiceKey, UnloadServiceRequest,
};
lazy_static! {
static ref SUBSCRIBERIDMAP:Mutex<HashMap<u16, i64>> = Mutex::new(HashMap::new());
static ref EVENT_SUBSCRIBER_MAP: RwLock<HashMap<u16, Box<dyn Fn(*const u8, u32) + Send + Sync>>> = RwLock::new(HashMap::new());
}
pub fn send_message(
request_id: i64,
receiver_service_key: ServiceKey,
channel_id: i64,
buffer: Vec<u8>,
) {
let receiver_buffer = receiver_service_key.serial().unwrap();
unsafe {
SEND_MESSAGE_API.assume_init_ref()(
get_service_id(),
request_id,
receiver_buffer.as_ptr(),
receiver_buffer.len() as u32,
channel_id,
buffer.as_ptr(),
buffer.len() as u32,
);
}
}
pub fn sleep(milliseconds: u64) -> CommonStatusFuture {
let future = build_common_status_future();
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_status_handler(request_id, clone_shared_state);
unsafe {
SLEEP_API.assume_init_ref()(get_service_id(), request_id, milliseconds);
}
future
}
pub fn load_service(param: &LoadServiceRequest) -> CommonStatusFuture {
let future = build_common_status_future();
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_status_handler(request_id, clone_shared_state);
let message = param.serial().unwrap();
unsafe {
LOAD_SERVICE_API.assume_init_ref()(
get_service_id(),
request_id,
message.as_ptr(),
message.len() as u32,
);
};
future
}
pub fn unload_service(param: &UnloadServiceRequest) -> CommonStatusFuture {
let future = build_common_status_future();
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_status_handler(request_id, clone_shared_state);
let message = param.serial().unwrap();
unsafe {
UNLOAD_SERVICE_API.assume_init_ref()(
get_service_id(),
request_id,
message.as_ptr(),
message.len() as u32,
);
}
future
}
pub fn has_service(service_key: &ServiceKey) -> ExistFuture {
let future = new_exist_future();
let clone_shared_state = future.shared_state.clone();
let request_id = add_exist_request(clone_shared_state);
let message = service_key.serial().unwrap();
unsafe {
HAS_SERVICE_API.assume_init_ref()(
get_service_id(),
request_id,
message.as_ptr(),
message.len() as u32,
);
};
future
}
pub fn build_channel(node_id: i64) -> CommonStatusFuture {
let future = build_common_status_future();
unsafe {
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_status_handler(request_id, clone_shared_state);
BUILD_CHANNEL_API.assume_init_ref()(get_service_id(), request_id, node_id);
}
future
}
pub fn get_all_local_service(filter_system: bool) -> ResultApiFuture<ServiceInfos> {
let future = build_result_api_future::<ServiceInfos>();
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_result_handler(request_id, clone_shared_state);
unsafe {
GET_ALL_LOCAL_SERVICE.assume_init_ref()(get_service_id(), request_id, filter_system);
}
future
}
pub fn add_channel_id_to_remote_services(
channel_id: i64,
remote_services: ServiceInfos,
) -> CommonStatusFuture {
let future = build_common_status_future();
unsafe {
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_status_handler(request_id, clone_shared_state);
let message = remote_services.serial().unwrap();
ADD_CHANNEL_ID_TO_REMOTE_SERVICE.assume_init_ref()(
get_service_id(),
request_id,
channel_id,
message.as_ptr(),
message.len() as u32,
);
}
future
}
pub fn remove_remote_services_all_channel_id(
channel_id: i64,
remote_service: ServiceInfo,
) -> CommonStatusFuture {
let future = build_common_status_future();
unsafe {
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_status_handler(request_id, clone_shared_state);
let message = remote_service.serial().unwrap();
REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID.assume_init_ref()(
get_service_id(),
request_id,
channel_id,
message.as_ptr(),
message.len() as u32,
);
}
future
}
pub fn set_channel_xrpc_port(channel_id: i64, xprc_port: i32) -> CommonStatusFuture {
let future = build_common_status_future();
unsafe {
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_status_handler(request_id, clone_shared_state);
SET_CHANNEL_XRPC_PORT.assume_init_ref()(
get_service_id(),
request_id,
channel_id,
xprc_port,
);
}
future
}
pub fn get_xrpc_port() -> Option<i32> {
let request_id = gen_id();
unsafe {
let port = GET_XRPC_PORT.assume_init_ref()(get_service_id(), request_id);
Some(port)
}
}
pub fn get_all_conn_id() -> ResultApiFuture<ConnIds> {
let future = build_result_api_future::<ConnIds>();
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_result_handler(request_id, clone_shared_state);
unsafe {
GET_ALL_CONN_ID.assume_init_ref()(get_service_id(), request_id);
}
future
}
pub fn get_channel_id_by_conn_id(conn_id: i64) -> ResultApiFuture<ChannelId> {
let future = build_result_api_future::<ChannelId>();
let request_id = gen_id();
let clone_shared_state = future.shared_state.clone();
add_request_result_handler(request_id, clone_shared_state);
unsafe {
GET_CHANNEL_ID_BY_CONN_ID.assume_init_ref()(get_service_id(), request_id, conn_id);
}
future
}
pub extern "C" fn event_receiver(event_id: u16, buffer: *const u8, buffer_len: u32) -> bool {
let event_subscriber_map = EVENT_SUBSCRIBER_MAP.read().unwrap();
let hanlder = event_subscriber_map.get(&event_id);
if hanlder.is_none() {
return false;
}
let hanlder = hanlder.unwrap();
hanlder(buffer, buffer_len);
true
}
pub fn subscribe_channel_connected(handler: Box<dyn Fn(ChannelEvent) + Send + Sync>) {
let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
event_subscriber_map.insert(
CHANNEL_CONNECTED,
Box::new(move |buffer: *const u8, buffer_len: u32| {
let vec_buffer =
unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
let mut channel_event = ChannelEvent::default();
channel_event.parse_from_bytes(vec_buffer).unwrap();
handler(channel_event);
}),
);
unsafe {
let subscriber_id = SUBSCRIBE.assume_init_ref()(CHANNEL_CONNECTED, 0, event_receiver);
let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
subscriber_map.insert(CHANNEL_CONNECTED, subscriber_id);
}
}
pub fn subscribe_channel_disconnected(handler: Box<dyn Fn(ChannelEvent) + Send + Sync>) {
let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
event_subscriber_map.insert(
CHANNEL_DISCONNECTED,
Box::new(move |buffer: *const u8, buffer_len: u32| {
let vec_buffer =
unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
let mut channel_event = ChannelEvent::default();
channel_event.parse_from_bytes(vec_buffer).unwrap();
handler(channel_event);
}),
);
unsafe {
let subscriber_id =
SUBSCRIBE.assume_init_ref()(CHANNEL_DISCONNECTED, 0, event_receiver);
let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
subscriber_map.insert(CHANNEL_DISCONNECTED, subscriber_id);
}
}
pub fn subscribe_local_service_on(handler: Box<dyn Fn(ServiceInfo) + Send + Sync>) {
let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
event_subscriber_map.insert(
LOCAL_SERVICE_ON,
Box::new(move |buffer: *const u8, buffer_len: u32| {
let vec_buffer =
unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
let mut service_infos = ServiceInfo::default();
service_infos.parse_from_bytes(vec_buffer).unwrap();
handler(service_infos);
}),
);
unsafe {
let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_ON, 0, event_receiver);
let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
subscriber_map.insert(LOCAL_SERVICE_ON, subscriber_id);
}
}
pub fn subscribe_local_service_off(handler: Box<dyn Fn(ServiceInfo) + Send + Sync>) {
let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
event_subscriber_map.insert(
LOCAL_SERVICE_OFF,
Box::new(move |buffer: *const u8, buffer_len: u32| {
let vec_buffer =
unsafe { slice::from_raw_parts(buffer as *mut u8, buffer_len as usize) };
let mut service_infos = ServiceInfo::default();
service_infos.parse_from_bytes(vec_buffer).unwrap();
handler(service_infos);
}),
);
unsafe {
let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_OFF, 0, event_receiver);
let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
subscriber_map.insert(LOCAL_SERVICE_OFF, subscriber_id);
}
}
pub fn unsubscribe_all() {
unsafe {
let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
for (event_id, subscriber_id) in subscriber_map.iter() {
UNSUBSCRIBE.assume_init_ref()(*event_id, *subscriber_id);
}
subscriber_map.clear();
}
}
}