netxserver/server/
impl_server.rs

1use anyhow::{bail, Result};
2use bytes::BufMut;
3use data_rw::Data;
4use std::sync::{Arc, Weak};
5use tokio::io::{AsyncReadExt, ReadHalf};
6
7#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
8use tcpserver::{Builder, IPeer, ITCPServer, TCPPeer};
9
10use crate::async_token::{IAsyncToken, IAsyncTokenInner, NetxToken};
11use crate::async_token_manager::{IAsyncTokenManager, TokenManager};
12use crate::controller::ICreateController;
13use crate::owned_read_half_ex::ReadHalfExt;
14use crate::server::async_token_manager::{
15    AsyncTokenManager, IAsyncTokenManagerCreateToken, ITokenManager,
16};
17use crate::server::maybe_stream::MaybeStream;
18use crate::{RetResult, ServerOption};
19#[cfg(feature = "tcp-channel-server")]
20use tcp_channel_server::{Builder, ITCPServer, TCPPeer};
21
22cfg_if::cfg_if! {
23if #[cfg(feature = "use_openssl")]{
24   use openssl::ssl::{Ssl,SslAcceptor};
25   use tokio_openssl::SslStream;
26   use std::time::Duration;
27   use tokio::time::sleep;
28   use std::pin::Pin;
29}else if #[cfg(feature = "use_rustls")]{
30   use tokio_rustls::TlsAcceptor;
31}}
32
/// Peer handle type used when the `tcpserver` feature is enabled and the
/// `tcp-channel-server` feature is not: the `TCPPeer` is wrapped in an
/// `aqueue::Actor`.
#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
pub type NetPeer = aqueue::Actor<TCPPeer<MaybeStream>>;

/// Peer handle type used when the `tcp-channel-server` feature is enabled.
///
/// This definition is the one in effect when both features are enabled,
/// because the other alias is gated on `not(feature = "tcp-channel-server")`.
/// With neither feature enabled no `NetPeer` alias is defined at all.
#[cfg(feature = "tcp-channel-server")]
pub type NetPeer = TCPPeer<MaybeStream>;
40
/// Read side of a connection: tokio's `ReadHalf` over a `MaybeStream`.
///
/// This is the reader type consumed by the handshake and packet loop below.
pub type NetReadHalf = ReadHalf<MaybeStream>;
43
/// Reserved function tags passed to `call_special_function` (cast with `as i32`).
///
/// The values occupy the top of the positive `i32` range.
pub(crate) enum SpecialFunctionTag {
    /// Invoked before the packet loop starts for a connection (`i32::MAX`).
    Connect = 0x7FFF_FFFF,
    /// Invoked after the packet loop for a connection has ended (`i32::MAX - 1`).
    Disconnect = 0x7FFF_FFFE,
    /// Not referenced in this file (`i32::MAX - 2`).
    Closed = 0x7FFF_FFFD,
}
50
51/// Inner structure of `NetXServer` containing server options and async tokens.
52struct NetXServerInner<T: ICreateController + 'static> {
53    option: ServerOption,
54    async_tokens: TokenManager<T>,
55}
56
57/// NetX Service structure.
58pub struct NetXServer<T: ICreateController + 'static> {
59    inner: Arc<NetXServerInner<T>>,
60    serv: Arc<dyn ITCPServer<Arc<NetXServerInner<T>>>>,
61}
62
/// Implement `Send` for `NetXServer`.
///
/// The `serv` field is an `Arc<dyn ITCPServer<..>>` whose trait object type
/// carries no `Send` bound, so this impl is written by hand.
// SAFETY: NOTE(review) - this asserts that the concrete `ITCPServer`
// implementation and `NetXServerInner<T>` may be moved across threads for
// every `T: ICreateController`; that invariant is not visible in this file
// and should be confirmed against the server implementation and `T`.
unsafe impl<T: ICreateController + 'static> Send for NetXServer<T> {}

