use std::sync::Arc;
use crate::{arg, ConfigOption, Helper, ProxyResult, WMCore};
use async_trait::async_trait;
use tokio::{
net::TcpListener,
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
},
};
use webparse::{HeaderName, Request, Response};
use wenmeng::{Body, HttpTrait, ProtResult, RecvRequest, RecvResponse, Server};
pub struct ControlServer {
option: ConfigOption,
server_sender_close: Option<Sender<()>>,
control_sender_close: Sender<()>,
control_receiver_close: Option<Receiver<()>>,
count: i32,
}
struct Operate {
control: Arc<Mutex<ControlServer>>,
}
#[async_trait]
impl HttpTrait for Operate {
async fn operate(&mut self, req: &mut RecvRequest) -> ProtResult<RecvResponse> {
let mut value = ControlServer::inner_operate(req, &mut self.control).await?;
value.headers_mut().insert("server", "wmproxy");
Ok(value)
}
}
impl ControlServer {
pub fn new(option: ConfigOption) -> Self {
let (sender, receiver) = channel::<()>(1);
Self {
option,
server_sender_close: None,
control_sender_close: sender,
control_receiver_close: Some(receiver),
count: 0,
}
}
pub async fn start_serve(mut self) -> ProxyResult<()> {
let option = self.option.clone();
self.inner_start_server(option).await?;
Self::start_control(Arc::new(Mutex::new(self))).await?;
Ok(())
}
pub async fn do_restart_serve(&mut self) -> ProxyResult<()> {
let option = arg::parse_env().await?;
Helper::try_init_log(&option);
self.inner_start_server(option).await?;
Ok(())
}
async fn inner_start_server(&mut self, option: ConfigOption) -> ProxyResult<()> {
let sender = self.control_sender_close.clone();
let (sender_no_listen, receiver_no_listen) = channel::<()>(1);
let sender_close = self.server_sender_close.take();
self.count += 1;
tokio::spawn(async move {
let mut proxy = WMCore::new(option);
if let Err(e) = proxy.start_serve(receiver_no_listen, sender_close).await {
log::info!("处理失败服务进程失败: {:?}", e);
}
let _ = sender.send(()).await;
});
self.server_sender_close = Some(sender_no_listen);
Ok(())
}
async fn inner_operate(
req: &mut Request<Body>,
data: &mut Arc<Mutex<ControlServer>>,
) -> ProtResult<Response<Body>> {
let mut value = data.lock().await;
match &**req.path() {
"/reload" => {
let _ = value.do_restart_serve().await;
return Ok(Response::text()
.body("重新加载配置成功")
.unwrap()
.into_type());
}
"/stop" => {
if let Some(sender) = &value.server_sender_close {
let _ = sender.send(()).await;
}
return Ok(Response::text().body("关闭进程成功").unwrap().into_type());
}
"/now" => {
if let Ok(data) = serde_json::to_string_pretty(&value.option) {
return Ok(Response::text()
.header(HeaderName::CONTENT_TYPE, "application/json; charset=utf-8")
.body(data)
.unwrap()
.into_type());
}
}
_ => {}
};
if req.path() == "/reload" {}
if req.path() == "/stop" {
if let Some(sender) = &value.server_sender_close {
let _ = sender.send(()).await;
}
return Ok(Response::text().body("关闭进程成功").unwrap().into_type());
}
return Ok(Response::status503()
.body("服务器内部无服务")
.unwrap()
.into_type());
}
async fn receiver_await(receiver: &mut Option<Receiver<()>>) -> Option<()> {
if receiver.is_some() {
receiver.as_mut().unwrap().recv().await
} else {
let pend = std::future::pending();
let () = pend.await;
None
}
}
pub async fn start_control(control: Arc<Mutex<ControlServer>>) -> ProxyResult<()> {
let listener = {
let value = &mut control.lock().await;
if value.option.disable_control {
let mut receiver = value.control_receiver_close.take();
let _ = Self::receiver_await(&mut receiver).await;
return Ok(());
}
log::info!("控制端口绑定:{:?},提供中控功能。", value.option.control);
match TcpListener::bind(value.option.control).await {
Ok(tcp) => tcp,
Err(_) => {
log::info!("控制端口绑定失败:{},请配置不同端口。", value.option.control);
let pending = std::future::pending();
let () = pending.await;
return Ok(());
}
}
};
loop {
let mut receiver = {
let value = &mut control.lock().await;
value.control_receiver_close.take()
};
tokio::select! {
Ok((conn, addr)) = listener.accept() => {
log::info!("控制端口请求:{:?},开始处理。", addr);
let cc = control.clone();
tokio::spawn(async move {
let mut server = Server::new(conn, Some(addr));
server.set_callback_http(Box::new(Operate {
control: cc
}));
if let Err(e) = server.incoming().await {
log::info!("控制中心:处理信息时发生错误:{:?}", e);
}
});
let value = &mut control.lock().await;
value.control_receiver_close = receiver;
}
_ = Self::receiver_await(&mut receiver) => {
let value = &mut control.lock().await;
value.count -= 1;
log::info!("反向代理:控制端收到关闭信号,当前:{}", value.count);
if value.count <= 0 {
break;
}
value.control_receiver_close = receiver;
}
}
}
Ok(())
}
}