netxserver/server/
async_token.rs

1use crate::async_token_manager::IAsyncTokenManager;
2use crate::{IController, NetPeer, RetResult};
3//use anyhow::{anyhow, bail, Result};
4use aqueue::Actor;
5use data_rw::{Data, DataOwnedReader};
6use oneshot::{channel as oneshot, Receiver, Sender};
7use std::collections::{HashMap, VecDeque};
8use std::sync::atomic::{AtomicI64, Ordering};
9use std::sync::{Arc, Weak};
10use tokio::time::Instant;
11
12#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
13use tcpserver::IPeer;
14
/// Per-session state held by the server for one client session.
///
/// Stores the session's controller and network peer (both optional), a weak
/// handle back to the token manager, and the bookkeeping for requests that are
/// still waiting for a reply.
pub struct AsyncToken<T> {
    /// The session ID associated with this token.
    session_id: i64,
    /// The controller associated with this token, wrapped in an `Arc`.
    /// `None` until a controller is assigned.
    controller: Option<Arc<T>>,
    /// The network peer associated with this token, wrapped in an `Arc`.
    /// `None` when no peer is attached.
    peer: Option<Arc<NetPeer>>,
    /// A weak reference to the asynchronous token manager; must be upgraded
    /// before use and may fail to upgrade.
    manager: Weak<dyn IAsyncTokenManager<T>>,
    /// Pending requests: maps a serial number to the one-shot sender through
    /// which the reply (or an error) is delivered to the waiting caller.
    result_dict: HashMap<i64, Sender<crate::error::Result<DataOwnedReader>>>,
    /// An atomic counter for generating serial numbers.
    serial_atomic: AtomicI64,
    /// A queue of `(serial, time the request was registered)` pairs used for
    /// timeout detection.
    request_queue: VecDeque<(i64, Instant)>,
}
32
// Manually asserts that `AsyncToken<T>` may be moved to and shared between threads.
// NOTE(review): the invariant that makes this sound is not stated in this file.
// Presumably it is needed because a field (e.g. `Weak<dyn IAsyncTokenManager<T>>`)
// is not automatically `Send`/`Sync`, and access is expected to be serialized by
// the surrounding `Actor` — TODO confirm and record it as a `// SAFETY:` comment.
unsafe impl<T: IController> Send for AsyncToken<T> {}
unsafe impl<T: IController> Sync for AsyncToken<T> {}
35
/// Shared handle to a session token: an `AsyncToken<T>` wrapped in an `Actor`
/// and reference-counted with `Arc` so it can be cloned and handed around.
pub type NetxToken<T> = Arc<Actor<AsyncToken<T>>>;
38
39impl<T: IController> AsyncToken<T> {
40    /// Creates a new `AsyncToken` with the given session ID and manager.
41    pub(crate) fn new(session_id: i64, manager: Weak<dyn IAsyncTokenManager<T>>) -> AsyncToken<T> {
42        AsyncToken {
43            session_id,
44            controller: None,
45            peer: None,
46            manager,
47            result_dict: Default::default(),
48            serial_atomic: AtomicI64::new(1),
49            request_queue: Default::default(),
50        }
51    }
52}
53
54impl<T> Drop for AsyncToken<T> {
55    /// Logs a debug message when the `AsyncToken` is dropped.
56    fn drop(&mut self) {
57        log::debug!("token session id:{} drop", self.session_id);
58    }
59}
60
61impl<T: IController> AsyncToken<T> {
62    /// Calls a special function on the controller with the given command tag.
63    #[inline]
64    pub(crate) async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()> {
65        if let Some(ref controller) = self.controller {
66            controller
67                .call(1, cmd_tag, DataOwnedReader::new(vec![0; 4]))
68                .await?;
69        }
70        Ok(())
71    }
72
73    /// Executes a controller function with the given parameters.
74    #[inline]
75    pub(crate) async fn execute_controller(
76        &self,
77        tt: u8,
78        cmd: i32,
79        dr: DataOwnedReader,
80    ) -> anyhow::Result<RetResult> {
81        if let Some(ref controller) = self.controller {
82            return controller.call(tt, cmd, dr).await;
83        }
84        anyhow::bail!("controller is none")
85    }
86
87    /// Generates a new serial number.
88    #[inline]
89    pub(crate) fn new_serial(&self) -> i64 {
90        self.serial_atomic.fetch_add(1, Ordering::Acquire)
91    }
92
93    /// Sets an error for the given serial number.
94    #[inline]
95    pub fn set_error(&mut self, serial: i64, err: crate::error::Error) -> crate::error::Result<()> {
96        if let Some(tx) = self.result_dict.remove(&serial) {
97            Ok(tx
98                .send(Err(err))
99                .map_err(|_| crate::error::Error::SerialClose(serial))?)
100        } else {
101            Ok(())
102        }
103    }
104
105    /// Checks for request timeouts and sets errors for timed-out requests.
106    #[inline]
107    pub fn check_request_timeout(&mut self, request_out_time: u32) {
108        while let Some(item) = self.request_queue.pop_back() {
109            if item.1.elapsed().as_millis() as u32 >= request_out_time {
110                if let Err(er) = self.set_error(item.0, crate::error::Error::SerialTimeOut(item.0))
111                {
112                    log::error!("check err:{}", er);
113                }
114            } else {
115                self.request_queue.push_back(item);
116                break;
117            }
118        }
119    }
120}
121
/// Crate-internal operations on a session token: attaching/detaching the
/// controller and peer, dispatching incoming requests to the controller, and
/// completing or expiring outgoing requests.
pub(crate) trait IAsyncTokenInner {
    /// The type of the controller.
    type Controller: IController;

