use crate::codec::forward::ForwardCodec;
use crate::codec::{BindStatus, DstAddress, VisitorRequest, VisitorResponse};
use crate::messages::*;
use crate::server::ProxyServer;
use crate::VisitorCodec;
use actix::prelude::*;
use actix::{Actor, Context};
use log::*;
use std::io;
use tokio::io::split;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
/// Shorthand for an actix `FramedWrite` over the write half of a `TcpStream`.
///
/// `R` is the item type handed to `write`, `C` is the codec used with it.
pub type Framed<R, C> = actix::io::FramedWrite<R, WriteHalf<TcpStream>, C>;
/// Actor state for one visitor (client-side) connection of the proxy.
///
/// Every optional field defaults to `None` through `#[derive(Default)]`;
/// `new` fills in everything except `forward` and `dst_address`.
#[derive(Default)]
pub struct VisitorSession {
// Session identifier; replaced when a `ProxyServerReply::Id` message arrives.
id: usize,
// Address of the visitor, supplied to `new`; only read for log prefixes in this file.
src_address: Option<DstAddress>,
// Requested destination; set when a `VisitorRequest::Connection` is received.
dst_address: Option<DstAddress>,
// Recipient for `ToProxyServer` messages (connect / disconnect / cli).
server: Option<Recipient<ToProxyServer>>,
// Recipient for `ToFoward` messages; set once `ToSession::RemoteConnected` arrives.
forward: Option<Recipient<ToFoward>>,
// Sink used to write `VisitorResponse` frames back to the visitor.
framed: Option<Framed<VisitorResponse, VisitorCodec>>,
}
#[allow(dead_code)]
impl VisitorSession {
pub fn new(
id: usize,
server: Recipient<ToProxyServer>,
src_address: DstAddress,
framed: Framed<VisitorResponse, VisitorCodec>,
) -> Self {
VisitorSession {
id,
src_address: Some(src_address),
forward: None,
server: Some(server),
framed: Some(framed),
dst_address: None,
}
}
fn trace(&self, msg: &str) {
self.log("trace", msg);
}
fn debug(&self, msg: &str) {
self.log("debug", msg);
}
fn error(&self, msg: &str) {
self.log("error", msg);
}
fn info(&self, msg: &str) {
self.log("info", msg);
}
fn log(&self, log: &str, msg: &str) {
if let (Some(src), Some(dst)) = (&self.src_address, &self.dst_address) {
match log {
"info" => info!(
"id={}, {}:{} => {}:{} {}",
self.id, src.addr, src.port, dst.addr, dst.port, msg
),
"error" => error!(
"id={},{}:{} => {}:{} {}",
self.id, dst.addr, src.addr, src.port, dst.port, msg
),
"trace" => trace!(
"id={},{}:{} => {}:{} {}",
self.id,
dst.addr,
src.addr,
src.port,
dst.port,
msg
),
"debug" => debug!(
"id={},{}:{} => {}:{} {}",
self.id, dst.addr, src.addr, src.port, dst.port, msg
),
_ => {}
}
}
}
fn write(&mut self, resp: VisitorResponse) {
if let Some(framed) = &mut self.framed {
framed.write(resp);
}
}
fn forward(&mut self, data: ToFoward) {
if let Some(forward) = &mut self.forward {
forward.do_send(data);
}
}
fn do_send_server(&mut self, req: ToProxyServer) {
if let Some(server) = &mut self.server {
server.do_send(req);
}
}
}
/// Actor lifecycle hooks for a visitor session.
impl Actor for VisitorSession {
    type Context = Context<Self>;

    /// Announces this session to the proxy server by sending it a recipient
    /// for the session, then records an info-level "Started" entry.
    fn started(&mut self, ctx: &mut Self::Context) {
        let own_recipient = ctx.address().recipient();
        self.do_send_server(ToProxyServer::Connect(own_recipient));
        self.info("Started");
    }

    /// Records an info-level "Stopped" entry, tells the proxy server this
    /// session id is gone and asks the forward recipient (if any) to stop.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        self.info("Stopped");
        let session_id = self.id;
        self.do_send_server(ToProxyServer::DisConnect(session_id));
        self.forward(ToFoward::Stop);
    }
}
impl StreamHandler<Result<VisitorRequest, io::Error>> for VisitorSession {
fn handle(&mut self, request: Result<VisitorRequest, io::Error>, ctx: &mut Context<Self>) {
match &request {
Ok(VisitorRequest::Greeting { proto, auth }) => {
let _ = proto;
self.write(VisitorResponse::Choice(ProxyServer::auth_choice(auth)));
}
Ok(VisitorRequest::Auth { id, pwd }) => {
if id == "admin" && pwd == "123456" {
self.write(VisitorResponse::AuthRespSuccess);
} else {
self.write(VisitorResponse::AuthRespError);
}
}
Ok(VisitorRequest::Connection { cmd, address }) => {
trace!("id = {}, stream handler: {:?}", self.id, request);
let _ = cmd;
self.dst_address = Some(address.clone());
let address = address.clone();
let addr = ctx.address().recipient();
let ip = address.addr.clone();
let port = address.port;
let t = address.t;
let _ = actix::spawn(async move {
if let Ok(stream) = TcpStream::connect(format!("{}:{}", ip.clone(), port)).await
{
Forward::create(|ctx1| {
let (r, w) = split(stream);
addr.do_send(ToSession::RemoteConnected(
ctx1.address().recipient(),
DstAddress::new(t, &ip, port),
));
Forward::add_stream(FramedRead::new(r, ForwardCodec), ctx1);
Forward {
visitor: addr,
framed: Some(actix::io::FramedWrite::new(w, ForwardCodec, ctx1)),
}
});
} else {
addr.do_send(ToSession::RemoteConnectionRefuse);
}
});
}
Ok(VisitorRequest::Forward(data)) => {
self.forward(ToFoward::Forward(data.clone()));
}
Ok(VisitorRequest::Cli(cli)) => {
debug!("session recv cli = {:?}", cli);
self.do_send_server(ToProxyServer::Cli(self.id, cli.clone()));
}
e => {
self.error(format!("stream handle error = {:?}, Stop session", e).as_str());
ctx.stop();
}
}
}
fn finished(&mut self, _ctx: &mut Self::Context) {
trace!("finished");
}
}
// Opts `VisitorSession` into actix's `WriteHandler` for `io::Error`; the empty
// body means every trait method keeps its default implementation.
impl actix::io::WriteHandler<io::Error> for VisitorSession {}
/// Handles control and data messages addressed to the session actor.
impl Handler<ToSession> for VisitorSession {
    type Result = MessageResult<ToSession>;

