use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::{
io::{split, AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::{
mpsc::{channel, Receiver},
RwLock,
},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::Sender,
};
use tokio_rustls::TlsAcceptor;
use webparse::BinaryMut;
use webparse::Buf;
use crate::{
prot::{ProtClose, ProtFrame},
proxy::ProxyServer,
trans::{TransHttp, TransTcp},
Helper, MappingConfig, ProtCreate, ProxyConfig, ProxyResult, VirtualStream,
};
pub struct CenterServer {
option: ProxyConfig,
sender: Sender<ProtFrame>,
receiver: Option<Receiver<ProtFrame>>,
sender_work: Sender<(ProtCreate, Sender<ProtFrame>)>,
receiver_work: Option<Receiver<(ProtCreate, Sender<ProtFrame>)>>,
next_id: u32,
mappings: Arc<RwLock<Vec<MappingConfig>>>,
}
impl CenterServer {
pub fn new(option: ProxyConfig) -> Self {
let (sender, receiver) = channel::<ProtFrame>(100);
let (sender_work, receiver_work) = channel::<(ProtCreate, Sender<ProtFrame>)>(10);
Self {
option,
sender,
receiver: Some(receiver),
sender_work,
receiver_work: Some(receiver_work),
next_id: 2,
mappings: Arc::new(RwLock::new(vec![])),
}
}
pub fn sender(&self) -> Sender<ProtFrame> {
self.sender.clone()
}
pub fn sender_work(&self) -> Sender<(ProtCreate, Sender<ProtFrame>)> {
self.sender_work.clone()
}
pub fn is_close(&self) -> bool {
self.sender.is_closed()
}
pub fn calc_next_id(&mut self) -> u64 {
let id = self.next_id;
self.next_id = self.next_id.wrapping_add(2);
Helper::calc_sock_map(self.option.server_id, id)
}
pub async fn inner_serve<T>(
stream: T,
option: ProxyConfig,
sender: Sender<ProtFrame>,
mut receiver: Receiver<ProtFrame>,
mut receiver_work: Receiver<(ProtCreate, Sender<ProtFrame>)>,
mappings: Arc<RwLock<Vec<MappingConfig>>>,
) -> ProxyResult<()>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut map = HashMap::<u64, Sender<ProtFrame>>::new();
let mut read_buf = BinaryMut::new();
let mut write_buf = BinaryMut::new();
let mut verify_succ = option.username.is_none() && option.password.is_none();
let (mut reader, mut writer) = split(stream);
let mut vec = Vec::with_capacity(4096);
vec.resize(4096, 0);
let is_closed;
let mut is_ready_shutdown = false;
loop {
let _ = tokio::select! {
biased;
r = receiver_work.recv() => {
if let Some((create, sender)) = r {
map.insert(create.sock_map(), sender);
let _ = create.encode(&mut write_buf);
}
}
r = receiver.recv() => {
if let Some(p) = r {
let _ = p.encode(&mut write_buf);
}
}
r = reader.read(&mut vec) => {
match r {
Ok(0)=>{
is_closed=true;
break;
}
Ok(n) => {
read_buf.put_slice(&vec[..n]);
}
Err(_) => {
is_closed = true;
break;
},
}
}
r = writer.write(write_buf.chunk()), if write_buf.has_remaining() => {
match r {
Ok(n) => {
write_buf.advance(n);
if !write_buf.has_remaining() {
write_buf.clear();
if is_ready_shutdown {
return Ok(())
}
}
}
Err(_) => todo!(),
}
}
};
if is_ready_shutdown {
continue;
}
loop {
match Helper::decode_frame(&mut read_buf)? {
Some(p) => {
match &p {
ProtFrame::Token(p) => {
if !verify_succ
&& p.is_check_succ(&option.username, &option.password)
{
verify_succ = true;
continue;
}
}
_ => {}
}
if !verify_succ {
ProtFrame::new_close_reason(0, "not verify so close".to_string())
.encode(&mut write_buf)?;
is_ready_shutdown = true;
break;
}
match p {
ProtFrame::Create(p) => {
let (virtual_sender, virtual_receiver) = channel::<ProtFrame>(10);
map.insert(p.sock_map(), virtual_sender);
let stream = VirtualStream::new(
p.sock_map(),
sender.clone(),
virtual_receiver,
);
let proxy_server = ProxyServer::new(
option.flag,
option.username.clone(),
option.password.clone(),
option.udp_bind.clone(),
None,
);
tokio::spawn(async move {
let _ = proxy_server.deal_proxy(stream).await;
});
}
ProtFrame::Close(_) | ProtFrame::Data(_) => {
if let Some(sender) = map.get(&p.sock_map()) {
let _ = sender.send(p).await;
}
}
ProtFrame::Mapping(p) => {
let mut guard = mappings.write().await;
*guard = p.into_mappings();
}
ProtFrame::Token(_t) => {}
}
}
None => {
break;
}
}
}
if !read_buf.has_remaining() {
read_buf.clear();
}
}
if is_closed {
for v in map {
let _ = v.1.try_send(ProtFrame::Close(ProtClose::new(v.0)));
}
}
Ok(())
}
pub async fn serve<T>(&mut self, stream: T) -> ProxyResult<()>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
if self.receiver.is_none() || self.receiver_work.is_none() {
log::warn!("接收器为空,请检查是否出错");
return Ok(());
}
let option = self.option.clone();
let sender = self.sender.clone();
let receiver = self.receiver.take().unwrap();
let receiver_work = self.receiver_work.take().unwrap();
let mapping = self.mappings.clone();
tokio::spawn(async move {
let _ =
Self::inner_serve(stream, option, sender, receiver, receiver_work, mapping).await;
});
Ok(())
}
pub async fn server_new_http(
&mut self,
stream: TcpStream,
addr: SocketAddr,
) -> ProxyResult<()> {
let trans = TransHttp::new(
self.sender(),
self.sender_work(),
self.calc_next_id(),
self.mappings.clone(),
);
tokio::spawn(async move {
if let Err(e) = trans.process(stream, addr).await {
log::warn!("内网穿透:Http转发时发生错误:{:?}", e);
}
});
return Ok(());
}
pub async fn server_new_https(
&mut self,
stream: TcpStream,
addr: SocketAddr,
accept: TlsAcceptor,
) -> ProxyResult<()> {
let trans = TransHttp::new(
self.sender(),
self.sender_work(),
self.calc_next_id(),
self.mappings.clone(),
);
tokio::spawn(async move {
match accept.accept(stream).await {
Ok(tls_stream) => {
if let Err(e) = trans.process(tls_stream, addr).await {
log::warn!("内网穿透:修理Https转发时发生错误:{:?}", e);
}
}
Err(e) => {
log::warn!("内网穿透:Https握手时发生错误:{:?}", e);
}
}
});
return Ok(());
}
pub async fn server_new_tcp(&mut self, stream: TcpStream) -> ProxyResult<()> {
let trans = TransTcp::new(
self.sender(),
self.sender_work(),
self.calc_next_id(),
self.mappings.clone(),
);
tokio::spawn(async move {
if let Err(e) = trans.process(stream, "tcp").await {
log::warn!("内网穿透:转发Tcp转发时发生错误:{:?}", e);
}
});
return Ok(());
}
pub async fn server_new_prxoy(&mut self, stream: TcpStream) -> ProxyResult<()> {
let trans = TransTcp::new(
self.sender(),
self.sender_work(),
self.calc_next_id(),
self.mappings.clone(),
);
tokio::spawn(async move {
if let Err(e) = trans.process(stream, "proxy").await {
log::warn!("内网穿透:转发Proxy转发时发生错误:{:?}", e);
}
});
return Ok(());
}
}