1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
use tcpserver::{TCPPeer, Builder, ITCPServer, IPeer}; use std::sync::{Arc, Weak}; use aqueue::{Actor, AResult}; use tokio::net::tcp::OwnedReadHalf; use log::*; use tokio::task::JoinHandle; use std::error::Error; use tokio::io::AsyncReadExt; use data_rw::Data; use std::ops::Deref; use crate::{OwnedReadHalfExt, ServerOption, RetResult}; use crate::server::async_token_manager::AsyncTokenManager; use crate::async_token_manager::{IAsyncTokenManager, TokenManager}; use crate::async_token::{IAsyncToken, NetxToken}; use crate::controller::ICreateController; use bytes::Buf; enum SpecialFunctionTag{ CONNECT=2147483647, DISCONNECT=2147483646 } pub struct NetXServer<T>{ option:ServerOption, serv:Arc<dyn ITCPServer<Arc<NetXServer<T>>>>, async_tokens:TokenManager<T> } unsafe impl<T> Send for NetXServer<T>{} unsafe impl<T> Sync for NetXServer<T>{} impl<T: ICreateController +'static> NetXServer<T> { #[inline] pub async fn new(option:ServerOption,impl_controller:T)->Arc<NetXServer<T>>{ let serv= Builder::new(&option.addr).set_connect_event(|addr|{ info!("{} connect",addr); true }).set_input_event(async move|mut reader,peer,serv|{ let addr=peer.addr(); let token= match Self::get_peer_token(&mut reader, &peer, &serv).await { Ok(token)=>token, Err(er)=>{ info!("user:{}:{},disconnect it", addr, er); return } }; if let Err(er)=Self::read_buff_byline(&mut reader,&peer,&token).await{ error!("read buff err:{}",er) } if let Err(er)= token.call_special_function(SpecialFunctionTag::DISCONNECT as i32).await{ error!("call token disconnect err:{}",er) } if let Err(er)= serv.async_tokens.peer_disconnect(token.get_sessionid()).await{ error!("peer disconnect err:{}",er) } }).build().await; let request_out_time=option.request_out_time; let session_save_time=option.session_save_time; Arc::new(NetXServer{ option, serv, async_tokens:AsyncTokenManager::new(impl_controller,request_out_time,session_save_time) }) } #[inline] async fn get_peer_token(mut reader:&mut OwnedReadHalf, peer:&Arc<Actor<TCPPeer>>, serv:&Arc<NetXServer<T>>) ->Result<NetxToken,Box<dyn Error>>{ let cmd=reader.read_i32_le().await?; if cmd !=1000{ Self::send_to_key_verify_msg(&peer,true,"not verify key").await?; return Err("not verify key".into()) } let name=reader.read_string().await?; if !serv.option.service_name.is_empty() && name !=serv.option.service_name{ Self::send_to_key_verify_msg(&peer,true,"service name error").await?; return Err(format!("IP:{} service name:{} error",peer.addr(),name).into()) } let password=reader.read_string().await?; if !serv.option.verify_key.is_empty() && password!=serv.option.verify_key{ Self::send_to_key_verify_msg(&peer,true,"service verify key error").await?; return Err(format!("IP:{} verify key:{} error",peer.addr(),name).into()) } Self::send_to_key_verify_msg(&peer,false,"verify success").await?; let session=reader.read_i64_le().await?; let token= if session==0{ serv.async_tokens.create_token(Arc::downgrade(&serv.async_tokens) as Weak<dyn IAsyncTokenManager>).await? } else { match serv.async_tokens.get_token(session).await? { Some(token) => token, None => serv.async_tokens.create_token(Arc::downgrade(&serv.async_tokens) as Weak<dyn IAsyncTokenManager>).await? } }; Ok(token) } #[inline] async fn read_buff_byline(mut reader:&mut OwnedReadHalf,peer:&Arc<Actor<TCPPeer>>,token:&NetxToken)->Result<(),Box<dyn Error>>{ token.set_peer(Some(Arc::downgrade(peer))).await?; token.call_special_function(SpecialFunctionTag::CONNECT as i32).await?; Self::send_to_sessionid(peer, token.get_sessionid()).await?; Self::data_reading(&mut reader,token).await?; Ok(()) } #[inline] async fn data_reading(mut reader:&mut OwnedReadHalf,token:&NetxToken)->Result<(),Box<dyn Error>>{ while let Ok(mut data)=reader.read_buff().await{ let cmd=data.get_le::<i32>()?; match cmd { 2400 => { let tt=data.get_le::<u8>()?; let cmd=data.get_le::<i32>()?; let serial=data.get_le::<i64>()?; match tt { 0=>{ let run_token=token.clone(); tokio::spawn(async move { let _ = run_token.run_controller(tt, cmd, data).await; }); }, 1=>{ let run_token=token.clone(); tokio::spawn(async move{ let res= run_token.run_controller(tt,cmd,data).await; if let Err(er)= run_token.send(Self::get_result_buff(serial,res)).await{ error!("send buff 1 error:{}",er); } }); }, 2=>{ let run_token=token.clone(); tokio::spawn(async move{ let res= run_token.run_controller(tt,cmd,data).await; if let Err(er)= run_token.send(Self::get_result_buff(serial,res)).await{ error!("send buff 2 error:{}",er); } }); }, _=>{ error!("not found call type:{}",tt) } } }, 2500=>{ let serial=data.get_le::<i64>()?; token.set_result(serial,data).await?; }, _ => { error!("not found cmd:{}",cmd) } } } Ok(()) } #[inline] fn get_result_buff(serial:i64,result:RetResult)->Data { let mut data = Data::with_capacity(1024); data.write_to_le(&2500u32); data.write_to_le(&serial); if result.is_error { data.write_to_le(&true); data.write_to_le(&result.error_id); data.write_to_le(&result.msg); } else { data.write_to_le(&false); data.write_to_le(&(result.arguments.len() as u32)); for argument in result.arguments { data.write_to_le(&argument.bytes()); } } let len = data.len() + 4usize; let mut buff = Data::with_capacity(len); buff.write_to_le(&(len as u32)); buff.write(&data); buff } #[inline] async fn send_to_sessionid(peer:&Arc<Actor<TCPPeer>>,sessionid:i64)->AResult<usize>{ let mut data=Data::new(); data.write_to_le(&2000i32); data.write_to_le(&sessionid); data.write_to_le(&1u8); Self::sendto(peer,data).await } #[inline] async fn send_to_key_verify_msg(peer:&Arc<Actor<TCPPeer>>, is_err:bool, msg:&str) -> AResult<usize> { let mut data=Data::new(); data.write_to_le(&1000i32); data.write_to_le(&is_err); data.write_to_le(&msg); Self::sendto(peer,data).await } #[inline] async fn sendto<D:Deref<Target=[u8]>>(peer:&Arc<Actor<TCPPeer>>,buff:D)-> AResult<usize>{ let buff=&*buff; let len=buff.len()+4; let mut data=Data::with_capacity(len); data.write_to_le(&(len as u32)); data.write(buff); peer.send(data).await } #[inline] pub async fn start(self:&Arc<Self>) -> Result<JoinHandle<tokio::io::Result<()>>,Box<dyn Error>> { Ok(self.serv.start(self.clone()).await?) } #[inline] pub async fn start_block(self:&Arc<Self>)->Result<(),Box<dyn Error>>{ self.serv.start_block(self.clone()).await } }