    /// Sets the controller for the asynchronous token.
    ///
    /// # Arguments
    ///
    /// * `controller` - An `Arc` reference to the controller.
    async fn set_controller(&self, controller: Arc<Self::Controller>);

    /// Detaches the controller from the token.
    ///
    /// The `Actor<AsyncToken<T>>` implementation in this file does this by
    /// resetting the stored controller to `None`.
    async fn clear_controller_fun_maps(&self);

    /// Sets the network peer for the asynchronous token.
    ///
    /// # Arguments
    ///
    /// * `peer` - An optional `Arc` reference to the network peer; `None`
    ///   clears the stored peer.
    async fn set_peer(&self, peer: Option<Arc<NetPeer>>);

    /// Calls a special function on the controller, such as disconnect or connect.
    ///
    /// # Arguments
    ///
    /// * `cmd_tag` - The command tag indicating the function to call.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the controller call, if any.
    async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()>;

    /// Executes a controller function with the given parameters.
    ///
    /// # Arguments
    ///
    /// * `tt` - The type tag.
    /// * `cmd` - The command to execute.
    /// * `data` - The data to pass to the function.
    ///
    /// # Returns
    ///
    /// The `RetResult` of the call. This method has no `Err` channel; the
    /// `Actor<AsyncToken<T>>` implementation in this file logs a failure and
    /// reports it as `RetResult::error(-1, ..)`.
    async fn execute_controller(&self, tt: u8, cmd: i32, data: DataOwnedReader) -> RetResult;

    /// Sets the response result for a given serial number.
    ///
    /// # Arguments
    ///
    /// * `serial` - The serial number of the request being answered.
    /// * `data` - The data to set as the result.
    ///
    /// # Errors
    ///
    /// Returns an error when the result could not be handed to the waiter.
    async fn set_result(&self, serial: i64, data: DataOwnedReader) -> anyhow::Result<()>;

