netxserver/server/
async_token_manager.rs1use crate::async_token::{IAsyncToken, IAsyncTokenInner};
2use crate::controller::ICreateController;
3use crate::impl_server::SpecialFunctionTag;
4use crate::server::async_token::{AsyncToken, NetxToken};
5use aqueue::Actor;
6use std::collections::{HashMap, VecDeque};
7use std::sync::{Arc, Weak};
8use tokio::time::{sleep, Duration, Instant};
9
10pub struct AsyncTokenManager<T: ICreateController + 'static> {
12 impl_controller: T,
13 dict: HashMap<i64, NetxToken<T::Controller>>,
14 request_out_time: u32,
15 session_save_time: u32,
16 request_disconnect_clear_queue: VecDeque<(i64, Instant)>,
17}
18
19unsafe impl<T: ICreateController + 'static> Send for AsyncTokenManager<T> {}
20unsafe impl<T: ICreateController + 'static> Sync for AsyncTokenManager<T> {}
21
22pub type TokenManager<T> = Arc<Actor<AsyncTokenManager<T>>>;
24
25impl<T: ICreateController + 'static> AsyncTokenManager<T> {
26 #[inline]
38 pub(crate) fn new(
39 impl_controller: T,
40 request_out_time: u32,
41 session_save_time: u32,
42 ) -> TokenManager<T> {
43 let ptr = Arc::new(Actor::new(AsyncTokenManager {
44 impl_controller,
45 dict: HashMap::new(),
46 request_out_time,
47 session_save_time,
48 request_disconnect_clear_queue: Default::default(),
49 }));
50
51 Self::start_check(Arc::downgrade(&ptr));
52 ptr
53 }
54
55 #[inline]
61 fn start_check(wk: Weak<Actor<AsyncTokenManager<T>>>) {
62 tokio::spawn(async move {
63 while let Some(manager) = wk.upgrade() {
64 manager.check_tokens_request_timeout().await;
65 manager.check_tokens_disconnect_timeout().await;
66
67 sleep(Duration::from_millis(50)).await
68 }
69 });
70 }
71
72 #[inline]
74 async fn check_tokens_request_timeout(&self) {
75 for token in self.dict.values() {
76 token.check_request_timeout(self.request_out_time).await;
77 }
78 }
79
80 #[inline]
82 async fn check_tokens_disconnect_timeout(&mut self) {
83 while let Some(item) = self.request_disconnect_clear_queue.pop_back() {
84 if item.1.elapsed().as_millis() as u32 >= self.session_save_time {
85 if let Some(token) = self.dict.get(&item.0) {
86 if token.is_disconnect().await {
87 if let Some(token) = self.dict.remove(&item.0) {
88 if let Err(er) = token
89 .call_special_function(SpecialFunctionTag::Closed as i32)
90 .await
91 {
92 log::error!("call token Closed err:{}", er)
93 }
94 token.clear_controller_fun_maps().await;
95 log::debug!("token {} remove", token.get_session_id());
96 } else {
97 log::debug!("remove token {} fail", item.0);
98 }
99 } else {
100 log::debug!("remove token {},but it not disconnect", item.0);
101 }
102 } else {
103 log::debug!("remove token not found {}", item.0);
104 }
105 } else {
106 self.request_disconnect_clear_queue.push_back(item);
107 break;
108 }
109 }
110 }
111
112 #[inline]
118 fn make_new_session_id(&mut self) -> i64 {
119 chrono::Local::now().timestamp_nanos_opt().unwrap()
120 }
121
122 #[inline]
132 async fn create_token(
133 &mut self,
134 manager: Weak<Actor<AsyncTokenManager<T>>>,
135 ) -> anyhow::Result<NetxToken<T::Controller>> {
136 let session_id = self.make_new_session_id();
137 let token = Arc::new(Actor::new(AsyncToken::new(session_id, manager)));
138 let controller = self.impl_controller.create_controller(token.clone())?;
139 token.set_controller(controller).await;
140 self.dict.insert(session_id, token.clone());
141 Ok(token)
142 }
143
144 #[inline]
154 pub fn get_token(&self, session_id: i64) -> Option<NetxToken<T::Controller>> {
155 self.dict.get(&session_id).cloned()
156 }
157
158 #[inline]
164 pub fn get_all_tokens(&self) -> Vec<NetxToken<T::Controller>> {
165 self.dict.values().cloned().collect()
166 }
167}
168
169pub(crate) trait IAsyncTokenManagerCreateToken<T> {
171 async fn create_token(&self, manager: Weak<Self>) -> anyhow::Result<NetxToken<T>>;
181}
182
183impl<T: ICreateController + 'static> IAsyncTokenManagerCreateToken<T::Controller>
184 for Actor<AsyncTokenManager<T>>
185{
186 #[inline]
187 async fn create_token(&self, manager: Weak<Self>) -> anyhow::Result<NetxToken<T::Controller>> {
188 self.inner_call(|inner| async move { inner.get_mut().create_token(manager).await })
189 .await
190 }
191}
192
193#[async_trait::async_trait]
195pub trait ITokenManager<T>: Send + Sync {
196 async fn get_token(&self, session_id: i64) -> Option<NetxToken<T>>;
206
207 async fn get_all_tokens(&self) -> Vec<NetxToken<T>>;
213}
214
215#[async_trait::async_trait]
217pub(crate) trait IAsyncTokenManager<T>: ITokenManager<T> {
218 async fn check_tokens_request_timeout(&self);
220
221 async fn check_tokens_disconnect_timeout(&self);
223
224 async fn peer_disconnect(&self, session_id: i64);
230}
231
232#[async_trait::async_trait]
233impl<T: ICreateController + 'static> ITokenManager<T::Controller> for Actor<AsyncTokenManager<T>> {
234 #[inline]
235 async fn get_token(&self, session_id: i64) -> Option<NetxToken<T::Controller>> {
236 self.inner_call(|inner| async move { inner.get().get_token(session_id) })
237 .await
238 }
239
240 #[inline]
241 async fn get_all_tokens(&self) -> Vec<NetxToken<T::Controller>> {
242 self.inner_call(|inner| async move { inner.get().get_all_tokens() })
243 .await
244 }
245}
246
247#[async_trait::async_trait]
248impl<T: ICreateController + 'static> IAsyncTokenManager<T::Controller>
249 for Actor<AsyncTokenManager<T>>
250{
251 #[inline]
252 async fn check_tokens_request_timeout(&self) {
253 unsafe { self.deref_inner().check_tokens_request_timeout().await }
254 }
255
256 async fn check_tokens_disconnect_timeout(&self) {
257 self.inner_call(
258 |inner| async move { inner.get_mut().check_tokens_disconnect_timeout().await },
259 )
260 .await
261 }
262
263 #[inline]
264 async fn peer_disconnect(&self, session_id: i64) {
265 self.inner_call(|inner| async move {
266 log::debug!("token {} start disconnect clear ", session_id);
267 inner
268 .get_mut()
269 .request_disconnect_clear_queue
270 .push_front((session_id, Instant::now()));
271 })
272 .await
273 }
274}