netxserver/server/
async_token_manager.rs

1use crate::async_token::{IAsyncToken, IAsyncTokenInner};
2use crate::controller::ICreateController;
3use crate::impl_server::SpecialFunctionTag;
4use crate::server::async_token::{AsyncToken, NetxToken};
5use aqueue::Actor;
6use std::collections::{HashMap, VecDeque};
7use std::sync::{Arc, Weak};
8use tokio::time::{sleep, Duration, Instant};
9
10/// Manages asynchronous tokens, including their creation, timeout checks, and disconnection handling.
11pub struct AsyncTokenManager<T: ICreateController + 'static> {
12    impl_controller: T,
13    dict: HashMap<i64, NetxToken<T::Controller>>,
14    request_out_time: u32,
15    session_save_time: u32,
16    request_disconnect_clear_queue: VecDeque<(i64, Instant)>,
17}
18
19unsafe impl<T: ICreateController + 'static> Send for AsyncTokenManager<T> {}
20unsafe impl<T: ICreateController + 'static> Sync for AsyncTokenManager<T> {}
21
22/// Type alias for a reference-counted actor managing asynchronous tokens.
23pub type TokenManager<T> = Arc<Actor<AsyncTokenManager<T>>>;
24
25impl<T: ICreateController + 'static> AsyncTokenManager<T> {
26    /// Creates a new `AsyncTokenManager` instance.
27    ///
28    /// # Arguments
29    ///
30    /// * `impl_controller` - The controller implementation.
31    /// * `request_out_time` - The timeout duration for requests.
32    /// * `session_save_time` - The duration to save sessions.
33    ///
34    /// # Returns
35    ///
36    /// An `Arc` wrapped `Actor` managing the `AsyncTokenManager`.
37    #[inline]
38    pub(crate) fn new(
39        impl_controller: T,
40        request_out_time: u32,
41        session_save_time: u32,
42    ) -> TokenManager<T> {
43        let ptr = Arc::new(Actor::new(AsyncTokenManager {
44            impl_controller,
45            dict: HashMap::new(),
46            request_out_time,
47            session_save_time,
48            request_disconnect_clear_queue: Default::default(),
49        }));
50
51        Self::start_check(Arc::downgrade(&ptr));
52        ptr
53    }
54
55    /// Starts the periodic check for token timeouts.
56    ///
57    /// # Arguments
58    ///
59    /// * `wk` - A weak reference to the `Actor` managing the `AsyncTokenManager`.
60    #[inline]
61    fn start_check(wk: Weak<Actor<AsyncTokenManager<T>>>) {
62        tokio::spawn(async move {
63            while let Some(manager) = wk.upgrade() {
64                manager.check_tokens_request_timeout().await;
65                manager.check_tokens_disconnect_timeout().await;
66
67                sleep(Duration::from_millis(50)).await
68            }
69        });
70    }
71
72    /// Checks for tokens that have timed out on requests.
73    #[inline]
74    async fn check_tokens_request_timeout(&self) {
75        for token in self.dict.values() {
76            token.check_request_timeout(self.request_out_time).await;
77        }
78    }
79
80    /// Checks for tokens that have timed out on disconnection.
81    #[inline]
82    async fn check_tokens_disconnect_timeout(&mut self) {
83        while let Some(item) = self.request_disconnect_clear_queue.pop_back() {
84            if item.1.elapsed().as_millis() as u32 >= self.session_save_time {
85                if let Some(token) = self.dict.get(&item.0) {
86                    if token.is_disconnect().await {
87                        if let Some(token) = self.dict.remove(&item.0) {
88                            if let Err(er) = token
89                                .call_special_function(SpecialFunctionTag::Closed as i32)
90                                .await
91                            {
92                                log::error!("call token Closed err:{}", er)
93                            }
94                            token.clear_controller_fun_maps().await;
95                            log::debug!("token {} remove", token.get_session_id());
96                        } else {
97                            log::debug!("remove token {} fail", item.0);
98                        }
99                    } else {
100                        log::debug!("remove token {},but it not disconnect", item.0);
101                    }
102                } else {
103                    log::debug!("remove token not found {}", item.0);
104                }
105            } else {
106                self.request_disconnect_clear_queue.push_back(item);
107                break;
108            }
109        }
110    }
111
112    /// Generates a new session ID.
113    ///
114    /// # Returns
115    ///
116    /// A new session ID as an `i64`.
117    #[inline]
118    fn make_new_session_id(&mut self) -> i64 {
119        chrono::Local::now().timestamp_nanos_opt().unwrap()
120    }
121
122    /// Creates a new token.
123    ///
124    /// # Arguments
125    ///
126    /// * `manager` - A weak reference to the `Actor` managing the `AsyncTokenManager`.
127    ///
128    /// # Returns
129    ///
130    /// A `Result` containing the new `NetxToken` or an error.
131    #[inline]
132    async fn create_token(
133        &mut self,
134        manager: Weak<Actor<AsyncTokenManager<T>>>,
135    ) -> anyhow::Result<NetxToken<T::Controller>> {
136        let session_id = self.make_new_session_id();
137        let token = Arc::new(Actor::new(AsyncToken::new(session_id, manager)));
138        let controller = self.impl_controller.create_controller(token.clone())?;
139        token.set_controller(controller).await;
140        self.dict.insert(session_id, token.clone());
141        Ok(token)
142    }
143
144    /// Retrieves a token by its session ID.
145    ///
146    /// # Arguments
147    ///
148    /// * `session_id` - The session ID of the token.
149    ///
150    /// # Returns
151    ///
152    /// An `Option` containing the `NetxToken` if found.
153    #[inline]
154    pub fn get_token(&self, session_id: i64) -> Option<NetxToken<T::Controller>> {
155        self.dict.get(&session_id).cloned()
156    }
157
158    /// Retrieves all tokens.
159    ///
160    /// # Returns
161    ///
162    /// A `Vec` containing all `NetxToken`s.
163    #[inline]
164    pub fn get_all_tokens(&self) -> Vec<NetxToken<T::Controller>> {
165        self.dict.values().cloned().collect()
166    }
167}
168
169/// Trait for creating tokens asynchronously.
170pub(crate) trait IAsyncTokenManagerCreateToken<T> {
171    /// Creates a new token.
172    ///
173    /// # Arguments
174    ///
175    /// * `manager` - A weak reference to the `Actor` managing the `AsyncTokenManager`.
176    ///
177    /// # Returns
178    ///
179    /// A `Result` containing the new `NetxToken` or an error.
180    async fn create_token(&self, manager: Weak<Self>) -> anyhow::Result<NetxToken<T>>;
181}
182
183impl<T: ICreateController + 'static> IAsyncTokenManagerCreateToken<T::Controller>
184    for Actor<AsyncTokenManager<T>>
185{
186    #[inline]
187    async fn create_token(&self, manager: Weak<Self>) -> anyhow::Result<NetxToken<T::Controller>> {
188        self.inner_call(|inner| async move { inner.get_mut().create_token(manager).await })
189            .await
190    }
191}
192
193/// Trait for managing tokens.
194#[async_trait::async_trait]
195pub trait ITokenManager<T>: Send + Sync {
196    /// Retrieves a token by its session ID.
197    ///
198    /// # Arguments
199    ///
200    /// * `session_id` - The session ID of the token.
201    ///
202    /// # Returns
203    ///
204    /// An `Option` containing the `NetxToken` if found.
205    async fn get_token(&self, session_id: i64) -> Option<NetxToken<T>>;
206
207    /// Retrieves all tokens.
208    ///
209    /// # Returns
210    ///
211    /// A `Result` containing a `Vec` of all `NetxToken`s or an error.
212    async fn get_all_tokens(&self) -> Vec<NetxToken<T>>;
213}
214
215/// Trait for managing tokens asynchronously.
216#[async_trait::async_trait]
217pub(crate) trait IAsyncTokenManager<T>: ITokenManager<T> {
218    /// Checks for tokens that have timed out on requests.
219    async fn check_tokens_request_timeout(&self);
220
221    /// Checks for tokens that have timed out on disconnection.
222    async fn check_tokens_disconnect_timeout(&self);
223
224    /// Handles peer disconnection.
225    ///
226    /// # Arguments
227    ///
228    /// * `session_id` - The session ID of the token.
229    async fn peer_disconnect(&self, session_id: i64);
230}
231
232#[async_trait::async_trait]
233impl<T: ICreateController + 'static> ITokenManager<T::Controller> for Actor<AsyncTokenManager<T>> {
234    #[inline]
235    async fn get_token(&self, session_id: i64) -> Option<NetxToken<T::Controller>> {
236        self.inner_call(|inner| async move { inner.get().get_token(session_id) })
237            .await
238    }
239
240    #[inline]
241    async fn get_all_tokens(&self) -> Vec<NetxToken<T::Controller>> {
242        self.inner_call(|inner| async move { inner.get().get_all_tokens() })
243            .await
244    }
245}
246
247#[async_trait::async_trait]
248impl<T: ICreateController + 'static> IAsyncTokenManager<T::Controller>
249    for Actor<AsyncTokenManager<T>>
250{
251    #[inline]
252    async fn check_tokens_request_timeout(&self) {
253        unsafe { self.deref_inner().check_tokens_request_timeout().await }
254    }
255
256    async fn check_tokens_disconnect_timeout(&self) {
257        self.inner_call(
258            |inner| async move { inner.get_mut().check_tokens_disconnect_timeout().await },
259        )
260        .await
261    }
262
263    #[inline]
264    async fn peer_disconnect(&self, session_id: i64) {
265        self.inner_call(|inner| async move {
266            log::debug!("token {} start disconnect clear ", session_id);
267            inner
268                .get_mut()
269                .request_disconnect_clear_queue
270                .push_front((session_id, Instant::now()));
271        })
272        .await
273    }
274}