    /// Processes one `ToSession` message and returns the matching reply.
    ///
    /// Every arm yields a `SessionReply`; it is wrapped into a
    /// `MessageResult` once at the end. Messages without a dedicated arm are
    /// acknowledged with `SessionReply::Ok`.
    fn handle(&mut self, to: ToSession, ctx: &mut Context<Self>) -> Self::Result {
        let reply = match to {
            ToSession::Ping => SessionReply::Pong,
            ToSession::Meta => SessionReply::Meta(SessionMeta(self.id)),
            ToSession::Stop => {
                ctx.stop();
                SessionReply::Ok
            }
            ToSession::Forward(payload) => {
                // Data coming back from the remote side goes to the visitor.
                self.write(VisitorResponse::Forward(payload));
                SessionReply::Ok
            }
            ToSession::RemoteConnected(forward_recipient, bound) => {
                self.info("RemoteConnected");
                self.forward = Some(forward_recipient);
                let granted = VisitorResponse::BindResp {
                    status: BindStatus::Granted,
                    address: Some(bound),
                };
                self.write(granted);
                SessionReply::Ok
            }
            ToSession::RemoteConnectionRefuse => {
                self.info("RemoteConnectionRefuse");
                let refused = VisitorResponse::BindResp {
                    status: BindStatus::ConnectionRefuse,
                    address: None,
                };
                self.write(refused);
                ctx.stop();
                SessionReply::Ok
            }
            ToSession::ProxyServerReply(ProxyServerReply::Id(new_id)) => {
                self.id = new_id;
                SessionReply::Ok
            }
            // The counter value is currently not used by the session.
            ToSession::ProxyServerReply(ProxyServerReply::OnlineCounter(_count)) => {
                SessionReply::Ok
            }
            ToSession::ProxyServerReply(ProxyServerReply::Cli(cli)) => {
                debug!("session reply cli = {:?}", cli);
                self.write(VisitorResponse::Cli(cli));
                SessionReply::Ok
            }
            _ => SessionReply::Ok,
        };
        MessageResult(reply)
    }
}
/// Actor that relays raw bytes between a remote TCP connection and a visitor session.
pub struct Forward {
// Recipient of the session this actor forwards data to and notifies on stop.
visitor: Recipient<ToSession>,
// Sink for `Vec<u8>` payloads written towards the remote connection.
framed: Option<Framed<Vec<u8>, ForwardCodec>>,
}
/// Actor lifecycle hooks for the forwarder.
impl Actor for Forward {
    type Context = Context<Self>;

    /// Traces that the forwarder is up.
    fn started(&mut self, _: &mut Self::Context) {
        trace!("forward started!");
    }

    /// Traces the shutdown and asks the owning session to stop as well.
    fn stopped(&mut self, _: &mut Self::Context) {
        trace!("forward stopped");
        let stop_request = ToSession::Stop;
        self.visitor.do_send(stop_request);
    }
}
/// Data-path helpers of the forwarder.
impl Forward {
    /// Writes `data` to the framed sink; does nothing when no sink is attached.
    pub fn write(&mut self, data: Vec<u8>) {
        if let Some(sink) = self.framed.as_mut() {
            sink.write(data);
        }
    }

    /// Sends `data` to the visitor session wrapped in `ToSession::Forward`.
    pub fn visitor(&mut self, data: Vec<u8>) {
        let message = ToSession::Forward(data);
        self.visitor.do_send(message);
    }
}
/// Handles messages sent from the session to the forwarder.
impl Handler<ToFoward> for Forward {
    type Result = MessageResult<ToFoward>;

    /// Stops the actor on `Stop`, writes the payload on `Forward`; always
    /// replies with `ForwardReply::Ok`.
    fn handle(&mut self, to: ToFoward, ctx: &mut Context<Self>) -> Self::Result {
        match to {
            ToFoward::Stop => {
                ctx.stop();
            }
            ToFoward::Forward(payload) => {
                self.write(payload);
            }
        }
        MessageResult(ForwardReply::Ok)
    }
}
// Opts `Forward` into actix's `WriteHandler` for `io::Error`; the empty body
// means every trait method keeps its default implementation.
impl actix::io::WriteHandler<io::Error> for Forward {}
impl StreamHandler<Result<Vec<u8>, io::Error>> for Forward {
fn handle(&mut self, resp: Result<Vec<u8>, io::Error>, _ctx: &mut Context<Self>) {
if let Ok(data) = resp {
self.visitor(data);
}
}
}