    /// Checks for request timeouts and handles them.
    ///
    /// # Arguments
    ///
    /// * `request_out_time` - The timeout duration in milliseconds.
    async fn check_request_timeout(&self, request_out_time: u32);
}
187
188impl<T: IController + 'static> IAsyncTokenInner for Actor<AsyncToken<T>> {
189    type Controller = T;
190
191    #[inline]
192    async fn set_controller(&self, controller: Arc<T>) {
193        self.inner_call(|inner| async move { inner.get_mut().controller = Some(controller) })
194            .await
195    }
196
197    #[inline]
198    async fn clear_controller_fun_maps(&self) {
199        self.inner_call(|inner| async move {
200            inner.get_mut().controller = None;
201        })
202        .await
203    }
204
205    #[inline]
206    async fn set_peer(&self, peer: Option<Arc<NetPeer>>) {
207        self.inner_call(|inner| async move {
208            inner.get_mut().peer = peer;
209        })
210        .await
211    }
212
213    #[inline]
214    async fn call_special_function(&self, cmd_tag: i32) -> anyhow::Result<()> {
215        unsafe { self.deref_inner().call_special_function(cmd_tag).await }
216    }
217
218    #[inline]
219    async fn execute_controller(&self, tt: u8, cmd: i32, dr: DataOwnedReader) -> RetResult {
220        unsafe {
221            match self.deref_inner().execute_controller(tt, cmd, dr).await {
222                Ok(res) => res,
223                Err(err) => {
224                    log::error!(
225                        "session id:{} call cmd:{} error:{:?}",
226                        self.get_session_id(),
227                        cmd,
228                        err
229                    );
230                    RetResult::error(
231                        -1,
232                        format!(
233                            "session id:{} call cmd:{} error:{}",
234                            self.get_session_id(),
235                            cmd,
236                            err
237                        ),
238                    )
239                }
240            }
241        }
242    }
243
244    #[inline]
245    async fn set_result(&self, serial: i64, dr: DataOwnedReader) -> anyhow::Result<()> {
246        let have_tx: Option<Sender<crate::error::Result<DataOwnedReader>>> = self
247            .inner_call(|inner| async move { inner.get_mut().result_dict.remove(&serial) })
248            .await;
249
250        if let Some(tx) = have_tx {
251            Ok(tx
252                .send(Ok(dr))
253                .map_err(|_| crate::error::Error::SerialClose(serial))?)
254        } else {
255            match RetResult::from(dr) {
256                Ok(res) => match res.check() {
257                    Ok(_) => {
258                        log::error!("not found 2 {}", serial)
259                    }
260                    Err(err) => {
261                        log::error!("{}", err)
262                    }
263                },
264                Err(er) => {
265                    log::error!("not found {} :{}", serial, er)
266                }
267            }
268
269            Ok(())
270        }
271    }
272
273    #[inline]
274    async fn check_request_timeout(&self, request_out_time: u32) {
275        self.inner_call(|inner| async move {
276            inner.get_mut().check_request_timeout(request_out_time);
277        })
278        .await
279    }
280}
281
/// Public interface of a session token: identity, serial generation, access to
/// the peer and to other sessions' tokens, and sending requests to the remote side.
pub trait IAsyncToken {
    /// The type of the controller.
    type Controller: IController;

    /// Gets the session ID associated with the token.
    ///
    /// # Returns
    ///
    /// * `i64` - The session ID.
    fn get_session_id(&self) -> i64;

    /// Generates a new serial ID.
    ///
    /// # Returns
    ///
    /// * `i64` - The new serial ID.
    fn new_serial(&self) -> i64;

    /// Gets the network peer currently attached to the token.
    ///
    /// # Returns
    ///
    /// A future resolving to the peer, or `None` when no peer is attached.
    fn get_peer(&self) -> impl std::future::Future<Output = Option<Arc<NetPeer>>>;

    /// Sends a raw buffer to the peer.
    ///
    /// # Arguments
    ///
    /// * `buff` - The buffer to send.
    ///
    /// # Returns
    ///
    /// A future resolving to `Ok(())` on success. The `Actor<AsyncToken<T>>`
    /// implementation in this file fails with `Error::TokenDisconnect` when no
    /// peer is attached.
    fn send(&self, buff: Vec<u8>) -> impl std::future::Future<Output = crate::error::Result<()>>;

    /// Gets the network token by session ID.
    ///
    /// # Arguments
    ///
    /// * `session_id` - The session ID to look up.
    ///
    /// # Returns
    ///
    /// A future resolving to the token, or `None` when the manager returns no
    /// token for that ID. The `Actor<AsyncToken<T>>` implementation in this
    /// file fails with `Error::ManagerUpgradeFail` when the token manager is
    /// no longer alive.
    fn get_token(
        &self,
        session_id: i64,
    ) -> impl std::future::Future<Output = crate::error::Result<Option<NetxToken<Self::Controller>>>>;

