use std::{
future::Future,
sync::{Arc, Mutex},
task::{Poll, Waker},
};
use crate::x_core::{add_request_handler, gen_id};
use xCommonLib::{base::status::Status, serial::request_message::RequestMessage};
/// State shared between a `CommonStatusFuture` and the response callback registered for it.
struct CommonApiStatus {
/// Set to `true` by the response callback once `status` has been parsed.
is_finish: bool,
/// Status decoded from the response bytes; starts as `Status::default()`.
status: Status,
/// Waker stored by the most recent `poll` that returned `Pending`.
waker: Option<Waker>,
}
/// Future that resolves to the `Status` delivered to the request handler of a host API call.
pub struct CommonStatusFuture {
/// State shared with the response callback.
shared_state: Arc<Mutex<CommonApiStatus>>,
}
impl Future for CommonStatusFuture {
    type Output = Status;

    /// Returns the stored status once the shared state is marked finished;
    /// otherwise records the current waker and stays pending.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if !state.is_finish {
            // Not done yet: remember who to wake when the response arrives.
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        // Move the message out instead of cloning it.
        let message = std::mem::take(&mut state.status.err_msg);
        let code = state.status.err_code;
        Poll::Ready(Status::new(code, message))
    }
}
/// Creates a `CommonStatusFuture` whose shared state is unfinished, has a default
/// status and no waker.
fn build_common_status_future() -> CommonStatusFuture {
    let initial = CommonApiStatus {
        is_finish: false,
        status: Status::default(),
        waker: None,
    };
    CommonStatusFuture {
        shared_state: Arc::new(Mutex::new(initial)),
    }
}
/// Registers the response callback for `request_id` that completes a `CommonStatusFuture`.
///
/// The callback parses the response bytes into the shared `status`, marks the state as
/// finished and wakes the task that polled the future, if any.
///
/// The result is recorded even when no waker has been stored yet. Previously a response
/// that arrived before the first `poll` was discarded, so the future never completed.
fn add_request_status_handler(request_id: i64, clone_shared_state: Arc<Mutex<CommonApiStatus>>) {
    add_request_handler(
        request_id,
        Box::new(move |buffer| {
            let waker = {
                let mut shared_state = clone_shared_state.lock().unwrap();
                shared_state
                    .status
                    .parse_from_bytes_return_num(buffer)
                    .unwrap();
                shared_state.is_finish = true;
                shared_state.waker.take()
            };
            // Wake outside the lock so the woken task can take it immediately.
            if let Some(waker) = waker {
                waker.wake();
            }
        }),
    );
}
/// State shared between a `ResultApiFuture<T>` and the response callback registered for it.
struct ResultApiState<T> {
/// Set to `true` by the response callback once the response has been decoded.
is_finish: bool,
/// Status decoded from the start of the response bytes.
status: Status,
/// Value decoded from the bytes that follow the status, when there are any.
value: Option<T>,
/// Waker stored by the most recent `poll` that returned `Pending`.
waker: Option<Waker>,
}
/// Future that resolves to `Ok(T)` or to the error `Status` of a host API call.
pub struct ResultApiFuture<T> {
/// State shared with the response callback.
shared_state: Arc<Mutex<ResultApiState<T>>>,
}
impl<T: Send + Sync> Future for ResultApiFuture<T> {
    type Output = Result<T, Status>;

    /// Resolves with the error status or the decoded value once the shared state is
    /// finished; otherwise stores the current waker and stays pending.
    ///
    /// # Panics
    /// Panics if the state is finished with a non-error status but no value was stored.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if !state.is_finish {
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        let outcome = if state.status.is_erorr() {
            Err(std::mem::take(&mut state.status))
        } else {
            Ok(state.value.take().unwrap())
        };
        Poll::Ready(outcome)
    }
}
/// Creates a `ResultApiFuture<T>` whose shared state is unfinished, with a default
/// status, no value and no waker.
fn build_result_api_future<T>() -> ResultApiFuture<T>
where
    T: RequestMessage + Default + 'static,
{
    let initial = ResultApiState::<T> {
        is_finish: false,
        status: Status::default(),
        value: None,
        waker: None,
    };
    ResultApiFuture::<T> {
        shared_state: Arc::new(Mutex::new(initial)),
    }
}
/// Registers the response callback for `request_id` that completes a `ResultApiFuture<T>`.
///
/// The callback parses the leading `Status` from the response bytes and, when bytes
/// remain, decodes them into a `T`. It then marks the state finished and wakes the
/// polling task, if any.
///
/// Changes from the previous version:
/// * The result is recorded even when no waker has been stored yet. Before, a response
///   that arrived ahead of the first `poll` was dropped and the future never completed.
/// * A successful status with no trailing payload now yields `T::default()` instead of
///   leaving `value` empty, which made `poll` panic on `unwrap`.
///   NOTE(review): assumes an empty payload encodes a default message — confirm against
///   the serializer.
/// * The payload slice is taken with a checked `get`, so a `read_bytes` larger than
///   the buffer no longer panics on indexing.
fn add_request_result_handler<T>(request_id: i64, clone_shared_state: Arc<Mutex<ResultApiState<T>>>)
where
    T: RequestMessage + Default + 'static,
{
    add_request_handler(
        request_id,
        Box::new(move |buffer| {
            let waker = {
                let mut shared_state = clone_shared_state.lock().unwrap();
                let read_bytes = shared_state
                    .status
                    .parse_from_bytes_return_num(buffer)
                    .unwrap();
                let payload = buffer
                    .get(read_bytes as usize..)
                    .filter(|rest| !rest.is_empty());
                match payload {
                    Some(bytes) => {
                        let mut value = T::default();
                        value.parse_from_bytes_return_num(bytes).unwrap();
                        shared_state.value = Some(value);
                    }
                    None if !shared_state.status.is_erorr() => {
                        shared_state.value = Some(T::default());
                    }
                    None => {}
                }
                shared_state.is_finish = true;
                shared_state.waker.take()
            };
            // Wake outside the lock so the woken task can take it immediately.
            if let Some(waker) = waker {
                waker.wake();
            }
        }),
    );
}
/// State shared between a `BytesApiFuture` and the response callback registered for it.
struct BytesApiState {
/// Set to `true` by the response callback once the response has been decoded.
is_finish: bool,
/// Status decoded from the start of the response bytes.
status: Status,
/// Raw bytes that follow the status in the response, when there are any.
value: Option<Vec<u8>>,
/// Waker stored by the most recent `poll` that returned `Pending`.
waker: Option<Waker>,
}
/// Future that resolves to the raw payload bytes or to the error `Status` of a host API call.
pub struct BytesApiFuture {
/// State shared with the response callback.
shared_state: Arc<Mutex<BytesApiState>>,
}
/// Creates a `BytesApiFuture` whose shared state is unfinished, with a default status,
/// no payload and no waker.
fn build_bytes_api_future() -> BytesApiFuture {
    let initial = BytesApiState {
        is_finish: false,
        status: Status::default(),
        value: None,
        waker: None,
    };
    BytesApiFuture {
        shared_state: Arc::new(Mutex::new(initial)),
    }
}
impl Future for BytesApiFuture {
    type Output = Result<Vec<u8>, Status>;

