use crate::async_token_manager::IAsyncTokenManager;
use crate::{IController, NetPeer, RetResult};
use aqueue::Actor;
use data_rw::{Data, DataOwnedReader};
use oneshot::{channel as oneshot, Receiver, Sender};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Weak};
use tokio::time::Instant;
#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
use tcpserver::IPeer;
/// Per-session state for one remote peer: the controller that serves incoming calls, the
/// network peer used for sending, and the bookkeeping for requests awaiting a reply.
///
/// In this file the struct is used wrapped in an `Actor` (see `NetxToken`).
pub struct AsyncToken<T> {
/// Identifier of the session this token represents.
session_id: i64,
/// Controller that handles incoming calls; `None` until one is installed or after it is cleared.
controller: Option<Arc<T>>,
/// Network peer used to send data; when `None` the token is reported as disconnected.
peer: Option<Arc<NetPeer>>,
/// Weak handle to the token manager, upgraded when other tokens are looked up.
manager: Weak<dyn IAsyncTokenManager<T>>,
/// Pending requests: serial number -> one-shot sender that delivers the reply or an error.
result_dict: HashMap<i64, Sender<crate::error::Result<DataOwnedReader>>>,
/// Counter that hands out request serial numbers; initialised to 1.
serial_atomic: AtomicI64,
/// `(serial, registration time)` for pending requests. New entries are pushed at the front
/// and the timeout check consumes from the back.
request_queue: VecDeque<(i64, Instant)>,
}
// SAFETY(review): these impls assert that `AsyncToken<T>` may be moved to and shared between
// threads even where the compiler cannot derive that from the field types. Nothing in this
// file proves it. In this file every mutation (`get_mut`) happens inside an `Actor::inner_call`
// closure, which presumably serialises access; the `deref_inner` read paths bypass that.
// TODO confirm the invariant that makes this sound.
unsafe impl<T: IController> Send for AsyncToken<T> {}
unsafe impl<T: IController> Sync for AsyncToken<T> {}
/// Shared handle to a session token: an [`AsyncToken`] wrapped in an `aqueue` [`Actor`] and
/// reference-counted with [`Arc`].
pub type NetxToken<T> = Arc<Actor<AsyncToken<T>>>;
impl<T: IController> AsyncToken<T> {
pub(crate) fn new(session_id: i64, manager: Weak<dyn IAsyncTokenManager<T>>) -> AsyncToken<T> {
AsyncToken {
session_id,
controller: None,
peer: None,
manager,
result_dict: Default::default(),
serial_atomic: AtomicI64::new(1),
request_queue: Default::default(),
}
}
}
impl<T> Drop for AsyncToken<T> {
    /// Writes a debug log line naming the session whose token is being dropped.
    fn drop(&mut self) {
        let session_id = self.session_id;
        log::debug!("token session id:{} drop", session_id);
    }
}
impl<T: IController> AsyncToken<T> {
#[inline]
pub(crate) async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()> {
if let Some(ref controller) = self.controller {
controller
.call(1, cmd_tag, DataOwnedReader::new(vec![0; 4]))
.await?;
}
Ok(())
}
#[inline]
pub(crate) async fn execute_controller(
&self,
tt: u8,
cmd: i32,
dr: DataOwnedReader,
) -> anyhow::Result<RetResult> {
if let Some(ref controller) = self.controller {
return controller.call(tt, cmd, dr).await;
}
anyhow::bail!("controller is none")
}
#[inline]
pub(crate) fn new_serial(&self) -> i64 {
self.serial_atomic.fetch_add(1, Ordering::Acquire)
}
#[inline]
pub fn set_error(&mut self, serial: i64, err: crate::error::Error) -> crate::error::Result<()> {
if let Some(tx) = self.result_dict.remove(&serial) {
Ok(tx
.send(Err(err))
.map_err(|_| crate::error::Error::SerialClose(serial))?)
} else {
Ok(())
}
}
#[inline]
pub fn check_request_timeout(&mut self, request_out_time: u32) {
while let Some(item) = self.request_queue.pop_back() {
if item.1.elapsed().as_millis() as u32 >= request_out_time {
if let Err(er) = self.set_error(item.0, crate::error::Error::SerialTimeOut(item.0))
{
log::error!("check err:{}", er);
}
} else {
self.request_queue.push_back(item);
break;
}
}
}
}
/// Crate-internal operations on a session token.
///
/// The behaviour notes below describe the implementation for `Actor<AsyncToken<T>>` in this
/// file.
pub(crate) trait IAsyncTokenInner {
/// Controller type that serves incoming calls for this token.
type Controller: IController;
/// Installs `controller` as the token's controller.
async fn set_controller(&self, controller: Arc<Self::Controller>);
/// Removes the token's controller (sets it to `None`).
async fn clear_controller_fun_maps(&self);
/// Replaces the token's network peer; `None` leaves the token without a peer.
async fn set_peer(&self, peer: Option<Arc<NetPeer>>);
/// Invokes command `cmd_tag` on the controller; a no-op when no controller is installed.
async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()>;
/// Dispatches command `cmd` with call type `tt` to the controller. A failure is logged and
/// turned into a `RetResult::error(-1, ..)` rather than returned as an error.
async fn execute_controller(&self, tt: u8, cmd: i32, data: DataOwnedReader) -> RetResult;
/// Delivers the reply `data` to the request waiting on `serial`. When nothing is waiting,
/// the reply is only logged and `Ok(())` is returned.
async fn set_result(&self, serial: i64, data: DataOwnedReader) -> anyhow::Result<()>;
/// Fails pending requests that have waited at least `request_out_time` milliseconds.
async fn check_request_timeout(&self, request_out_time: u32);
}
impl<T: IController + 'static> IAsyncTokenInner for Actor<AsyncToken<T>> {
type Controller = T;
#[inline]
async fn set_controller(&self, controller: Arc<T>) {
self.inner_call(|inner| async move { inner.get_mut().controller = Some(controller) })
.await
}
#[inline]
async fn clear_controller_fun_maps(&self) {
self.inner_call(|inner| async move {
inner.get_mut().controller = None;
})
.await
}
#[inline]
async fn set_peer(&self, peer: Option<Arc<NetPeer>>) {
self.inner_call(|inner| async move {
inner.get_mut().peer = peer;
})
.await
}
#[inline]
async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()> {
unsafe { self.deref_inner().call_special_function(cmd_tag).await }
}
#[inline]
async fn execute_controller(&self, tt: u8, cmd: i32, dr: DataOwnedReader) -> RetResult {
unsafe {
match self.deref_inner().execute_controller(tt, cmd, dr).await {
Ok(res) => res,
Err(err) => {
log::error!(
"session id:{} call cmd:{} error:{:?}",
self.get_session_id(),
cmd,
err
);
RetResult::error(
-1,
format!(
"session id:{} call cmd:{} error:{}",
self.get_session_id(),
cmd,
err
),
)
}
}
}
}
#[inline]
async fn set_result(&self, serial: i64, dr: DataOwnedReader) -> anyhow::Result<()> {
let have_tx: Option<Sender<crate::error::Result<DataOwnedReader>>> = self
.inner_call(|inner| async move { inner.get_mut().result_dict.remove(&serial) })
.await;
if let Some(tx) = have_tx {
Ok(tx
.send(Ok(dr))
.map_err(|_| crate::error::Error::SerialClose(serial))?)
} else {
match RetResult::from(dr) {
Ok(res) => match res.check() {
Ok(_) => {
log::error!("not found 2 {}", serial)
}
Err(err) => {
log::error!("{}", err)
}
},
Err(er) => {
log::error!("not found {} :{}", serial, er)
}
}
Ok(())
}
}
#[inline]
async fn check_request_timeout(&self, request_out_time: u32) {
self.inner_call(|inner| async move {
inner.get_mut().check_request_timeout(request_out_time);
})
.await
}
}
/// Public operations on a session token.
///
/// The behaviour notes below describe the implementation for `Actor<AsyncToken<T>>` in this
/// file.
pub trait IAsyncToken {
/// Controller type associated with this token.
type Controller: IController;
/// Returns the token's session id.
fn get_session_id(&self) -> i64;
/// Returns the next request serial number and advances the counter.
fn new_serial(&self) -> i64;
/// Returns a clone of the current network peer, or `None` when there is none.
fn get_peer(&self) -> impl std::future::Future<Output = Option<Arc<NetPeer>>>;
/// Sends the raw bytes `buff` through the peer; fails with `TokenDisconnect` when there is
/// no peer.
fn send(&self, buff: Vec<u8>) -> impl std::future::Future<Output = crate::error::Result<()>>;
/// Asks the token manager for the token of `session_id`; fails with `ManagerUpgradeFail`
/// when the manager is no longer alive.
fn get_token(
&self,
session_id: i64,
) -> impl std::future::Future<Output = crate::error::Result<Option<NetxToken<Self::Controller>>>>;
/// Asks the token manager for all tokens; fails with `ManagerUpgradeFail` when the manager
/// is no longer alive.
fn get_all_tokens(
&self,
) -> impl std::future::Future<Output = crate::error::Result<Vec<NetxToken<Self::Controller>>>>;
/// Registers `serial` as a pending request, sends `buff` and waits for the matching reply.
/// Fails with `SerialHave` when `serial` is already pending, `TokenDisconnect` when there
/// is no peer, and `SerialClose` when the reply channel closes without a value.
fn call(
&self,
serial: i64,
buff: Data,
) -> impl std::future::Future<Output = crate::error::Result<RetResult>>;
/// Sends `buff` without waiting for a reply; fails with `TokenDisconnect` when there is no
/// peer.
fn run(&self, buff: Data) -> impl std::future::Future<Output = crate::error::Result<()>>;
/// Reports whether the token is disconnected; `true` when there is no peer.
fn is_disconnect(&self) -> impl std::future::Future<Output = bool>;
}
impl<T: IController + 'static> IAsyncToken for Actor<AsyncToken<T>> {
type Controller = T;
#[inline]
fn get_session_id(&self) -> i64 {
unsafe { self.deref_inner().session_id }
}
#[inline]
fn new_serial(&self) -> i64 {
unsafe { self.deref_inner().new_serial() }
}
#[inline]
async fn get_peer(&self) -> Option<Arc<NetPeer>> {
self.inner_call(|inner| async move { inner.get_mut().peer.clone() })
.await
}
#[inline]
async fn send(&self, buff: Vec<u8>) -> crate::error::Result<()> {
unsafe {
if let Some(peer) = self.deref_inner().peer.clone() {
Ok(peer.send_all(buff).await?)
} else {
Err(crate::error::Error::TokenDisconnect(self.get_session_id()))
}
}
}
#[inline]
async fn get_token(&self, session_id: i64) -> crate::error::Result<Option<NetxToken<T>>> {
self.inner_call(|inner| async move {
let manager = inner
.get()
.manager
.upgrade()
.ok_or_else(|| crate::error::Error::ManagerUpgradeFail)?;
Ok(manager.get_token(session_id).await)
})
.await
}
#[inline]
async fn get_all_tokens(&self) -> crate::error::Result<Vec<NetxToken<T>>> {
self.inner_call(|inner| async move {
let manager = inner
.get()
.manager
.upgrade()
.ok_or_else(|| crate::error::Error::ManagerUpgradeFail)?;
Ok(manager.get_all_tokens().await)
})
.await
}
#[inline]
async fn call(&self, serial: i64, buff: Data) -> crate::error::Result<RetResult> {
let (peer, rx): (
Arc<NetPeer>,
Receiver<crate::error::Result<DataOwnedReader>>,
) = self
.inner_call(|inner| async move {
if let Some(peer) = inner.get().peer.clone() {
let (tx, rx): (
Sender<crate::error::Result<DataOwnedReader>>,
Receiver<crate::error::Result<DataOwnedReader>>,
) = oneshot();
if inner.get_mut().result_dict.contains_key(&serial) {
return Err(crate::error::Error::SerialHave);
}
if inner.get_mut().result_dict.insert(serial, tx).is_none() {
inner
.get_mut()
.request_queue
.push_front((serial, Instant::now()));
}
Ok((peer, rx))
} else {
Err(crate::error::Error::TokenDisconnect(inner.get().session_id))
}
})
.await?;
peer.send_all(buff.into_inner()).await?;
match rx.await {
Err(_) => Err(crate::error::Error::SerialClose(serial)),
Ok(data) => Ok(RetResult::from(data?)?),
}
}
#[inline]
async fn run(&self, buff: Data) -> crate::error::Result<()> {
let peer = self
.inner_call(|inner| async move {
if let Some(peer) = inner.get().peer.clone() {
Ok(peer)
} else {
Err(crate::error::Error::TokenDisconnect(inner.get().session_id))
}
})
.await?;
peer.send_all(buff.into_inner()).await?;
Ok(())
}
#[inline]
async fn is_disconnect(&self) -> bool {
self.inner_call(|inner| async move {
if let Some(ref peer) = inner.get().peer {
#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
if let Ok(r) = peer.is_disconnect().await {
return r;
}
#[cfg(feature = "tcp-channel-server")]
return peer.is_disconnect();
}
true
})
.await
}
}
/// Builds a request frame for command `$cmd` with the given arguments and sends it through
/// `$peer`.
///
/// Every arm writes the same header into a `data_rw::Data` buffer: a `u32` length placeholder,
/// the constant `2400u32`, a one-byte call type, `$cmd`, a fresh serial from
/// `$peer.new_serial()`, the argument count as `i32`, and then each argument via
/// `pack_serialize`. The first four bytes are finally overwritten with the total buffer length
/// (little-endian `u32`).
///
/// Arms:
/// * `peer => cmd; args..` — call type `2`; awaits `call`, runs `check()` and evaluates to
///   `deserialize()` of the reply.
/// * `@result peer => cmd; args..` — call type `2`; evaluates to the unchecked result of `call`.
/// * `@run peer => cmd; args..` — call type `0`; sends with `run` and propagates errors.
/// * `@run_not_err peer => cmd; args..` — call type `0`; like `@run` but errors are logged
///   instead of propagated.
/// * `@checkrun peer => cmd; args..` — call type `1`; awaits `call` and `check()`, discarding
///   the payload.
///
/// All arms use `.await`, and all but `@run_not_err` use `?`, so the macro must be expanded
/// inside an async context whose return type accepts those errors. `put_u32_le` must be in
/// scope at the call site (presumably via `bytes::BufMut` — TODO confirm).
#[macro_export]
macro_rules! call_peer {
// Helper: maps any token sequence to `()`, so arguments can be counted.
(@uint $($x:tt)*)=>(());
// Helper: number of arguments, computed as the length of an array of unit values.
(@count $($rest:expr),*)=>(<[()]>::len(&[$(call_peer!(@uint $rest)),*]));
// Call and decode: call type 2, reply is checked and deserialized.
($peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
use data_rw::Data;
let mut data=Data::with_capacity(128);
let args_count=call_peer!(@count $($args),*) as i32;
let serial=$peer.new_serial();
// Length placeholder, patched once the frame is complete.
data.write_fixed(0u32);
data.write_fixed(2400u32);
data.write_fixed(2u8);
data.write_fixed($cmd);
data.write_fixed(serial);
data.write_fixed(args_count);
$(data.pack_serialize($args)?;)*
let len=data.len();
// Overwrite the placeholder with the total frame length.
(&mut data[0..4]).put_u32_le(len as u32);
let mut ret= $peer.call(serial,data).await?.check()?;
ret.deserialize()?
});
// Call and return the raw result: call type 2, no `check()` and no deserialization.
(@result $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
use data_rw::Data;
let mut data=Data::with_capacity(128);
let args_count=call_peer!(@count $($args),*) as i32;
let serial=$peer.new_serial();
data.write_fixed(0u32);
data.write_fixed(2400u32);
data.write_fixed(2u8);
data.write_fixed($cmd);
data.write_fixed(serial);
data.write_fixed(args_count);
$(data.pack_serialize($args)?;)*
let len=data.len();
(&mut data[0..4]).put_u32_le(len as u32);
$peer.call(serial,data).await?
});
// Send only: call type 0, sent with `run`, errors propagate with `?`.
(@run $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
use data_rw::Data;
let mut data=Data::with_capacity(128);
let args_count=call_peer!(@count $($args),*) as i32;
let serial=$peer.new_serial();
data.write_fixed(0u32);
data.write_fixed(2400u32);
data.write_fixed(0u8);
data.write_fixed($cmd);
data.write_fixed(serial);
data.write_fixed(args_count);
$(data.pack_serialize($args)?;)*
let len=data.len();
(&mut data[0..4]).put_u32_le(len as u32);
$peer.run(data).await?;
});
// Send only, never fails: call type 0, serialization and send errors are logged.
(@run_not_err $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
use data_rw::Data;
let mut data=Data::with_capacity(128);
let args_count=call_peer!(@count $($args),*) as i32;
let serial=$peer.new_serial();
data.write_fixed(0u32);
data.write_fixed(2400u32);
data.write_fixed(0u8);
data.write_fixed($cmd);
data.write_fixed(serial);
data.write_fixed(args_count);
// NOTE(review): a failed `pack_serialize` is only logged; the frame is still sent
// afterwards with the original argument count — confirm this is intended.
$(
if let Err(err)= data.pack_serialize($args){
log::error!{"pack_serialize {} is error:{}",$cmd,err};
}
)*
let len=data.len();
(&mut data[0..4]).put_u32_le(len as u32);
if let Err(err)= $peer.run(data).await{
log::warn!{"run {} is error:{}",$cmd,err}
}
});
// Call and check: call type 1, reply is checked and its payload discarded.
(@checkrun $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
use data_rw::Data;
let mut data=Data::with_capacity(128);
let args_count=call_peer!(@count $($args),*) as i32;
let serial=$peer.new_serial();
data.write_fixed(0u32);
data.write_fixed(2400u32);
data.write_fixed(1u8);
data.write_fixed($cmd);
data.write_fixed(serial);
data.write_fixed(args_count);
$(data.pack_serialize($args)?;)*
let len=data.len();
(&mut data[0..4]).put_u32_le(len as u32);
$peer.call(serial,data).await?.check()?;
});
}
/// Expands to `___impl_<interface>_call::new_ref(&client)`, where the type name is assembled
/// from `$interface` with `paste`.
///
/// The `___impl_…_call` type is not defined in this file; presumably it is generated
/// elsewhere for each interface — TODO confirm where.
#[macro_export]
macro_rules! impl_ref {
($client:expr=>$interface:ty) => {
paste::paste! {
[<___impl_ $interface _call>]::new_ref(&$client)
}
};
}