    /// Gets all network tokens.
    ///
    /// # Returns
    ///
    /// A future resolving to the tokens returned by the manager. The
    /// `Actor<AsyncToken<T>>` implementation in this file fails with
    /// `Error::ManagerUpgradeFail` when the token manager is no longer alive.
    fn get_all_tokens(
        &self,
    ) -> impl std::future::Future<Output = crate::error::Result<Vec<NetxToken<Self::Controller>>>>;

    /// Sends a request and waits for its reply.
    ///
    /// # Arguments
    ///
    /// * `serial` - The serial number identifying this request.
    /// * `buff` - The encoded request to send.
    ///
    /// # Returns
    ///
    /// A future resolving to the decoded `RetResult`. In the
    /// `Actor<AsyncToken<T>>` implementation in this file the failure cases
    /// include `Error::SerialHave` (the serial is already pending),
    /// `Error::TokenDisconnect` (no peer attached) and `Error::SerialClose`
    /// (the reply channel closed before a reply arrived).
    fn call(
        &self,
        serial: i64,
        buff: Data,
    ) -> impl std::future::Future<Output = crate::error::Result<RetResult>>;

    /// Sends a request without waiting for a reply.
    ///
    /// # Arguments
    ///
    /// * `buff` - The encoded request to send.
    ///
    /// # Returns
    ///
    /// A future resolving to `Ok(())` once the buffer has been sent.
    fn run(&self, buff: Data) -> impl std::future::Future<Output = crate::error::Result<()>>;