    /// Resolves with the error status or the payload bytes once the shared state is
    /// finished (an absent payload becomes an empty vector); otherwise stores the
    /// current waker and stays pending.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if !state.is_finish {
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        if state.status.is_erorr() {
            return Poll::Ready(Err(std::mem::take(&mut state.status)));
        }
        Poll::Ready(Ok(state.value.take().unwrap_or_default()))
    }
}
/// Registers the response callback for `request_id` that completes a `BytesApiFuture`.
///
/// The callback parses the leading `Status` from the response bytes, copies any
/// remaining bytes into `value`, marks the state finished and wakes the polling task,
/// if any.
///
/// The result is recorded even when no waker has been stored yet. Previously a response
/// that arrived before the first `poll` was discarded, so the future never completed.
/// The payload slice is taken with a checked `get`, so a `read_bytes` larger than the
/// buffer no longer panics on indexing.
fn add_request_bytes_handler(request_id: i64, clone_shared_state: Arc<Mutex<BytesApiState>>) {
    add_request_handler(
        request_id,
        Box::new(move |buffer| {
            let waker = {
                let mut shared_state = clone_shared_state.lock().unwrap();
                let read_bytes = shared_state
                    .status
                    .parse_from_bytes_return_num(buffer)
                    .unwrap();
                if let Some(rest) = buffer.get(read_bytes as usize..) {
                    if !rest.is_empty() {
                        shared_state.value = Some(rest.to_vec());
                    }
                }
                shared_state.is_finish = true;
                shared_state.waker.take()
            };
            // Wake outside the lock so the woken task can take it immediately.
            if let Some(waker) = waker {
                waker.wake();
            }
        }),
    );
}
pub mod xport {
use core::slice;
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};
use super::{
add_request_result_handler, add_request_status_handler, build_common_status_future,
build_result_api_future, CommonStatusFuture, ResultApiFuture,
};
use crate::x_core::{
gen_id, get_service_id, ADD_CHANNEL_ID_TO_REMOTE_SERVICE, BUILD_CHANNEL_API,
GET_ALL_CONN_ID, GET_ALL_LOCAL_SERVICE, GET_CHANNEL_ID_BY_CONN_ID, GET_XRPC_PORT,
LOAD_SERVICE_API, REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID, SEND_MESSAGE_API,
SET_CHANNEL_XRPC_PORT, SLEEP_API, SUBSCRIBE, UNLOAD_SERVICE_API, UNSUBSCRIBE,
};
use xCommonLib::base::dll_api::event::CHANNEL_CONNECTED;
use xCommonLib::base::dll_api::event::CHANNEL_DISCONNECTED;
use xCommonLib::base::dll_api::event::LOCAL_SERVICE_OFF;
use xCommonLib::base::dll_api::event::LOCAL_SERVICE_ON;
use lazy_static::lazy_static;
use xCommonLib::serial::request_message::RequestMessage;
use xCommonLib::service::sys_service_api::{
ChannelEvent, ChannelId, ConnIds, LoadServiceRequest, ServiceInfo, ServiceInfos,
ServiceKey, UnloadServiceRequest,
};
lazy_static! {
// Event id -> subscriber id returned by `SUBSCRIBE`; consumed by `unsubscribe_all`.
static ref SUBSCRIBERIDMAP:Mutex<HashMap<u16, i64>> = Mutex::new(HashMap::new());
// Event id -> decoding callback invoked by `event_receiver` with the raw event buffer.
static ref EVENT_SUBSCRIBER_MAP: RwLock<HashMap<u16, Box<dyn Fn(*const u8, u32) + Send + Sync>>> = RwLock::new(HashMap::new());
}
/// Serializes `receiver_service_key` and passes it, together with `buffer`, to the host
/// through `SEND_MESSAGE_API`.
///
/// # Panics
/// Panics if the service key cannot be serialized.
pub fn send_message(
    request_id: i64,
    receiver_service_key: ServiceKey,
    channel_id: i64,
    buffer: Vec<u8>,
) {
    let receiver = receiver_service_key.serial().unwrap();
    let (receiver_ptr, receiver_len) = (receiver.as_ptr(), receiver.len() as u32);
    let (payload_ptr, payload_len) = (buffer.as_ptr(), buffer.len() as u32);
    unsafe {
        SEND_MESSAGE_API.assume_init_ref()(
            get_service_id(),
            request_id,
            receiver_ptr,
            receiver_len,
            channel_id,
            payload_ptr,
            payload_len,
        );
    }
}
/// Calls `SLEEP_API` with `milliseconds` and returns a future that resolves with the
/// status delivered to the handler registered for the generated request id.
pub fn sleep(milliseconds: u64) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    unsafe {
        SLEEP_API.assume_init_ref()(get_service_id(), id, milliseconds);
    }
    pending
}
/// Serializes `param`, passes it to `LOAD_SERVICE_API` and returns a future that
/// resolves with the status delivered to the registered handler.
///
/// # Panics
/// Panics if `param` cannot be serialized.
pub fn load_service(param: &LoadServiceRequest) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let encoded = param.serial().unwrap();
    unsafe {
        LOAD_SERVICE_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Serializes `param`, passes it to `UNLOAD_SERVICE_API` and returns a future that
/// resolves with the status delivered to the registered handler.
///
/// # Panics
/// Panics if `param` cannot be serialized.
pub fn unload_service(param: &UnloadServiceRequest) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let encoded = param.serial().unwrap();
    unsafe {
        UNLOAD_SERVICE_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Calls `BUILD_CHANNEL_API` for `node_id` and returns a future that resolves with the
/// status delivered to the registered handler.
pub fn build_channel(node_id: i64) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    // Only the host call itself needs `unsafe`.
    unsafe {
        BUILD_CHANNEL_API.assume_init_ref()(get_service_id(), id, node_id);
    }
    pending
}
/// Calls `GET_ALL_LOCAL_SERVICE` with `filter_system` and returns a future that resolves
/// with the decoded `ServiceInfos` or the error status.
pub fn get_all_local_service(filter_system: bool) -> ResultApiFuture<ServiceInfos> {
    let pending = build_result_api_future::<ServiceInfos>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    unsafe {
        GET_ALL_LOCAL_SERVICE.assume_init_ref()(get_service_id(), id, filter_system);
    }
    pending
}
/// Serializes `remote_services`, passes it with `channel_id` to
/// `ADD_CHANNEL_ID_TO_REMOTE_SERVICE` and returns a future that resolves with the
/// resulting status.
///
/// # Panics
/// Panics if `remote_services` cannot be serialized.
pub fn add_channel_id_to_remote_services(
    channel_id: i64,
    remote_services: ServiceInfos,
) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let encoded = remote_services.serial().unwrap();
    // Only the host call itself needs `unsafe`.
    unsafe {
        ADD_CHANNEL_ID_TO_REMOTE_SERVICE.assume_init_ref()(
            get_service_id(),
            id,
            channel_id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Serializes `remote_service`, passes it with `channel_id` to
/// `REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID` and returns a future that resolves with the
/// resulting status.
///
/// # Panics
/// Panics if `remote_service` cannot be serialized.
pub fn remove_remote_services_all_channel_id(
    channel_id: i64,
    remote_service: ServiceInfo,
) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let encoded = remote_service.serial().unwrap();
    // Only the host call itself needs `unsafe`.
    unsafe {
        REMOVE_REMOTE_SERVICE_ALL_CHANNEL_ID.assume_init_ref()(
            get_service_id(),
            id,
            channel_id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Calls `SET_CHANNEL_XRPC_PORT` with `channel_id` and `xprc_port` and returns a future
/// that resolves with the resulting status.
pub fn set_channel_xrpc_port(channel_id: i64, xprc_port: i32) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    // Only the host call itself needs `unsafe`.
    unsafe {
        SET_CHANNEL_XRPC_PORT.assume_init_ref()(get_service_id(), id, channel_id, xprc_port);
    }
    pending
}
/// Returns the port reported synchronously by `GET_XRPC_PORT`.
///
/// The result is always `Some`; the `Option` is kept for interface compatibility.
pub fn get_xrpc_port() -> Option<i32> {
    let id = gen_id();
    let port = unsafe { GET_XRPC_PORT.assume_init_ref()(get_service_id(), id) };
    Some(port)
}
/// Calls `GET_ALL_CONN_ID` and returns a future that resolves with the decoded
/// `ConnIds` or the error status.
pub fn get_all_conn_id() -> ResultApiFuture<ConnIds> {
    let pending = build_result_api_future::<ConnIds>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    unsafe {
        GET_ALL_CONN_ID.assume_init_ref()(get_service_id(), id);
    }
    pending
}
/// Calls `GET_CHANNEL_ID_BY_CONN_ID` for `conn_id` and returns a future that resolves
/// with the decoded `ChannelId` or the error status.
pub fn get_channel_id_by_conn_id(conn_id: i64) -> ResultApiFuture<ChannelId> {
    let pending = build_result_api_future::<ChannelId>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    unsafe {
        GET_CHANNEL_ID_BY_CONN_ID.assume_init_ref()(get_service_id(), id, conn_id);
    }
    pending
}
/// C-ABI entry point passed to `SUBSCRIBE`: looks up the callback registered for
/// `event_id` and invokes it with the raw buffer.
///
/// Returns `true` when a callback was found and called, `false` otherwise. The read
/// lock on the callback map is held while the callback runs.
pub extern "C" fn event_receiver(event_id: u16, buffer: *const u8, buffer_len: u32) -> bool {
    let subscribers = EVENT_SUBSCRIBER_MAP.read().unwrap();
    match subscribers.get(&event_id) {
        Some(callback) => {
            callback(buffer, buffer_len);
            true
        }
        None => false,
    }
}
/// Registers `handler` for `CHANNEL_CONNECTED` events and subscribes `event_receiver`
/// with the host, remembering the returned subscriber id for `unsubscribe_all`.
///
/// The stored callback decodes the raw event buffer into a `ChannelEvent` before
/// calling `handler`.
///
/// Changes from the previous version:
/// * A null or zero-length buffer is treated as an empty slice, because
///   `slice::from_raw_parts` must never be given a null pointer.
/// * The callback-map write lock is released before calling `SUBSCRIBE`, so a host that
///   dispatches an event from inside that call cannot deadlock against `event_receiver`.
///
/// # Panics
/// The stored callback panics if the event bytes cannot be parsed.
pub fn subscribe_channel_connected(handler: Box<dyn Fn(ChannelEvent) + Send + Sync>) {
    {
        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
        event_subscriber_map.insert(
            CHANNEL_CONNECTED,
            Box::new(move |buffer: *const u8, buffer_len: u32| {
                let bytes: &[u8] = if buffer.is_null() || buffer_len == 0 {
                    &[]
                } else {
                    // SAFETY: `buffer` is non-null here; the host is trusted to pass a
                    // pointer valid for `buffer_len` bytes for the duration of the call.
                    unsafe { slice::from_raw_parts(buffer, buffer_len as usize) }
                };
                let mut channel_event = ChannelEvent::default();
                channel_event.parse_from_bytes(bytes).unwrap();
                handler(channel_event);
            }),
        );
    }
    unsafe {
        let subscriber_id = SUBSCRIBE.assume_init_ref()(CHANNEL_CONNECTED, 0, event_receiver);
        let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
        subscriber_map.insert(CHANNEL_CONNECTED, subscriber_id);
    }
}
/// Registers `handler` for `CHANNEL_DISCONNECTED` events and subscribes `event_receiver`
/// with the host, remembering the returned subscriber id for `unsubscribe_all`.
///
/// The stored callback decodes the raw event buffer into a `ChannelEvent` before
/// calling `handler`.
///
/// Changes from the previous version:
/// * A null or zero-length buffer is treated as an empty slice, because
///   `slice::from_raw_parts` must never be given a null pointer.
/// * The callback-map write lock is released before calling `SUBSCRIBE`, so a host that
///   dispatches an event from inside that call cannot deadlock against `event_receiver`.
///
/// # Panics
/// The stored callback panics if the event bytes cannot be parsed.
pub fn subscribe_channel_disconnected(handler: Box<dyn Fn(ChannelEvent) + Send + Sync>) {
    {
        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
        event_subscriber_map.insert(
            CHANNEL_DISCONNECTED,
            Box::new(move |buffer: *const u8, buffer_len: u32| {
                let bytes: &[u8] = if buffer.is_null() || buffer_len == 0 {
                    &[]
                } else {
                    // SAFETY: `buffer` is non-null here; the host is trusted to pass a
                    // pointer valid for `buffer_len` bytes for the duration of the call.
                    unsafe { slice::from_raw_parts(buffer, buffer_len as usize) }
                };
                let mut channel_event = ChannelEvent::default();
                channel_event.parse_from_bytes(bytes).unwrap();
                handler(channel_event);
            }),
        );
    }
    unsafe {
        let subscriber_id =
            SUBSCRIBE.assume_init_ref()(CHANNEL_DISCONNECTED, 0, event_receiver);
        let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
        subscriber_map.insert(CHANNEL_DISCONNECTED, subscriber_id);
    }
}
/// Registers `handler` for `LOCAL_SERVICE_ON` events and subscribes `event_receiver`
/// with the host, remembering the returned subscriber id for `unsubscribe_all`.
///
/// The stored callback decodes the raw event buffer into a `ServiceInfo` before
/// calling `handler`.
///
/// Changes from the previous version:
/// * A null or zero-length buffer is treated as an empty slice, because
///   `slice::from_raw_parts` must never be given a null pointer.
/// * The callback-map write lock is released before calling `SUBSCRIBE`, so a host that
///   dispatches an event from inside that call cannot deadlock against `event_receiver`.
///
/// # Panics
/// The stored callback panics if the event bytes cannot be parsed.
pub fn subscribe_local_service_on(handler: Box<dyn Fn(ServiceInfo) + Send + Sync>) {
    {
        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
        event_subscriber_map.insert(
            LOCAL_SERVICE_ON,
            Box::new(move |buffer: *const u8, buffer_len: u32| {
                let bytes: &[u8] = if buffer.is_null() || buffer_len == 0 {
                    &[]
                } else {
                    // SAFETY: `buffer` is non-null here; the host is trusted to pass a
                    // pointer valid for `buffer_len` bytes for the duration of the call.
                    unsafe { slice::from_raw_parts(buffer, buffer_len as usize) }
                };
                let mut service_info = ServiceInfo::default();
                service_info.parse_from_bytes(bytes).unwrap();
                handler(service_info);
            }),
        );
    }
    unsafe {
        let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_ON, 0, event_receiver);
        let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
        subscriber_map.insert(LOCAL_SERVICE_ON, subscriber_id);
    }
}
/// Registers `handler` for `LOCAL_SERVICE_OFF` events and subscribes `event_receiver`
/// with the host, remembering the returned subscriber id for `unsubscribe_all`.
///
/// The stored callback decodes the raw event buffer into a `ServiceInfo` before
/// calling `handler`.
///
/// Changes from the previous version:
/// * A null or zero-length buffer is treated as an empty slice, because
///   `slice::from_raw_parts` must never be given a null pointer.
/// * The callback-map write lock is released before calling `SUBSCRIBE`, so a host that
///   dispatches an event from inside that call cannot deadlock against `event_receiver`.
///
/// # Panics
/// The stored callback panics if the event bytes cannot be parsed.
pub fn subscribe_local_service_off(handler: Box<dyn Fn(ServiceInfo) + Send + Sync>) {
    {
        let mut event_subscriber_map = EVENT_SUBSCRIBER_MAP.write().unwrap();
        event_subscriber_map.insert(
            LOCAL_SERVICE_OFF,
            Box::new(move |buffer: *const u8, buffer_len: u32| {
                let bytes: &[u8] = if buffer.is_null() || buffer_len == 0 {
                    &[]
                } else {
                    // SAFETY: `buffer` is non-null here; the host is trusted to pass a
                    // pointer valid for `buffer_len` bytes for the duration of the call.
                    unsafe { slice::from_raw_parts(buffer, buffer_len as usize) }
                };
                let mut service_info = ServiceInfo::default();
                service_info.parse_from_bytes(bytes).unwrap();
                handler(service_info);
            }),
        );
    }
    unsafe {
        let subscriber_id = SUBSCRIBE.assume_init_ref()(LOCAL_SERVICE_OFF, 0, event_receiver);
        let mut subscriber_map = SUBSCRIBERIDMAP.lock().unwrap();
        subscriber_map.insert(LOCAL_SERVICE_OFF, subscriber_id);
    }
}
/// Calls `UNSUBSCRIBE` for every recorded (event id, subscriber id) pair and then
/// empties the subscriber-id map.
pub fn unsubscribe_all() {
    let mut subscriptions = SUBSCRIBERIDMAP.lock().unwrap();
    subscriptions
        .iter()
        .for_each(|(event_id, subscriber_id)| unsafe {
            UNSUBSCRIBE.assume_init_ref()(*event_id, *subscriber_id);
        });
    subscriptions.clear();
}
}
pub mod file {
use crate::x_core::{
add_request_handler, gen_id, get_service_id, ALLOW_DOWNLOAD_FILE_API,
ALLOW_UPLOAD_FILE_API, CLOSE_FILE_API, CREATE_DIR_API, CREATE_FILE_API, DOWNLOAD_FILE_API,
GET_DXC_DSL, GET_FILE_MD5, IS_DIR_EXIST_API, IS_FILE_EXIST_API, OPEN_FILE_API,
READ_FILE_API, SEEK_FILE_API, UPLOAD_FILE_API, WRITE_FILE_API,
};
use super::{
add_request_bytes_handler, add_request_result_handler, add_request_status_handler,
build_bytes_api_future, build_common_status_future, build_result_api_future,
BytesApiFuture, CommonStatusFuture, ResultApiFuture,
};
use xCommonLib::{
base::status::Status,
serial::{self, request_message::RequestMessage},
service::sys_service_api::{DXCDSLInfo, StringResponse, TransferdFileInfo, U64Response},
};
use std::{
future::Future,
sync::{Arc, Mutex},
task::{Poll, Waker},
};
/// State shared between an `ExistFuture` and the response callback registered for it.
struct ExistSharedState {
/// Set to `true` by the response callback once `is_exist` has been read.
is_finish: bool,
/// Boolean read from the response buffer; starts as `false`.
is_exist: bool,
/// Waker stored by the most recent `poll` that returned `Pending`.
waker: Option<Waker>,
}
/// Future that resolves to the boolean answer of an existence query.
pub struct ExistFuture {
/// State shared with the response callback.
shared_state: Arc<Mutex<ExistSharedState>>,
}
impl Future for ExistFuture {
    type Output = bool;

    /// Resolves with `is_exist` once the shared state is finished; otherwise stores the
    /// current waker and stays pending.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if !state.is_finish {
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        Poll::Ready(state.is_exist)
    }
}
/// Generates a request id and registers a response callback that completes the
/// `ExistFuture` backed by `shared_state`; returns the request id.
///
/// The callback reads a boolean from the response buffer, marks the state finished and
/// wakes the polling task, if any.
///
/// The result is recorded even when no waker has been stored yet. Previously a response
/// that arrived before the first `poll` was discarded, so the future never completed.
fn add_exist_request(shared_state: Arc<Mutex<ExistSharedState>>) -> i64 {
    let request_id = super::gen_id();
    add_request_handler(
        request_id,
        Box::new(move |buffer| {
            let waker = {
                let mut state = shared_state.lock().unwrap();
                state.is_exist = serial::read_bool(buffer);
                state.is_finish = true;
                state.waker.take()
            };
            // Wake outside the lock so the woken task can take it immediately.
            if let Some(waker) = waker {
                waker.wake();
            }
        }),
    );
    request_id
}
/// Creates an `ExistFuture` whose shared state is unfinished, answers `false` and has
/// no waker.
fn new_exist_future() -> ExistFuture {
    let initial = ExistSharedState {
        is_finish: false,
        is_exist: false,
        waker: None,
    };
    ExistFuture {
        shared_state: Arc::new(Mutex::new(initial)),
    }
}
/// Passes the serialized `file_path` to `IS_FILE_EXIST_API` and returns a future that
/// resolves with the boolean answer.
pub fn is_file_exist(file_path: &str) -> ExistFuture {
    let pending = new_exist_future();
    let id = add_exist_request(pending.shared_state.clone());
    let encoded_path = serial::serial_string(file_path);
    unsafe {
        IS_FILE_EXIST_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded_path.as_ptr(),
            encoded_path.len() as u32,
        );
    }
    pending
}
/// Passes the serialized `dir_path` to `IS_DIR_EXIST_API` and returns a future that
/// resolves with the boolean answer.
pub fn is_dir_exist(dir_path: &str) -> ExistFuture {
    let pending = new_exist_future();
    let id = add_exist_request(pending.shared_state.clone());
    let encoded_path = serial::serial_string(dir_path);
    unsafe {
        IS_DIR_EXIST_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded_path.as_ptr(),
            encoded_path.len() as u32,
        );
    }
    pending
}
/// Passes the serialized `dir_path` to `CREATE_DIR_API` and returns a future that
/// resolves with the resulting status.
pub fn create_dir(dir_path: &str) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let encoded_path = serial::serial_string(dir_path);
    unsafe {
        CREATE_DIR_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded_path.as_ptr(),
            encoded_path.len() as u32,
        );
    }
    pending
}
/// Passes the serialized `file_path` to `CREATE_FILE_API` and returns a future that
/// resolves with the resulting status.
pub fn create_file(file_path: &str) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let encoded_path = serial::serial_string(file_path);
    unsafe {
        CREATE_FILE_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded_path.as_ptr(),
            encoded_path.len() as u32,
        );
    }
    pending
}
/// Passes the serialized `file_path` to `GET_FILE_MD5` and returns a future that
/// resolves with the decoded `StringResponse` or the error status.
pub fn get_md5(file_path: &str) -> ResultApiFuture<StringResponse> {
    let pending = build_result_api_future::<StringResponse>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    let encoded_path = serial::serial_string(file_path);
    unsafe {
        GET_FILE_MD5.assume_init_ref()(
            get_service_id(),
            id,
            encoded_path.as_ptr(),
            encoded_path.len() as u32,
        );
    }
    pending
}
/// Passes the serialized `file_path` to `GET_DXC_DSL` and returns a future that
/// resolves with the decoded `DXCDSLInfo` or the error status.
pub fn get_dxc_dsl(file_path: &str) -> ResultApiFuture<DXCDSLInfo> {
    let pending = build_result_api_future::<DXCDSLInfo>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    let encoded_path = serial::serial_string(file_path);
    unsafe {
        GET_DXC_DSL.assume_init_ref()(
            get_service_id(),
            id,
            encoded_path.as_ptr(),
            encoded_path.len() as u32,
        );
    }
    pending
}
/// Sends a `TransferdFileInfo` carrying `file_path` to `ALLOW_DOWNLOAD_FILE_API` and
/// returns a future that resolves with the decoded `StringResponse` or the error status.
///
/// # Panics
/// Panics if the request message cannot be serialized.
pub fn allow_download_file(file_path: &str) -> ResultApiFuture<StringResponse> {
    let pending = build_result_api_future::<StringResponse>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    let mut info = TransferdFileInfo::default();
    info.file_path = file_path.into();
    let encoded = info.serial().unwrap();
    unsafe {
        ALLOW_DOWNLOAD_FILE_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Sends a `TransferdFileInfo` carrying `file_path` and `file_size` to
/// `ALLOW_UPLOAD_FILE_API` and returns a future that resolves with the decoded
/// `StringResponse` or the error status.
///
/// # Panics
/// Panics if the request message cannot be serialized.
pub fn allow_upload_file(file_path: &str, file_size: u64) -> ResultApiFuture<StringResponse> {
    let pending = build_result_api_future::<StringResponse>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    let mut info = TransferdFileInfo::default();
    info.file_path = file_path.into();
    info.file_size = file_size;
    let encoded = info.serial().unwrap();
    unsafe {
        ALLOW_UPLOAD_FILE_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Sends a `TransferdFileInfo` carrying `file_path` and `file_url` to
/// `DOWNLOAD_FILE_API` and returns a future that resolves with the resulting status.
///
/// # Panics
/// Panics if the request message cannot be serialized.
pub fn download_file(file_url: &str, file_path: &str) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let mut info = TransferdFileInfo::default();
    info.file_path = file_path.into();
    info.file_url = file_url.into();
    let encoded = info.serial().unwrap();
    unsafe {
        DOWNLOAD_FILE_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Sends a `TransferdFileInfo` carrying `file_path` and `file_url` to `UPLOAD_FILE_API`
/// and returns a future that resolves with the resulting status.
///
/// # Panics
/// Panics if the request message cannot be serialized.
pub fn upload_file(file_url: &str, file_path: &str) -> CommonStatusFuture {
    let pending = build_common_status_future();
    let id = gen_id();
    add_request_status_handler(id, pending.shared_state.clone());
    let mut info = TransferdFileInfo::default();
    info.file_path = file_path.into();
    info.file_url = file_url.into();
    let encoded = info.serial().unwrap();
    unsafe {
        UPLOAD_FILE_API.assume_init_ref()(
            get_service_id(),
            id,
            encoded.as_ptr(),
            encoded.len() as u32,
        );
    }
    pending
}
/// Handle to a file opened through the host API; closed via `CLOSE_FILE_API` on drop.
pub struct File {
/// Id generated in `File::open`; also used as the request id for later operations.
file_id: i64,
}
impl Drop for File {
    /// Tells the host to close this file through `CLOSE_FILE_API`.
    fn drop(&mut self) {
        let file_id = self.file_id;
        unsafe {
            CLOSE_FILE_API.assume_init_ref()(get_service_id(), file_id);
        }
    }
}
impl File {
    /// Opens `file_path` through `OPEN_FILE_API`.
    ///
    /// Returns the `File` handle when the reported status is not an error, otherwise
    /// the error status.
    pub async fn open(file_path: &str) -> Result<File, Status> {
        let pending = build_common_status_future();
        let file_id = gen_id();
        add_request_status_handler(file_id, pending.shared_state.clone());
        let encoded_path = serial::serial_string(file_path);
        unsafe {
            OPEN_FILE_API.assume_init_ref()(
                get_service_id(),
                file_id,
                encoded_path.as_ptr(),
                encoded_path.len() as u32,
            );
        }
        let status = pending.await;
        if status.is_erorr() {
            return Err(status);
        }
        Ok(File { file_id })
    }

    /// Passes `buffer` to `WRITE_FILE_API` and returns a future that resolves with the
    /// resulting status. The handler is registered under this file's id.
    pub fn write(&self, buffer: &[u8]) -> CommonStatusFuture {
        let pending = build_common_status_future();
        add_request_status_handler(self.file_id, pending.shared_state.clone());
        unsafe {
            WRITE_FILE_API.assume_init_ref()(
                get_service_id(),
                self.file_id,
                buffer.as_ptr(),
                buffer.len() as u32,
            );
        }
        pending
    }

    /// Passes `pos` to `SEEK_FILE_API` and returns a future that resolves with the
    /// decoded `U64Response` or the error status.
    pub fn seek(&self, pos: u64) -> ResultApiFuture<U64Response> {
        let pending = build_result_api_future::<U64Response>();
        add_request_result_handler(self.file_id, pending.shared_state.clone());
        unsafe {
            SEEK_FILE_API.assume_init_ref()(get_service_id(), self.file_id, pos);
        }
        pending
    }

    /// Passes `size` to `READ_FILE_API` and returns a future that resolves with the
    /// returned bytes or the error status.
    pub fn read(&self, size: u64) -> BytesApiFuture {
        let pending = build_bytes_api_future();
        add_request_bytes_handler(self.file_id, pending.shared_state.clone());
        unsafe {
            READ_FILE_API.assume_init_ref()(get_service_id(), self.file_id, size);
        }
        pending
    }
}
}
pub mod http {
use std::sync::{Arc, Mutex};
use super::{
add_request_bytes_handler, add_request_result_handler, build_bytes_api_future,
build_result_api_future, BytesApiFuture, ResultApiFuture,
};
use crate::x_core::{
gen_id, get_service_id, GET_HTTP_DATA_API, GET_HTTP_LISTEN_ADDR_API,
REMOVE_HTTP_CLIENT_API, SEND_HTTP_REQUEST_API,
};
use futures::Future;
use serde::{de::DeserializeOwned, Serialize};
use std::task::{Poll, Waker};
use tracing::debug;
use xCommonLib::{
base::status::Status,
serial::request_message::RequestMessage,
service::sys_service_api::{HTTPMethod, HttpRequest, HttpResponse, StringResponse},
};
/// HTTP client backed by the host API; released via `REMOVE_HTTP_CLIENT_API` on drop.
pub struct Client {
/// Id generated in `Client::build`; used as the request id for every host call.
request_id: i64,
/// Request being assembled (url, method, headers, body).
request: HttpRequest,
/// Holds the headers of the last successful response.
response: HttpResponse,
/// Holds the error status of the last failed request.
status: Status,
}
/// State shared by an `HttpBodyFuture`.
struct HttpBodyState {
/// When `true`, `poll` resolves with `body`.
is_finish: bool,
/// Body bytes handed out (and left empty) by `poll`.
body: Vec<u8>,
/// Waker stored by the most recent `poll` that returned `Pending`.
waker: Option<Waker>,
}
/// Future that resolves to a body byte vector once its shared state is finished.
pub struct HttpBodyFuture {
/// Shared completion state.
shared_state: Arc<Mutex<HttpBodyState>>,
}
impl Future for HttpBodyFuture {
    type Output = Vec<u8>;

    /// Resolves with the stored body (moved out of the state) once finished; otherwise
    /// stores the current waker and stays pending.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if !state.is_finish {
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        Poll::Ready(std::mem::take(&mut state.body))
    }
}
impl Drop for Client {
    /// Tells the host to release this client through `REMOVE_HTTP_CLIENT_API`.
    fn drop(&mut self) {
        let request_id = self.request_id;
        unsafe {
            REMOVE_HTTP_CLIENT_API.assume_init_ref()(get_service_id(), request_id);
        }
    }
}
impl Client {
pub fn build() -> Self {
let request = HttpRequest::default();
Client {
request_id: gen_id(),
request,
response: HttpResponse::default(),
status: Status::default(),
}
}
pub fn header(mut self, key: &str, value: &str) -> Self {
self.request.headers.insert(key.into(), value.into());
self
}
pub fn body<T>(mut self, t: &T) -> Self
where
T: ?Sized + Serialize,
{
self.request.body = serde_json::to_string(t).unwrap();
self
}
pub async fn get_string(&self) -> String {
let mut buffers: Vec<Vec<u8>> = Vec::with_capacity(5);
loop {
let body = self.get_body().await;
if body.is_err() {
break;
}
let body = body.unwrap();
if body.is_empty() {
break;
}
buffers.push(body);
}
buffers
.iter()
.map(|ele| String::from_utf8(ele.to_vec()).unwrap())
.collect::<Vec<_>>()
.join("")
}
pub async fn get_as<T>(&self) -> Result<T, Status>
where
T: DeserializeOwned,
{
let result = self.get_string().await;
debug!("result = {}", result);
if result.is_empty() {
return Err(Status::error("获取内容为空".into()));
}
let result = serde_json::from_str(&result);
match result {
Ok(value) => Ok(value),
Err(err) => Err(Status::error(err.to_string())),
}
}
pub async fn get(&mut self, url: &str) {
self.request.url = url.into();
self.request.method = Some(HTTPMethod::GET);
let result = self.do_request().await;
if result.is_err() {
self.status = result.err().unwrap();
} else {
self.response.headers = result.unwrap().headers;
}
}
pub async fn post(&mut self, url: &str) {
self.request.url = url.into();
self.request.method = Some(HTTPMethod::POST);
let result = self.do_request().await;
if result.is_err() {
self.status = result.err().unwrap();
} else {
self.response.headers = result.unwrap().headers;
}
}
pub fn do_request(&self) -> ResultApiFuture<HttpResponse> {
let future = build_result_api_future::<HttpResponse>();
let clone_shared_state = future.shared_state.clone();
add_request_result_handler(self.request_id, clone_shared_state);
let message = self.request.serial().unwrap();
unsafe {
SEND_HTTP_REQUEST_API.assume_init_ref()(
get_service_id(),
self.request_id,
message.as_ptr(),
message.len() as u32,
);
}
future
}
pub fn get_body(&self) -> BytesApiFuture {
let future = build_bytes_api_future();
let clone_shared_state = future.shared_state.clone();
add_request_bytes_handler(self.request_id, clone_shared_state);
unsafe {
GET_HTTP_DATA_API.assume_init_ref()(get_service_id(), self.request_id);
}
future
}
}
/// Calls `GET_HTTP_LISTEN_ADDR_API` and returns a future that resolves with the decoded
/// `StringResponse` or the error status.
pub fn get_listen_addr() -> ResultApiFuture<StringResponse> {
    let pending = build_result_api_future::<StringResponse>();
    let id = gen_id();
    add_request_result_handler(id, pending.shared_state.clone());
    unsafe {
        GET_HTTP_LISTEN_ADDR_API.assume_init_ref()(get_service_id(), id);
    }
    pending
}
}
pub mod database {
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::{
add_request_bytes_handler, add_request_result_handler, add_request_status_handler,
build_bytes_api_future, build_common_status_future, build_result_api_future,
BytesApiFuture, CommonStatusFuture, ResultApiFuture,
};
use crate::x_core::{
get_request_id, get_service_id, BEGIN_TX, COMMIT, EXECUTE, GET_CONNECTION, IS_TABLE_EXIST,
PUTBACK_CONNECTION, QUERY, ROLLBACK,
};
use lazy_static::lazy_static;
use tracing::debug;
use xCommonLib::{
base::status::Status,
service::sys_service_api::{BoolResponse, DBExecuteResult},
};
/// Handle to a database connection obtained through the host API.
pub struct Connection {
/// Id generated in `get_connection`; also used as the request id for every operation.
connection_id: i64,
}
lazy_static! {
// Connections keyed by the value of `get_request_id()` at the time they were obtained;
// entries are added by `get_connection` and removed by `Connection::putback`.
static ref CONNECTION_MAP: Mutex<HashMap<i64, Arc<Connection>>> =
Mutex::new(HashMap::new());
}
/// Calls `GET_CONNECTION` for `connection_id` and returns a future that resolves with
/// the resulting status.
fn get_connection_with_id(connection_id: i64) -> CommonStatusFuture {
    let pending = build_common_status_future();
    add_request_status_handler(connection_id, pending.shared_state.clone());
    unsafe {
        GET_CONNECTION.assume_init_ref()(get_service_id(), connection_id);
    }
    pending
}
/// Returns the connection cached for the current request id, or obtains a new one from
/// the host and caches it.
///
/// # Errors
/// Returns the host's status when acquiring a new connection fails.
pub async fn get_connection() -> Result<Arc<Connection>, Status> {
    let request_id = get_request_id();
    // The lock guard is a temporary here, so it is released before any `.await`.
    let cached = CONNECTION_MAP.lock().unwrap().get(&request_id).cloned();
    if let Some(existing) = cached {
        return Ok(existing);
    }
    let connection_id = super::gen_id();
    let status = get_connection_with_id(connection_id).await;
    if status.is_erorr() {
        return Err(status);
    }
    let connection = Arc::new(Connection { connection_id });
    CONNECTION_MAP
        .lock()
        .unwrap()
        .insert(request_id, connection.clone());
    Ok(connection)
}
impl Connection {
    /// Passes the bytes of `table_name` to `IS_TABLE_EXIST` and returns a future that
    /// resolves with the decoded `BoolResponse` or the error status.
    pub fn is_table_exist(&self, table_name: &str) -> ResultApiFuture<BoolResponse> {
        let pending = build_result_api_future::<BoolResponse>();
        add_request_result_handler(self.connection_id, pending.shared_state.clone());
        let (name_ptr, name_len) = (table_name.as_ptr(), table_name.len() as u32);
        unsafe {
            IS_TABLE_EXIST.assume_init_ref()(
                get_service_id(),
                self.connection_id,
                name_ptr,
                name_len,
            );
        }
        pending
    }

    /// Passes the bytes of `sql` to `QUERY` and returns a future that resolves with the
    /// returned bytes or the error status.
    pub fn query(&self, sql: &str) -> BytesApiFuture {
        let pending = build_bytes_api_future();
        add_request_bytes_handler(self.connection_id, pending.shared_state.clone());
        let (sql_ptr, sql_len) = (sql.as_ptr(), sql.len() as u32);
        unsafe {
            QUERY.assume_init_ref()(get_service_id(), self.connection_id, sql_ptr, sql_len);
        }
        pending
    }

    /// Passes the bytes of `sql` to `EXECUTE` and returns a future that resolves with
    /// the decoded `DBExecuteResult` or the error status.
    pub fn execute(&self, sql: &str) -> ResultApiFuture<DBExecuteResult> {
        let pending = build_result_api_future::<DBExecuteResult>();
        add_request_result_handler(self.connection_id, pending.shared_state.clone());
        let (sql_ptr, sql_len) = (sql.as_ptr(), sql.len() as u32);
        unsafe {
            EXECUTE.assume_init_ref()(get_service_id(), self.connection_id, sql_ptr, sql_len);
        }
        pending
    }

    /// Calls `BEGIN_TX` for this connection and returns a future that resolves with the
    /// resulting status.
    pub fn begin_tx(&self) -> CommonStatusFuture {
        let pending = build_common_status_future();
        add_request_status_handler(self.connection_id, pending.shared_state.clone());
        unsafe {
            BEGIN_TX.assume_init_ref()(get_service_id(), self.connection_id);
        }
        pending
    }

    /// Calls `COMMIT` for this connection and returns a future that resolves with the
    /// resulting status.
    pub fn commit(&self) -> CommonStatusFuture {
        let pending = build_common_status_future();
        add_request_status_handler(self.connection_id, pending.shared_state.clone());
        unsafe {
            COMMIT.assume_init_ref()(get_service_id(), self.connection_id);
        }
        pending
    }

    /// Calls `ROLLBACK` for this connection and returns a future that resolves with the
    /// resulting status.
    pub fn rollback(&self) -> CommonStatusFuture {
        let pending = build_common_status_future();
        add_request_status_handler(self.connection_id, pending.shared_state.clone());
        unsafe {
            ROLLBACK.assume_init_ref()(get_service_id(), self.connection_id);
        }
        pending
    }

    /// Calls `PUTBACK_CONNECTION` for this connection and returns a future that
    /// resolves with the resulting status.
    fn putback_inner(&self) -> CommonStatusFuture {
        let pending = build_common_status_future();
        add_request_status_handler(self.connection_id, pending.shared_state.clone());
        unsafe {
            PUTBACK_CONNECTION.assume_init_ref()(get_service_id(), self.connection_id);
        }
        pending
    }

    /// Returns the connection to the host unless more than two strong references to it
    /// exist, in which case a default status is returned and nothing is released.
    ///
    /// When releasing, the entry for the current request id is removed from
    /// `CONNECTION_MAP` before the host is called.
    pub async fn putback(self: &Arc<Self>) -> Status {
        if Arc::strong_count(self) > 2 {
            return Status::default();
        }
        debug!("释放连接 = {}", self.connection_id);
        let request_id = get_request_id();
        // The lock guard is a temporary, so it is released before the `.await` below.
        CONNECTION_MAP.lock().unwrap().remove(&request_id);
        self.putback_inner().await
    }
}
}