1use anyhow::{bail, Result};
2use bytes::BufMut;
3use data_rw::Data;
4use std::sync::{Arc, Weak};
5use tokio::io::{AsyncReadExt, ReadHalf};
6
7#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
8use tcpserver::{Builder, IPeer, ITCPServer, TCPPeer};
9
10use crate::async_token::{IAsyncToken, IAsyncTokenInner, NetxToken};
11use crate::async_token_manager::{IAsyncTokenManager, TokenManager};
12use crate::controller::ICreateController;
13use crate::owned_read_half_ex::ReadHalfExt;
14use crate::server::async_token_manager::{
15 AsyncTokenManager, IAsyncTokenManagerCreateToken, ITokenManager,
16};
17use crate::server::maybe_stream::MaybeStream;
18use crate::{RetResult, ServerOption};
19#[cfg(feature = "tcp-channel-server")]
20use tcp_channel_server::{Builder, ITCPServer, TCPPeer};
21
22cfg_if::cfg_if! {
23if #[cfg(feature = "use_openssl")]{
24 use openssl::ssl::{Ssl,SslAcceptor};
25 use tokio_openssl::SslStream;
26 use std::time::Duration;
27 use tokio::time::sleep;
28 use std::pin::Pin;
29}else if #[cfg(feature = "use_rustls")]{
30 use tokio_rustls::TlsAcceptor;
31}}
32
/// Peer handle type used when the `tcpserver` feature is enabled and
/// `tcp-channel-server` is not: the `TCPPeer` is wrapped in an `aqueue::Actor`.
#[cfg(all(feature = "tcpserver", not(feature = "tcp-channel-server")))]
pub type NetPeer = aqueue::Actor<TCPPeer<MaybeStream>>;

/// Peer handle type used when the `tcp-channel-server` feature is enabled:
/// the `TCPPeer` is used directly.
#[cfg(feature = "tcp-channel-server")]
pub type NetPeer = TCPPeer<MaybeStream>;

/// Read half of a connection's `MaybeStream`, as handed to the input handler.
pub type NetReadHalf = ReadHalf<MaybeStream>;
43
/// Reserved function tags passed to `call_special_function` (cast with `as i32`).
///
/// The values occupy the top of the positive `i32` range, counting down from `i32::MAX`.
pub(crate) enum SpecialFunctionTag {
    /// `i32::MAX`; issued before the read loop of a verified connection starts.
    Connect = 0x7FFF_FFFF,
    /// `i32::MAX - 1`; issued after the read loop of a connection has ended.
    Disconnect = 0x7FFF_FFFE,
    /// `i32::MAX - 2`; not referenced in this file.
    Closed = 0x7FFF_FFFD,
}
50
51struct NetXServerInner<T: ICreateController + 'static> {
53 option: ServerOption,
54 async_tokens: TokenManager<T>,
55}
56
57pub struct NetXServer<T: ICreateController + 'static> {
59 inner: Arc<NetXServerInner<T>>,
60 serv: Arc<dyn ITCPServer<Arc<NetXServerInner<T>>>>,
61}
62
// SAFETY: NOTE(review): no justification is visible in this file. This asserts that
// `NetXServer<T>` may be moved across threads even though `serv` is a
// `dyn ITCPServer` trait object without an explicit `Send` bound here.
// TODO confirm that every `ITCPServer` implementation and `NetXServerInner<T>` are `Send`.
unsafe impl<T: ICreateController + 'static> Send for NetXServer<T> {}