    /// Checks if the connection is disconnected.
    ///
    /// # Returns
    ///
    /// A future resolving to `true` when the connection is considered
    /// disconnected. The `Actor<AsyncToken<T>>` implementation in this file
    /// reports `true` when no peer is attached.
    fn is_disconnect(&self) -> impl std::future::Future<Output = bool>;
}
376
377impl<T: IController + 'static> IAsyncToken for Actor<AsyncToken<T>> {
378    /// controller type
379    type Controller = T;
380
381    #[inline]
382    fn get_session_id(&self) -> i64 {
383        unsafe { self.deref_inner().session_id }
384    }
385
386    #[inline]
387    fn new_serial(&self) -> i64 {
388        unsafe { self.deref_inner().new_serial() }
389    }
390
391    #[inline]
392    async fn get_peer(&self) -> Option<Arc<NetPeer>> {
393        self.inner_call(|inner| async move { inner.get_mut().peer.clone() })
394            .await
395    }
396
397    #[inline]
398    async fn send(&self, buff: Vec<u8>) -> crate::error::Result<()> {
399        unsafe {
400            if let Some(peer) = self.deref_inner().peer.clone() {
401                Ok(peer.send_all(buff).await?)
402            } else {
403                Err(crate::error::Error::TokenDisconnect(self.get_session_id()))
404            }
405        }
406    }
407
408    #[inline]
409    async fn get_token(&self, session_id: i64) -> crate::error::Result<Option<NetxToken<T>>> {
410        self.inner_call(|inner| async move {
411            let manager = inner
412                .get()
413                .manager
414                .upgrade()
415                .ok_or_else(|| crate::error::Error::ManagerUpgradeFail)?;
416            Ok(manager.get_token(session_id).await)
417        })
418        .await
419    }
420
421    #[inline]
422    async fn get_all_tokens(&self) -> crate::error::Result<Vec<NetxToken<T>>> {
423        self.inner_call(|inner| async move {
424            let manager = inner
425                .get()
426                .manager
427                .upgrade()
428                .ok_or_else(|| crate::error::Error::ManagerUpgradeFail)?;
429            Ok(manager.get_all_tokens().await)
430        })
431        .await
432    }
433
434    #[inline]
435    async fn call(&self, serial: i64, buff: Data) -> crate::error::Result<RetResult> {
436        let (peer, rx): (
437            Arc<NetPeer>,
438            Receiver<crate::error::Result<DataOwnedReader>>,
439        ) = self
440            .inner_call(|inner| async move {
441                if let Some(peer) = inner.get().peer.clone() {
442                    let (tx, rx): (
443                        Sender<crate::error::Result<DataOwnedReader>>,
444                        Receiver<crate::error::Result<DataOwnedReader>>,
445                    ) = oneshot();
446                    if inner.get_mut().result_dict.contains_key(&serial) {
447                        return Err(crate::error::Error::SerialHave);
448                    }
449                    if inner.get_mut().result_dict.insert(serial, tx).is_none() {
450                        inner
451                            .get_mut()
452                            .request_queue
453                            .push_front((serial, Instant::now()));
454                    }
455                    Ok((peer, rx))
456                } else {
457                    Err(crate::error::Error::TokenDisconnect(inner.get().session_id))
458                }
459            })
460            .await?;
461        peer.send_all(buff.into_inner()).await?;
462        match rx.await {
463            Err(_) => Err(crate::error::Error::SerialClose(serial)),
464            Ok(data) => Ok(RetResult::from(data?)?),
465        }
466    }
467
468    #[inline]
469    async fn run(&self, buff: Data) -> crate::error::Result<()> {
470        let peer = self
471            .inner_call(|inner| async move {
472                if let Some(peer) = inner.get().peer.clone() {
473                    Ok(peer)
474                } else {
475                    Err(crate::error::Error::TokenDisconnect(inner.get().session_id))
476                }
477            })
478            .await?;
479        peer.send_all(buff.into_inner()).await?;
480        Ok(())
481    }
482
483    #[inline]
484    async fn is_disconnect(&self) -> bool {
485        self.inner_call(|inner| async move {
486            if let Some(ref peer) = inner.get().peer {
487                #[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
488                if let Ok(r) = peer.is_disconnect().await {
489                    return r;
490                }
491
492                #[cfg(feature = "tcp-channel-server")]
493                return peer.is_disconnect();
494            }
495            true
496        })
497        .await
498    }
499}
500
/// Builds a request frame and sends it to a peer token.
///
/// Syntax: `call_peer!([@mode] peer => cmd; arg1, arg2, ...)`.
///
/// Every public arm writes the same header into a `data_rw::Data` buffer: a
/// `u32` length placeholder, the constant `2400u32`, a `u8` type tag, `cmd`, a
/// serial obtained from `peer.new_serial()`, and the argument count as `i32`.
/// Each argument is then appended with `pack_serialize`, and finally the first
/// four bytes are overwritten with the total buffer length (little-endian).
///
/// Modes:
///
/// * *(none)* - type tag `2`; awaits `peer.call`, applies `check()` and
///   returns `deserialize()` of the reply.
/// * `@result` - type tag `2`; awaits `peer.call` and returns its result as-is.
/// * `@run` - type tag `0`; awaits `peer.run`, no reply is read.
/// * `@run_not_err` - type tag `0`; like `@run` but serialization and send
///   failures are logged instead of propagated.
/// * `@checkrun` - type tag `1`; awaits `peer.call` and applies `check()`,
///   discarding the value.
///
/// All modes except `@run_not_err` use `?`, so they must be expanded inside an
/// `async` context whose return type can absorb the produced errors.
///
/// NOTE(review): the expansion calls `put_u32_le` on a byte slice and names
/// `data_rw` and `log` by path, so the call site presumably needs
/// `bytes::BufMut` in scope and those crates as dependencies — confirm.
#[macro_export]
macro_rules! call_peer {
    // Internal: maps any token sequence to `()`, used for counting.
    (@uint $($x:tt)*)=>(());
    // Internal: number of comma-separated expressions, as the length of a `[(); N]`.
    (@count $($rest:expr),*)=>(<[()]>::len(&[$(call_peer!(@uint $rest)),*]));
    // Default mode: call, check the reply and deserialize its value.
    ($peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
            use data_rw::Data;
            let mut data=Data::with_capacity(128);
            let args_count=call_peer!(@count $($args),*) as i32;
            let serial=$peer.new_serial();
            data.write_fixed(0u32);
            data.write_fixed(2400u32);
            data.write_fixed(2u8);
            data.write_fixed($cmd);
            data.write_fixed(serial);
            data.write_fixed(args_count);
            $(data.pack_serialize($args)?;)*
            let len=data.len();
            (&mut data[0..4]).put_u32_le(len as u32);
            let mut ret= $peer.call(serial,data).await?.check()?;
            ret.deserialize()?
    });
    // `@result`: call and hand back the reply without checking it.
    (@result $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
            use data_rw::Data;
            let mut data=Data::with_capacity(128);
            let args_count=call_peer!(@count $($args),*) as i32;
            let serial=$peer.new_serial();
            data.write_fixed(0u32);
            data.write_fixed(2400u32);
            data.write_fixed(2u8);
            data.write_fixed($cmd);
            data.write_fixed(serial);
            data.write_fixed(args_count);
            $(data.pack_serialize($args)?;)*
            let len=data.len();
            (&mut data[0..4]).put_u32_le(len as u32);
            $peer.call(serial,data).await?
    });
    // `@run`: send only; errors are propagated with `?`.
    (@run $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
            use data_rw::Data;
            let mut data=Data::with_capacity(128);
            let args_count=call_peer!(@count $($args),*) as i32;
            let serial=$peer.new_serial();
            data.write_fixed(0u32);
            data.write_fixed(2400u32);
            data.write_fixed(0u8);
            data.write_fixed($cmd);
            data.write_fixed(serial);
            data.write_fixed(args_count);
            $(data.pack_serialize($args)?;)*
            let len=data.len();
            (&mut data[0..4]).put_u32_le(len as u32);
            $peer.run(data).await?;
    });
    // `@run_not_err`: send only; failures are logged and the frame is still sent
    // even if an argument failed to serialize.
     (@run_not_err $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
            use data_rw::Data;
            let mut data=Data::with_capacity(128);
            let args_count=call_peer!(@count $($args),*) as i32;
            let serial=$peer.new_serial();
            data.write_fixed(0u32);
            data.write_fixed(2400u32);
            data.write_fixed(0u8);
            data.write_fixed($cmd);
            data.write_fixed(serial);
            data.write_fixed(args_count);
            $(
              if let Err(err)=  data.pack_serialize($args){
                 log::error!{"pack_serialize {} is error:{}",$cmd,err};
              }
            )*
            let len=data.len();
            (&mut data[0..4]).put_u32_le(len as u32);
            if let Err(err)= $peer.run(data).await{
                 log::warn!{"run {} is error:{}",$cmd,err}
            }
    });
    // `@checkrun`: call and verify the reply with `check()`, discarding its value.
    (@checkrun $peer:expr=>$cmd:expr;$($args:expr), *$(,)*) => ({
            use data_rw::Data;
            let mut data=Data::with_capacity(128);
            let args_count=call_peer!(@count $($args),*) as i32;
            let serial=$peer.new_serial();
            data.write_fixed(0u32);
            data.write_fixed(2400u32);
            data.write_fixed(1u8);
            data.write_fixed($cmd);
            data.write_fixed(serial);
            data.write_fixed(args_count);
            $(data.pack_serialize($args)?;)*
            let len=data.len();
            (&mut data[0..4]).put_u32_le(len as u32);
            $peer.call(serial,data).await?.check()?;
    });
}
594
/// Macro to create a reference to an implementation of a given interface.
///
/// Expands to `___impl_<interface>_call::new_ref(&client)`: the type name is
/// assembled with `paste` by placing `$interface` between the `___impl_`
/// prefix and the `_call` suffix, so a type with that name must be in scope at
/// the call site (presumably generated elsewhere for each interface — confirm).
///
/// # Arguments
///
/// * `$client` - The client instance; it is passed to `new_ref` by reference.
/// * `$interface` - The interface type.
#[macro_export]
macro_rules! impl_ref {
    ($client:expr=>$interface:ty) => {
        paste::paste! {
            [<___impl_ $interface _call>]::new_ref(&$client)
        }
    };
}
608}