1use crate::async_token_manager::IAsyncTokenManager;
2use crate::{IController, NetPeer, RetResult};
3use aqueue::Actor;
5use data_rw::{Data, DataOwnedReader};
6use oneshot::{channel as oneshot, Receiver, Sender};
7use std::collections::{HashMap, VecDeque};
8use std::sync::atomic::{AtomicI64, Ordering};
9use std::sync::{Arc, Weak};
10use tokio::time::Instant;
11
12#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
13use tcpserver::IPeer;
14
15pub struct AsyncToken<T> {
17 session_id: i64,
19 controller: Option<Arc<T>>,
21 peer: Option<Arc<NetPeer>>,
23 manager: Weak<dyn IAsyncTokenManager<T>>,
25 result_dict: HashMap<i64, Sender<crate::error::Result<DataOwnedReader>>>,
27 serial_atomic: AtomicI64,
29 request_queue: VecDeque<(i64, Instant)>,
31}
32
33unsafe impl<T: IController> Send for AsyncToken<T> {}
34unsafe impl<T: IController> Sync for AsyncToken<T> {}
35
36pub type NetxToken<T> = Arc<Actor<AsyncToken<T>>>;
38
39impl<T: IController> AsyncToken<T> {
40 pub(crate) fn new(session_id: i64, manager: Weak<dyn IAsyncTokenManager<T>>) -> AsyncToken<T> {
42 AsyncToken {
43 session_id,
44 controller: None,
45 peer: None,
46 manager,
47 result_dict: Default::default(),
48 serial_atomic: AtomicI64::new(1),
49 request_queue: Default::default(),
50 }
51 }
52}
53
54impl<T> Drop for AsyncToken<T> {
55 fn drop(&mut self) {
57 log::debug!("token session id:{} drop", self.session_id);
58 }
59}
60
61impl<T: IController> AsyncToken<T> {
62 #[inline]
64 pub(crate) async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()> {
65 if let Some(ref controller) = self.controller {
66 controller
67 .call(1, cmd_tag, DataOwnedReader::new(vec![0; 4]))
68 .await?;
69 }
70 Ok(())
71 }
72
73 #[inline]
75 pub(crate) async fn execute_controller(
76 &self,
77 tt: u8,
78 cmd: i32,
79 dr: DataOwnedReader,
80 ) -> anyhow::Result<RetResult> {
81 if let Some(ref controller) = self.controller {
82 return controller.call(tt, cmd, dr).await;
83 }
84 anyhow::bail!("controller is none")
85 }
86
87 #[inline]
89 pub(crate) fn new_serial(&self) -> i64 {
90 self.serial_atomic.fetch_add(1, Ordering::Acquire)
91 }
92
93 #[inline]
95 pub fn set_error(&mut self, serial: i64, err: crate::error::Error) -> crate::error::Result<()> {
96 if let Some(tx) = self.result_dict.remove(&serial) {
97 Ok(tx
98 .send(Err(err))
99 .map_err(|_| crate::error::Error::SerialClose(serial))?)
100 } else {
101 Ok(())
102 }
103 }
104
105 #[inline]
107 pub fn check_request_timeout(&mut self, request_out_time: u32) {
108 while let Some(item) = self.request_queue.pop_back() {
109 if item.1.elapsed().as_millis() as u32 >= request_out_time {
110 if let Err(er) = self.set_error(item.0, crate::error::Error::SerialTimeOut(item.0))
111 {
112 log::error!("check err:{}", er);
113 }
114 } else {
115 self.request_queue.push_back(item);
116 break;
117 }
118 }
119 }
120}
121
122pub(crate) trait IAsyncTokenInner {
124 type Controller: IController;
126
127 async fn set_controller(&self, controller: Arc<Self::Controller>);
133
134 async fn clear_controller_fun_maps(&self);
136
137 async fn set_peer(&self, peer: Option<Arc<NetPeer>>);
143
144 async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()>;
154
155 async fn execute_controller(&self, tt: u8, cmd: i32, data: DataOwnedReader) -> RetResult;
167
168 async fn set_result(&self, serial: i64, data: DataOwnedReader) -> anyhow::Result<()>;
179
180 async fn check_request_timeout(&self, request_out_time: u32);
186}
187
188impl<T: IController + 'static> IAsyncTokenInner for Actor<AsyncToken<T>> {
189 type Controller = T;
190
191 #[inline]
192 async fn set_controller(&self, controller: Arc<T>) {
193 self.inner_call(|inner| async move { inner.get_mut().controller = Some(controller) })
194 .await
195 }
196
197 #[inline]
198 async fn clear_controller_fun_maps(&self) {
199 self.inner_call(|inner| async move {
200 inner.get_mut().controller = None;
201 })
202 .await
203 }
204
205 #[inline]
206 async fn set_peer(&self, peer: Option<Arc<NetPeer>>) {
207 self.inner_call(|inner| async move {
208 inner.get_mut().peer = peer;
209 })
210 .await
211 }
212
213 #[inline]
214 async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()> {
215 unsafe { self.deref_inner().call_special_function(cmd_tag).await }
216 }
217
218 #[inline]
219 async fn execute_controller(&self, tt: u8, cmd: i32, dr: DataOwnedReader) -> RetResult {
220 unsafe {
221 match self.deref_inner().execute_controller(tt, cmd, dr).await {
222 Ok(res) => res,
223 Err(err) => {
224 log::error!(
225 "session id:{} call cmd:{} error:{:?}",
226 self.get_session_id(),
227 cmd,
228 err
229 );
230 RetResult::error(
231 -1,
232 format!(
233 "session id:{} call cmd:{} error:{}",
234 self.get_session_id(),
235 cmd,
236 err
237 ),
238 )
239 }
240 }
241 }
242 }
243
244 #[inline]
245 async fn set_result(&self, serial: i64, dr: DataOwnedReader) -> anyhow::Result<()> {
246 let have_tx: Option<Sender<crate::error::Result<DataOwnedReader>>> = self
247 .inner_call(|inner| async move { inner.get_mut().result_dict.remove(&serial) })
248 .await;
249
250 if let Some(tx) = have_tx {
251 Ok(tx
252 .send(Ok(dr))
253 .map_err(|_| crate::error::Error::SerialClose(serial))?)
254 } else {
255 match RetResult::from(dr) {
256 Ok(res) => match res.check() {
257 Ok(_) => {
258 log::error!("not found 2 {}", serial)
259 }
260 Err(err) => {
261 log::error!("{}", err)
262 }
263 },
264 Err(er) => {
265 log::error!("not found {} :{}", serial, er)
266 }
267 }
268
269 Ok(())
270 }
271 }
272
273 #[inline]
274 async fn check_request_timeout(&self, request_out_time: u32) {
275 self.inner_call(|inner| async move {
276 inner.get_mut().check_request_timeout(request_out_time);
277 })
278 .await
279 }
280}
281
282pub trait IAsyncToken {
284 type Controller: IController;
286
287 fn get_session_id(&self) -> i64;
293
294 fn new_serial(&self) -> i64;
300
301 fn get_peer(&self) -> impl std::future::Future<Output = Option<Arc<NetPeer>>>;
307
308 fn send(&self, buff: Vec<u8>) -> impl std::future::Future<Output = crate::error::Result<()>>;
318
319 fn get_token(
329 &self,
330 session_id: i64,
331 ) -> impl std::future::Future<Output = crate::error::Result<Option<NetxToken<Self::Controller>>>>;
332
333 fn get_all_tokens(
339 &self,
340 ) -> impl std::future::Future<Output = crate::error::Result<Vec<NetxToken<Self::Controller>>>>;
341
342 fn call(
353 &self,
354 serial: i64,
355 buff: Data,
356 ) -> impl std::future::Future<Output = crate::error::Result<RetResult>>;
357
358 fn run(&self, buff: Data) -> impl std::future::Future<Output = crate::error::Result<()>>;
368
369 fn is_disconnect(&self) -> impl std::future::Future<Output = bool>;
375}
376
377impl<T: IController + 'static> IAsyncToken for Actor<AsyncToken<T>> {
378 type Controller = T;
380
381 #[inline]
382 fn get_session_id(&self) -> i64 {
383 unsafe { self.deref_inner().session_id }
384 }
385
386 #[inline]
387 fn new_serial(&self) -> i64 {
388 unsafe { self.deref_inner().new_serial() }
389 }
390
391 #[inline]
392 async fn get_peer(&self) -> Option<Arc<NetPeer>> {
393 self.inner_call(|inner| async move { inner.get_mut().peer.clone() })
394 .await
395 }
396
397 #[inline]
398 async fn send(&self, buff: Vec<u8>) -> crate::error::Result<()> {
399 unsafe {
400 if let Some(peer) = self.deref_inner().peer.clone() {
401 Ok(peer.send_all(buff).await?)
402 } else {
403 Err(crate::error::Error::TokenDisconnect(self.get_session_id()))
404 }
405 }
406 }
407
408 #[inline]
409 async fn get_token(&self, session_id: i64) -> crate::error::Result<Option<NetxToken<T>>> {
410 self.inner_call(|inner| async move {
411 let manager = inner
412 .get()
413 .manager
414 .upgrade()
415 .ok_or_else(|| crate::error::Error::ManagerUpgradeFail)?;
416 Ok(manager.get_token(session_id).await)
417 })
418 .await
419 }
420
421 #[inline]
422 async fn get_all_tokens(&self) -> crate::error::Result<Vec<NetxToken<T>>> {
423 self.inner_call(|inner| async move {
424 let manager = inner
425 .get()
426 .manager
427 .upgrade()
428 .ok_or_else(|| crate::error::Error::ManagerUpgradeFail)?;
429 Ok(manager.get_all_tokens().await)
430 })
431 .await
432 }
433
434 #[inline]
435 async fn call(&self, serial: i64, buff: Data) -> crate::error::Result<RetResult> {
436 let (peer, rx): (
437 Arc<NetPeer>,
438 Receiver<crate::error::Result<DataOwnedReader>>,
439 ) = self
440 .inner_call(|inner| async move {
441 if let Some(peer) = inner.get().peer.clone() {
442 let (tx, rx): (
443 Sender<crate::error::Result<DataOwnedReader>>,
444 Receiver<crate::error::Result<DataOwnedReader>>,
445 ) = oneshot();
446 if inner.get_mut().result_dict.contains_key(&serial) {
447 return Err(crate::error::Error::SerialHave);
448 }
449 if inner.get_mut().result_dict.insert(serial, tx).is_none() {
450 inner
451 .get_mut()
452 .request_queue
453 .push_front((serial, Instant::now()));
454 }
455 Ok((peer, rx))
456 } else {
457 Err(crate::error::Error::TokenDisconnect(inner.get().session_id))
458 }
459 })
460 .await?;
461 peer.send_all(buff.into_inner()).await?;
462 match rx.await {
463 Err(_) => Err(crate::error::Error::SerialClose(serial)),
464 Ok(data) => Ok(RetResult::from(data?)?),
465 }
466 }
467
468 #[inline]
469 async fn run(&self, buff: Data) -> crate::error::Result<()> {
470 let peer = self
471 .inner_call(|inner| async move {
472 if let Some(peer) = inner.get().peer.clone() {
473 Ok(peer)
474 } else {
475 Err(crate::error::Error::TokenDisconnect(inner.get().session_id))
476 }
477 })
478 .await?;
479 peer.send_all(buff.into_inner()).await?;
480 Ok(())
481 }
482
483 #[inline]
484 async fn is_disconnect(&self) -> bool {
485 self.inner_call(|inner| async move {
486 if let Some(ref peer) = inner.get().peer {
487 #[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
488 if let Ok(r) = peer.is_disconnect().await {
489 return r;
490 }
491
492 #[cfg(feature = "tcp-channel-server")]
493 return peer.is_disconnect();
494 }
495 true
496 })
497 .await
498 }
499}
500
501#[macro_export]
503macro_rules! call_peer {
504 (@uint $($x:tt)*)=>(());
505 (@count $($rest:expr),*)=>(<[()]>::len(&[$(call_peer!(@uint $rest)),*]));
506 ($peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
507 use data_rw::Data;
508 let mut data=Data::with_capacity(128);
509 let args_count=call_peer!(@count $($args),*) as i32;
510 let serial=$peer.new_serial();
511 data.write_fixed(0u32);
512 data.write_fixed(2400u32);
513 data.write_fixed(2u8);
514 data.write_fixed($cmd);
515 data.write_fixed(serial);
516 data.write_fixed(args_count);
517 $(data.pack_serialize($args)?;)*
518 let len=data.len();
519 (&mut data[0..4]).put_u32_le(len as u32);
520 let mut ret= $peer.call(serial,data).await?.check()?;
521 ret.deserialize()?
522 });
523 (@result $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
524 use data_rw::Data;
525 let mut data=Data::with_capacity(128);
526 let args_count=call_peer!(@count $($args),*) as i32;
527 let serial=$peer.new_serial();
528 data.write_fixed(0u32);
529 data.write_fixed(2400u32);
530 data.write_fixed(2u8);
531 data.write_fixed($cmd);
532 data.write_fixed(serial);
533 data.write_fixed(args_count);
534 $(data.pack_serialize($args)?;)*
535 let len=data.len();
536 (&mut data[0..4]).put_u32_le(len as u32);
537 $peer.call(serial,data).await?
538 });
539 (@run $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
540 use data_rw::Data;
541 let mut data=Data::with_capacity(128);
542 let args_count=call_peer!(@count $($args),*) as i32;
543 let serial=$peer.new_serial();
544 data.write_fixed(0u32);
545 data.write_fixed(2400u32);
546 data.write_fixed(0u8);
547 data.write_fixed($cmd);
548 data.write_fixed(serial);
549 data.write_fixed(args_count);
550 $(data.pack_serialize($args)?;)*
551 let len=data.len();
552 (&mut data[0..4]).put_u32_le(len as u32);
553 $peer.run(data).await?;
554 });
555 (@run_not_err $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
556 use data_rw::Data;
557 let mut data=Data::with_capacity(128);
558 let args_count=call_peer!(@count $($args),*) as i32;
559 let serial=$peer.new_serial();
560 data.write_fixed(0u32);
561 data.write_fixed(2400u32);
562 data.write_fixed(0u8);
563 data.write_fixed($cmd);
564 data.write_fixed(serial);
565 data.write_fixed(args_count);
566 $(
567 if let Err(err)= data.pack_serialize($args){
568 log::error!{"pack_serialize {} is error:{}",$cmd,err};
569 }
570 )*
571 let len=data.len();
572 (&mut data[0..4]).put_u32_le(len as u32);
573 if let Err(err)= $peer.run(data).await{
574 log::warn!{"run {} is error:{}",$cmd,err}
575 }
576 });
577 (@checkrun $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
578 use data_rw::Data;
579 let mut data=Data::with_capacity(128);
580 let args_count=call_peer!(@count $($args),*) as i32;
581 let serial=$peer.new_serial();
582 data.write_fixed(0u32);
583 data.write_fixed(2400u32);
584 data.write_fixed(1u8);
585 data.write_fixed($cmd);
586 data.write_fixed(serial);
587 data.write_fixed(args_count);
588 $(data.pack_serialize($args)?;)*
589 let len=data.len();
590 (&mut data[0..4]).put_u32_le(len as u32);
591 $peer.call(serial,data).await?.check()?;
592 });
593}
594
595#[macro_export]
602macro_rules! impl_ref {
603 ($client:expr=>$interface:ty) => {
604 paste::paste! {
605 [<___impl_ $interface _call>]::new_ref(&$client)
606 }
607 };
608}