// SAFETY: NOTE(review): same caveat as the `Send` impl above in this block; this asserts
// that shared references to `NetXServer<T>` may be used from several threads.
// TODO confirm that the wrapped server and token manager are `Sync`.
unsafe impl<T: ICreateController + 'static> Sync for NetXServer<T> {}
68
69impl<T> NetXServer<T>
70where
71 T: ICreateController + 'static,
72{
73 cfg_if::cfg_if! {
74 if #[cfg(feature = "use_openssl")] {
75 #[inline]
91 pub async fn new_ssl(
92 ssl_acceptor: &'static SslAcceptor,
93 option: ServerOption,
94 impl_controller: T,
95 ) -> NetXServer<T> {
96 let request_out_time = option.request_out_time;
97 let session_save_time = option.session_save_time;
98 let async_tokens =
99 AsyncTokenManager::new(impl_controller, request_out_time, session_save_time);
100 let inner = Arc::new(NetXServerInner {
101 option,
102 async_tokens,
103 });
104 let serv = Builder::new(&inner.option.addr)
105 .set_connect_event(|addr| {
106 log::debug!("{} connect", addr);
107 true
108 })
109 .set_stream_init(move |tcp_stream| async move {
110 let ssl = Ssl::new(ssl_acceptor.context())?;
111 let mut stream = SslStream::new(ssl, tcp_stream)?;
112 sleep(Duration::from_millis(200)).await;
113 Pin::new(&mut stream).accept().await?;
114 Ok(MaybeStream::ServerSsl(stream))
115 })
116 .set_input_event(|mut reader, peer, inner| async move {
117 let addr = peer.addr();
118 let token = match Self::get_peer_token(&mut reader, &peer, &inner).await {
119 Ok(token) => token,
120 Err(er) => {
121 log::debug!("user:{}:{},disconnect it", addr, er);
122 return Ok(());
123 }
124 };
125 token.set_peer(Some(peer)).await;
126 let res=Self::read_buff_byline(&mut reader, &token).await;
127 token.set_peer(None).await;
128 token
129 .call_special_function(SpecialFunctionTag::Disconnect as i32)
130 .await?;
131 inner
132 .async_tokens
133 .peer_disconnect(token.get_session_id())
134 .await;
135 res?;
136 Ok(())
137 })
138 .build()
139 .await;
140 NetXServer { inner, serv }
141 }
142 } else if #[cfg(feature = "use_rustls")] {
143 #[inline]
159 pub async fn new_tls(
160 acceptor:&'static TlsAcceptor,
161 option: ServerOption,
162 impl_controller: T,
163 ) -> NetXServer<T> {
164 let request_out_time = option.request_out_time;
165 let session_save_time = option.session_save_time;
166 let async_tokens =
167 AsyncTokenManager::new(impl_controller, request_out_time, session_save_time);
168 let inner = Arc::new(NetXServerInner {
169 option,
170 async_tokens,
171 });
172 let serv = Builder::new(&inner.option.addr)
173 .set_connect_event(|addr| {
174 log::debug!("{} connect", addr);
175 true
176 })
177 .set_stream_init(move |tcp_stream| async move {
178 Ok(MaybeStream::ServerTls(acceptor.accept(tcp_stream).await?))
179 })
180 .set_input_event(|mut reader, peer, inner| async move {
181 let addr = peer.addr();
182 let token = match Self::get_peer_token(&mut reader, &peer, &inner).await {
183 Ok(token) => token,
184 Err(er) => {
185 log::debug!("user:{}:{},disconnect it", addr, er);
186 return Ok(());
187 }
188 };
189 token.set_peer(Some(peer)).await;
190 let res=Self::read_buff_byline(&mut reader, &token).await;
191 token.set_peer(None).await;
192 token
193 .call_special_function(SpecialFunctionTag::Disconnect as i32)
194 .await?;
195 inner
196 .async_tokens
197 .peer_disconnect(token.get_session_id())
198 .await;
199 res?;
200 Ok(())
201 })
202 .build()
203 .await;
204 NetXServer { inner, serv }
205 }
206 }
207 }
208
209 #[inline]
224 pub async fn new(option: ServerOption, impl_controller: T) -> NetXServer<T> {
225 let request_out_time = option.request_out_time;
226 let session_save_time = option.session_save_time;
227 let async_tokens =
228 AsyncTokenManager::new(impl_controller, request_out_time, session_save_time);
229 let inner = Arc::new(NetXServerInner {
230 option,
231 async_tokens,
232 });
233 let serv = Builder::new(&inner.option.addr)
234 .set_connect_event(|addr| {
235 log::debug!("{} connect", addr);
236 true
237 })
238 .set_stream_init(|tcp_stream| async move { Ok(MaybeStream::Plain(tcp_stream)) })
239 .set_input_event(|mut reader, peer, inner| async move {
240 let addr = peer.addr();
241 let token = match Self::get_peer_token(&mut reader, &peer, &inner).await {
242 Ok(token) => token,
243 Err(er) => {
244 log::debug!("user:{}:{},disconnect it", addr, er);
245 return Ok(());
246 }
247 };
248 token.set_peer(Some(peer)).await;
249 let res = Self::read_buff_byline(&mut reader, &token).await;
250 token.set_peer(None).await;
251 token
252 .call_special_function(SpecialFunctionTag::Disconnect as i32)
253 .await?;
254 inner
255 .async_tokens
256 .peer_disconnect(token.get_session_id())
257 .await;
258 res?;
259 Ok(())
260 })
261 .build()
262 .await;
263 NetXServer { inner, serv }
264 }
265
266 #[inline]
282 async fn get_peer_token(
283 mut reader: &mut NetReadHalf,
284 peer: &Arc<NetPeer>,
285 inner: &Arc<NetXServerInner<T>>,
286 ) -> Result<NetxToken<T::Controller>> {
287 let cmd = reader.read_i32_le().await?;
288 if cmd != 1000 {
289 Self::send_to_key_verify_msg(peer, true, "not verify key").await?;
290 bail!("not verify key")
291 }
292 let name = reader.read_string().await?;
293 if !inner.option.service_name.is_empty() && name != inner.option.service_name {
294 Self::send_to_key_verify_msg(peer, true, "service name error").await?;
295 bail!("IP:{} service name:{} error", peer.addr(), name)
296 }
297 let password = reader.read_string().await?;
298 if !inner.option.verify_key.is_empty() && password != inner.option.verify_key {
299 Self::send_to_key_verify_msg(peer, true, "service verify key error").await?;
300 bail!("IP:{} verify key:{} error", peer.addr(), name)
301 }
302 Self::send_to_key_verify_msg(peer, false, "verify success").await?;
303 let session = reader.read_i64_le().await?;
304 let token = if session == 0 {
305 inner
306 .async_tokens
307 .create_token(Arc::downgrade(&inner.async_tokens))
308 .await?
309 } else {
310 match inner.async_tokens.get_token(session).await {
311 Some(token) => token,
312 None => {
313 inner
314 .async_tokens
315 .create_token(Arc::downgrade(&inner.async_tokens))
316 .await?
317 }
318 }
319 };
320
321 Ok(token)
322 }
323
324 #[inline]
335 async fn read_buff_byline(
336 reader: &mut NetReadHalf,
337 token: &NetxToken<T::Controller>,
338 ) -> Result<()> {
339 token
340 .call_special_function(SpecialFunctionTag::Connect as i32)
341 .await?;
342 Self::data_reading(reader, token).await?;
343 Ok(())
344 }
345
346 #[inline]
357 async fn data_reading(
358 mut reader: &mut NetReadHalf,
359 token: &NetxToken<T::Controller>,
360 ) -> Result<()> {
361 while let Ok(mut dr) = reader.read_buff().await {
362 let cmd = dr.read_fixed::<i32>()?;
363 match cmd {
364 2000 => {
365 Self::send_to_session_id(token).await?;
366 }
367 2400 => {
368 let tt = dr.read_fixed::<u8>()?;
369 let cmd = dr.read_fixed::<i32>()?;
370 let serial = dr.read_fixed::<i64>()?;
371 match tt {
372 0 => {
373 let run_token = token.clone();
374 tokio::spawn(async move {
375 let _ = run_token.execute_controller(tt, cmd, dr).await;
376 });
377 }
378 1 => {
379 let run_token = token.clone();
380 tokio::spawn(async move {
381 let res = run_token.execute_controller(tt, cmd, dr).await;
382 if let Err(er) = run_token
383 .send(Self::get_result_buff(serial, res).into_inner())
384 .await
385 {
386 log::error!("send buff 1 error:{}", er);
387 }
388 });
389 }
390 2 => {
391 let run_token = token.clone();
392 tokio::spawn(async move {
393 let res = run_token.execute_controller(tt, cmd, dr).await;
394 if let Err(er) = run_token
395 .send(Self::get_result_buff(serial, res).into_inner())
396 .await
397 {
398 log::error!("send buff {} error:{}", serial, er);
399 }
400 });
401 }
402 _ => {
403 log::error!("not found call type:{}", tt)
404 }
405 }
406 }
407 2500 => {
408 let serial = dr.read_fixed::<i64>()?;
409 token.set_result(serial, dr).await?;
410 }
411 _ => {
412 log::error!("not found cmd:{}", cmd)
413 }
414 }
415 }
416 Ok(())
417 }
418
419 #[inline]
430 fn get_result_buff(serial: i64, result: RetResult) -> Data {
431 let mut data = Data::with_capacity(1024);
432
433 data.write_fixed(0u32);
434 data.write_fixed(2500u32);
435 data.write_fixed(serial);
436
437 if result.is_error {
438 data.write_fixed(true);
439 data.write_fixed(result.error_id);
440 data.write_fixed(result.msg);
441 } else {
442 data.write_fixed(false);
443 data.write_fixed(result.arguments.len() as u32);
444 for argument in result.arguments {
445 data.write_fixed(argument.into_inner());
446 }
447 }
448
449 let len = data.len();
450 (&mut data[0..4]).put_u32_le(len as u32);
451 data
452 }
453
454 #[inline]
464 async fn send_to_session_id(token: &NetxToken<T::Controller>) -> crate::error::Result<()> {
465 let session_id = token.get_session_id();
466 let mut data = Data::new();
467 data.write_fixed(0u32);
468 data.write_fixed(2000i32);
469 data.write_fixed(session_id);
470 let len = data.len();
471 (&mut data[0..4]).put_u32_le(len as u32);
472 token.send(data.into_inner()).await
473 }
474
475 #[inline]
487 async fn send_to_key_verify_msg(
488 peer: &Arc<NetPeer>,
489 is_err: bool,
490 msg: &str,
491 ) -> crate::error::Result<()> {
492 let mut data = Data::new();
493 data.write_fixed(0u32);
494 data.write_fixed(1000i32);
495 data.write_fixed(is_err);
496 data.write_fixed(msg);
497 data.write_fixed(1u8);
498 let len = data.len();
499 (&mut data[0..4]).put_u32_le(len as u32);
500 Ok(peer.send_all(data.into_inner()).await?)
501 }
502
503 #[inline]
509 pub fn get_token_manager(&self) -> Weak<dyn ITokenManager<T::Controller>> {
510 Arc::downgrade(&self.inner.async_tokens) as Weak<dyn ITokenManager<T::Controller>>
511 }
512
513 #[inline]
519 pub async fn start(&self) -> crate::error::Result<tokio::task::JoinHandle<Result<()>>> {
520 Ok(self.serv.start(self.inner.clone()).await?)
521 }
522
523 #[inline]
529 pub async fn start_block(&self) -> crate::error::Result<()> {
530 Ok(self.serv.start_block(self.inner.clone()).await?)
531 }
532}