/// Implement `Sync` for `NetXServer`.
///
/// Written by hand for the same reason as the `Send` impl: the `serv` trait
/// object type carries no `Sync` bound.
// SAFETY: NOTE(review) - this asserts that shared references to the concrete
// `ITCPServer` implementation and `NetXServerInner<T>` are safe to use from
// several threads at once; confirm against the server implementation and `T`.
unsafe impl<T: ICreateController + 'static> Sync for NetXServer<T> {}
68
69impl<T> NetXServer<T>
70where
71    T: ICreateController + 'static,
72{
73    cfg_if::cfg_if! {
74        if #[cfg(feature = "use_openssl")] {
75            /// Creates a new `NetXServer` instance with OpenSSL TLS encryption.
76            ///
77            /// # Arguments
78            ///
79            /// * `ssl_acceptor` - A reference to the `SslAcceptor` used for SSL/TLS connections.
80            /// * `option` - The server options.
81            /// * `impl_controller` - The controller implementation.
82            ///
83            /// # Returns
84            ///
85            /// A new instance of `NetXServer`.
86            ///
87            /// # Errors
88            ///
89            /// This function will return an error if the SSL/TLS stream initialization fails.
90            #[inline]
91            pub async fn new_ssl(
92                ssl_acceptor: &'static SslAcceptor,
93                option: ServerOption,
94                impl_controller: T,
95            ) -> NetXServer<T> {
96                let request_out_time = option.request_out_time;
97                let session_save_time = option.session_save_time;
98                let async_tokens =
99                    AsyncTokenManager::new(impl_controller, request_out_time, session_save_time);
100                let inner = Arc::new(NetXServerInner {
101                    option,
102                    async_tokens,
103                });
104                let serv = Builder::new(&inner.option.addr)
105                    .set_connect_event(|addr| {
106                        log::debug!("{} connect", addr);
107                        true
108                    })
109                    .set_stream_init(move |tcp_stream| async move {
110                        let ssl = Ssl::new(ssl_acceptor.context())?;
111                        let mut stream = SslStream::new(ssl, tcp_stream)?;
112                        sleep(Duration::from_millis(200)).await;
113                        Pin::new(&mut stream).accept().await?;
114                        Ok(MaybeStream::ServerSsl(stream))
115                    })
116                    .set_input_event(|mut reader, peer, inner| async move {
117                        let addr = peer.addr();
118                        let token = match Self::get_peer_token(&mut reader, &peer, &inner).await {
119                            Ok(token) => token,
120                            Err(er) => {
121                                log::debug!("user:{}:{},disconnect it", addr, er);
122                                return Ok(());
123                            }
124                        };
125                        token.set_peer(Some(peer)).await;
126                        let res=Self::read_buff_byline(&mut reader, &token).await;
127                        token.set_peer(None).await;
128                        token
129                            .call_special_function(SpecialFunctionTag::Disconnect as i32)
130                            .await?;
131                        inner
132                            .async_tokens
133                            .peer_disconnect(token.get_session_id())
134                            .await;
135                        res?;
136                        Ok(())
137                    })
138                    .build()
139                    .await;
140                NetXServer { inner, serv }
141            }
142        } else if #[cfg(feature = "use_rustls")] {
143            /// Creates a new `NetXServer` instance with Rustls TLS encryption.
144            ///
145            /// # Arguments
146            ///
147            /// * `acceptor` - A reference to the `TlsAcceptor` used for TLS connections.
148            /// * `option` - The server options.
149            /// * `impl_controller` - The controller implementation.
150            ///
151            /// # Returns
152            ///
153            /// A new instance of `NetXServer`.
154            ///
155            /// # Errors
156            ///
157            /// This function will return an error if the TLS stream initialization fails.
158            #[inline]
159            pub async fn new_tls(
160                acceptor:&'static TlsAcceptor,
161                option: ServerOption,
162                impl_controller: T,
163            ) -> NetXServer<T> {
164                let request_out_time = option.request_out_time;
165                let session_save_time = option.session_save_time;
166                let async_tokens =
167                    AsyncTokenManager::new(impl_controller, request_out_time, session_save_time);
168                let inner = Arc::new(NetXServerInner {
169                    option,
170                    async_tokens,
171                });
172                let serv = Builder::new(&inner.option.addr)
173                    .set_connect_event(|addr| {
174                        log::debug!("{} connect", addr);
175                        true
176                    })
177                    .set_stream_init(move |tcp_stream| async move {
178                       Ok(MaybeStream::ServerTls(acceptor.accept(tcp_stream).await?))
179                    })
180                    .set_input_event(|mut reader, peer, inner| async move {
181                        let addr = peer.addr();
182                        let token = match Self::get_peer_token(&mut reader, &peer, &inner).await {
183                            Ok(token) => token,
184                            Err(er) => {
185                                log::debug!("user:{}:{},disconnect it", addr, er);
186                                return Ok(());
187                            }
188                        };
189                        token.set_peer(Some(peer)).await;
190                        let res=Self::read_buff_byline(&mut reader,  &token).await;
191                        token.set_peer(None).await;
192                        token
193                            .call_special_function(SpecialFunctionTag::Disconnect as i32)
194                            .await?;
195                        inner
196                            .async_tokens
197                            .peer_disconnect(token.get_session_id())
198                            .await;
199                        res?;
200                        Ok(())
201                    })
202                    .build()
203                    .await;
204                NetXServer { inner, serv }
205            }
206        }
207    }
208
209    /// Creates a new `NetXServer` instance.
210    ///
211    /// # Arguments
212    ///
213    /// * `option` - The server options.
214    /// * `impl_controller` - The controller implementation.
215    ///
216    /// # Returns
217    ///
218    /// A new instance of `NetXServer`.
219    ///
220    /// # Errors
221    ///
222    /// This function will return an error if the server initialization fails.
223    #[inline]
224    pub async fn new(option: ServerOption, impl_controller: T) -> NetXServer<T> {
225        let request_out_time = option.request_out_time;
226        let session_save_time = option.session_save_time;
227        let async_tokens =
228            AsyncTokenManager::new(impl_controller, request_out_time, session_save_time);
229        let inner = Arc::new(NetXServerInner {
230            option,
231            async_tokens,
232        });
233        let serv = Builder::new(&inner.option.addr)
234            .set_connect_event(|addr| {
235                log::debug!("{} connect", addr);
236                true
237            })
238            .set_stream_init(|tcp_stream| async move { Ok(MaybeStream::Plain(tcp_stream)) })
239            .set_input_event(|mut reader, peer, inner| async move {
240                let addr = peer.addr();
241                let token = match Self::get_peer_token(&mut reader, &peer, &inner).await {
242                    Ok(token) => token,
243                    Err(er) => {
244                        log::debug!("user:{}:{},disconnect it", addr, er);
245                        return Ok(());
246                    }
247                };
248                token.set_peer(Some(peer)).await;
249                let res = Self::read_buff_byline(&mut reader, &token).await;
250                token.set_peer(None).await;
251                token
252                    .call_special_function(SpecialFunctionTag::Disconnect as i32)
253                    .await?;
254                inner
255                    .async_tokens
256                    .peer_disconnect(token.get_session_id())
257                    .await;
258                res?;
259                Ok(())
260            })
261            .build()
262            .await;
263        NetXServer { inner, serv }
264    }
265
266    /// Retrieves the peer token by reading and verifying the peer's credentials.
267    ///
268    /// # Arguments
269    ///
270    /// * `reader` - A mutable reference to the `NetReadHalf` reader.
271    /// * `peer` - An `Arc` reference to the `NetPeer`.
272    /// * `inner` - An `Arc` reference to the `NetXServerInner` containing server options and async tokens.
273    ///
274    /// # Returns
275    ///
276    /// A `Result` containing the `NetxToken` if successful, or an error if the verification fails.
277    ///
278    /// # Errors
279    ///
280    /// This function will return an error if the verification key, service name, or password is incorrect.
281    #[inline]
282    async fn get_peer_token(
283        mut reader: &mut NetReadHalf,
284        peer: &Arc<NetPeer>,
285        inner: &Arc<NetXServerInner<T>>,
286    ) -> Result<NetxToken<T::Controller>> {
287        let cmd = reader.read_i32_le().await?;
288        if cmd != 1000 {
289            Self::send_to_key_verify_msg(peer, true, "not verify key").await?;
290            bail!("not verify key")
291        }
292        let name = reader.read_string().await?;
293        if !inner.option.service_name.is_empty() && name != inner.option.service_name {
294            Self::send_to_key_verify_msg(peer, true, "service name error").await?;
295            bail!("IP:{} service name:{} error", peer.addr(), name)
296        }
297        let password = reader.read_string().await?;
298        if !inner.option.verify_key.is_empty() && password != inner.option.verify_key {
299            Self::send_to_key_verify_msg(peer, true, "service verify key error").await?;
300            bail!("IP:{} verify key:{} error", peer.addr(), name)
301        }
302        Self::send_to_key_verify_msg(peer, false, "verify success").await?;
303        let session = reader.read_i64_le().await?;
304        let token = if session == 0 {
305            inner
306                .async_tokens
307                .create_token(Arc::downgrade(&inner.async_tokens))
308                .await?
309        } else {
310            match inner.async_tokens.get_token(session).await {
311                Some(token) => token,
312                None => {
313                    inner
314                        .async_tokens
315                        .create_token(Arc::downgrade(&inner.async_tokens))
316                        .await?
317                }
318            }
319        };
320
321        Ok(token)
322    }
323
324    /// Reads data from the buffer line by line and processes it.
325    ///
326    /// # Arguments
327    ///
328    /// * `reader` - A mutable reference to the `NetReadHalf` reader.
329    /// * `token` - A reference to the `NetxToken`.
330    ///
331    /// # Returns
332    ///
333    /// A `Result` indicating success or failure.
334    #[inline]
335    async fn read_buff_byline(
336        reader: &mut NetReadHalf,
337        token: &NetxToken<T::Controller>,
338    ) -> Result<()> {
339        token
340            .call_special_function(SpecialFunctionTag::Connect as i32)
341            .await?;
342        Self::data_reading(reader, token).await?;
343        Ok(())
344    }
345
346    /// Reads data from the buffer and processes commands.
347    ///
348    /// # Arguments
349    ///
350    /// * `reader` - A mutable reference to the `NetReadHalf` reader.
351    /// * `token` - A reference to the `NetxToken`.
352    ///
353    /// # Returns
354    ///
355    /// A `Result` indicating success or failure.
356    #[inline]
357    async fn data_reading(
358        mut reader: &mut NetReadHalf,
359        token: &NetxToken<T::Controller>,
360    ) -> Result<()> {
361        while let Ok(mut dr) = reader.read_buff().await {
362            let cmd = dr.read_fixed::<i32>()?;
363            match cmd {
364                2000 => {
365                    Self::send_to_session_id(token).await?;
366                }
367                2400 => {
368                    let tt = dr.read_fixed::<u8>()?;
369                    let cmd = dr.read_fixed::<i32>()?;
370                    let serial = dr.read_fixed::<i64>()?;
371                    match tt {
372                        0 => {
373                            let run_token = token.clone();
374                            tokio::spawn(async move {
375                                let _ = run_token.execute_controller(tt, cmd, dr).await;
376                            });
377                        }
378                        1 => {
379                            let run_token = token.clone();
380                            tokio::spawn(async move {
381                                let res = run_token.execute_controller(tt, cmd, dr).await;
382                                if let Err(er) = run_token
383                                    .send(Self::get_result_buff(serial, res).into_inner())
384                                    .await
385                                {
386                                    log::error!("send buff 1 error:{}", er);
387                                }
388                            });
389                        }
390                        2 => {
391                            let run_token = token.clone();
392                            tokio::spawn(async move {
393                                let res = run_token.execute_controller(tt, cmd, dr).await;
394                                if let Err(er) = run_token
395                                    .send(Self::get_result_buff(serial, res).into_inner())
396                                    .await
397                                {
398                                    log::error!("send buff {} error:{}", serial, er);
399                                }
400                            });
401                        }
402                        _ => {
403                            log::error!("not found call type:{}", tt)
404                        }
405                    }
406                }
407                2500 => {
408                    let serial = dr.read_fixed::<i64>()?;
409                    token.set_result(serial, dr).await?;
410                }
411                _ => {
412                    log::error!("not found cmd:{}", cmd)
413                }
414            }
415        }
416        Ok(())
417    }
418
419    /// Constructs a result buffer from the given serial and result.
420    ///
421    /// # Arguments
422    ///
423    /// * `serial` - The serial number.
424    /// * `result` - The result to be included in the buffer.
425    ///
426    /// # Returns
427    ///
428    /// A `Data` object containing the serialized result.
429    #[inline]
430    fn get_result_buff(serial: i64, result: RetResult) -> Data {
431        let mut data = Data::with_capacity(1024);
432
433        data.write_fixed(0u32);
434        data.write_fixed(2500u32);
435        data.write_fixed(serial);
436
437        if result.is_error {
438            data.write_fixed(true);
439            data.write_fixed(result.error_id);
440            data.write_fixed(result.msg);
441        } else {
442            data.write_fixed(false);
443            data.write_fixed(result.arguments.len() as u32);
444            for argument in result.arguments {
445                data.write_fixed(argument.into_inner());
446            }
447        }
448
449        let len = data.len();
450        (&mut data[0..4]).put_u32_le(len as u32);
451        data
452    }
453
454    /// Sends the session ID to the client.
455    ///
456    /// # Arguments
457    ///
458    /// * `token` - A reference to the `NetxToken`.
459    ///
460    /// # Returns
461    ///
462    /// A `Result` indicating success or failure.
463    #[inline]
464    async fn send_to_session_id(token: &NetxToken<T::Controller>) -> crate::error::Result<()> {
465        let session_id = token.get_session_id();
466        let mut data = Data::new();
467        data.write_fixed(0u32);
468        data.write_fixed(2000i32);
469        data.write_fixed(session_id);
470        let len = data.len();
471        (&mut data[0..4]).put_u32_le(len as u32);
472        token.send(data.into_inner()).await
473    }
474
475    /// Sends a key verification message to the peer.
476    ///
477    /// # Arguments
478    ///
479    /// * `peer` - An `Arc` reference to the `NetPeer`.
480    /// * `is_err` - A boolean indicating if there was an error.
481    /// * `msg` - A message string.
482    ///
483    /// # Returns
484    ///
485    /// A `Result` indicating success or failure.
486    #[inline]
487    async fn send_to_key_verify_msg(
488        peer: &Arc<NetPeer>,
489        is_err: bool,
490        msg: &str,
491    ) -> crate::error::Result<()> {
492        let mut data = Data::new();
493        data.write_fixed(0u32);
494        data.write_fixed(1000i32);
495        data.write_fixed(is_err);
496        data.write_fixed(msg);
497        data.write_fixed(1u8);
498        let len = data.len();
499        (&mut data[0..4]).put_u32_le(len as u32);
500        Ok(peer.send_all(data.into_inner()).await?)
501    }
502
503    /// Retrieves the token manager as a weak reference.
504    ///
505    /// # Returns
506    ///
507    /// A `Weak` reference to the `ITokenManager`.
508    #[inline]
509    pub fn get_token_manager(&self) -> Weak<dyn ITokenManager<T::Controller>> {
510        Arc::downgrade(&self.inner.async_tokens) as Weak<dyn ITokenManager<T::Controller>>
511    }
512
513    /// Starts the server asynchronously.
514    ///
515    /// # Returns
516    ///
517    /// A `Result` containing a `JoinHandle` that resolves to a `Result`.
518    #[inline]
519    pub async fn start(&self) -> crate::error::Result<tokio::task::JoinHandle<Result<()>>> {
520        Ok(self.serv.start(self.inner.clone()).await?)
521    }
522
523    /// Starts the server and blocks until it stops.
524    ///
525    /// # Returns
526    ///
527    /// A `Result` indicating success or failure.
528    #[inline]
529    pub async fn start_block(&self) -> crate::error::Result<()> {
530        Ok(self.serv.start_block(self.inner.clone()).await?)
531    }
532}