inn-network 0.1.1

A network library for Inn
Documentation
//-------------------------------------------------------------------
// MIT License
// Copyright (c) 2022 black-mongo
// @author CameronYang
// @doc
//
// @end
// Created : 2022-04-14T11:19:56+08:00
//-------------------------------------------------------------------

use crate::codec::forward::ForwardCodec;
use crate::codec::{BindStatus, DstAddress, VisitorRequest, VisitorResponse};
use crate::messages::*;
use crate::server::ProxyServer;
use crate::VisitorCodec;
use actix::prelude::*;
use actix::{Actor, Context};
use log::*;
use std::io;
use tokio::io::split;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
pub type Framed<R, C> = actix::io::FramedWrite<R, WriteHalf<TcpStream>, C>;
#[derive(Default)]
pub struct VisitorSession {
    // session unique id generated by ProxyServer
    id: usize,
    // Client address
    src_address: Option<DstAddress>,
    // Dst Address
    dst_address: Option<DstAddress>,
    // ProxyServer Recipient
    server: Option<Recipient<ToProxyServer>>,
    // Foward Recipient
    forward: Option<Recipient<ToFoward>>,
    // FramedWrite to visitor client
    framed: Option<Framed<VisitorResponse, VisitorCodec>>,
}
#[allow(dead_code)]
impl VisitorSession {
    pub fn new(
        id: usize,
        server: Recipient<ToProxyServer>,
        src_address: DstAddress,
        framed: Framed<VisitorResponse, VisitorCodec>,
    ) -> Self {
        VisitorSession {
            id,
            src_address: Some(src_address),
            forward: None,
            server: Some(server),
            framed: Some(framed),
            dst_address: None,
        }
    }
    fn trace(&self, msg: &str) {
        self.log("trace", msg);
    }
    fn debug(&self, msg: &str) {
        self.log("debug", msg);
    }
    fn error(&self, msg: &str) {
        self.log("error", msg);
    }
    fn info(&self, msg: &str) {
        self.log("info", msg);
    }
    fn log(&self, log: &str, msg: &str) {
        if let (Some(src), Some(dst)) = (&self.src_address, &self.dst_address) {
            match log {
                "info" => info!(
                    "id={}, {}:{} => {}:{} {}",
                    self.id, src.addr, src.port, dst.addr, dst.port, msg
                ),
                "error" => error!(
                    "id={},{}:{} => {}:{} {}",
                    self.id, dst.addr, src.addr, src.port, dst.port, msg
                ),
                "trace" => trace!(
                    "id={},{}:{} => {}:{} {}",
                    self.id,
                    dst.addr,
                    src.addr,
                    src.port,
                    dst.port,
                    msg
                ),
                "debug" => debug!(
                    "id={},{}:{} => {}:{} {}",
                    self.id, dst.addr, src.addr, src.port, dst.port, msg
                ),
                _ => {}
            }
        }
    }
    fn write(&mut self, resp: VisitorResponse) {
        if let Some(framed) = &mut self.framed {
            framed.write(resp);
        }
    }
    fn forward(&mut self, data: ToFoward) {
        if let Some(forward) = &mut self.forward {
            forward.do_send(data);
        }
    }
    fn do_send_server(&mut self, req: ToProxyServer) {
        if let Some(server) = &mut self.server {
            server.do_send(req);
        }
    }
}
impl Actor for VisitorSession {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.do_send_server(ToProxyServer::Connect(ctx.address().recipient()));
        self.info("Started");
    }
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        self.info("Stopped");
        self.do_send_server(ToProxyServer::DisConnect(self.id));
        self.forward(ToFoward::Stop);
    }
}
impl StreamHandler<Result<VisitorRequest, io::Error>> for VisitorSession {
    fn handle(&mut self, request: Result<VisitorRequest, io::Error>, ctx: &mut Context<Self>) {
        match &request {
            Ok(VisitorRequest::Greeting { proto, auth }) => {
                let _ = proto;
                self.write(VisitorResponse::Choice(ProxyServer::auth_choice(auth)));
            }
            Ok(VisitorRequest::Auth { id, pwd }) => {
                if id == "admin" && pwd == "123456" {
                    self.write(VisitorResponse::AuthRespSuccess);
                } else {
                    self.write(VisitorResponse::AuthRespError);
                }
            }
            Ok(VisitorRequest::Connection { cmd, address }) => {
                trace!("id = {}, stream handler: {:?}", self.id, request);
                let _ = cmd;
                self.dst_address = Some(address.clone());
                let address = address.clone();
                let addr = ctx.address().recipient();
                let ip = address.addr.clone();
                let port = address.port;
                let t = address.t;
                // Create remote connection
                let _ = actix::spawn(async move {
                    if let Ok(stream) = TcpStream::connect(format!("{}:{}", ip.clone(), port)).await
                    {
                        Forward::create(|ctx1| {
                            let (r, w) = split(stream);
                            addr.do_send(ToSession::RemoteConnected(
                                ctx1.address().recipient(),
                                DstAddress::new(t, &ip, port),
                            ));
                            Forward::add_stream(FramedRead::new(r, ForwardCodec), ctx1);
                            Forward {
                                visitor: addr,
                                framed: Some(actix::io::FramedWrite::new(w, ForwardCodec, ctx1)),
                            }
                        });
                    } else {
                        addr.do_send(ToSession::RemoteConnectionRefuse);
                    }
                });
            }
            Ok(VisitorRequest::Forward(data)) => {
                self.forward(ToFoward::Forward(data.clone()));
            }
            // cli message
            Ok(VisitorRequest::Cli(cli)) => {
                debug!("session recv cli = {:?}", cli);
                self.do_send_server(ToProxyServer::Cli(self.id, cli.clone()));
            }
            e => {
                self.error(format!("stream handle error = {:?}, Stop session", e).as_str());
                ctx.stop();
            }
        }
    }
    fn finished(&mut self, _ctx: &mut Self::Context) {
        trace!("finished");
    }
}
impl actix::io::WriteHandler<io::Error> for VisitorSession {}

impl Handler<ToSession> for VisitorSession {
    type Result = MessageResult<ToSession>;
    fn handle(&mut self, to: ToSession, ctx: &mut Context<Self>) -> Self::Result {
        match to {
            ToSession::Ping => MessageResult(SessionReply::Pong),
            ToSession::Stop => {
                ctx.stop();
                MessageResult(SessionReply::Ok)
            }
            ToSession::Meta => MessageResult(SessionReply::Meta(SessionMeta(self.id))),
            ToSession::Forward(data) => {
                self.write(VisitorResponse::Forward(data));
                MessageResult(SessionReply::Ok)
            }
            ToSession::RemoteConnected(recipient, bind) => {
                self.info("RemoteConnected");
                self.forward = Some(recipient);
                self.write(VisitorResponse::BindResp {
                    status: BindStatus::Granted,
                    address: Some(bind),
                });
                MessageResult(SessionReply::Ok)
            }
            ToSession::RemoteConnectionRefuse => {
                self.info("RemoteConnectionRefuse");
                self.write(VisitorResponse::BindResp {
                    status: BindStatus::ConnectionRefuse,
                    address: None,
                });
                ctx.stop();
                MessageResult(SessionReply::Ok)
            }
            ToSession::ProxyServerReply(ProxyServerReply::Id(id)) => {
                self.id = id;
                MessageResult(SessionReply::Ok)
            }
            ToSession::ProxyServerReply(ProxyServerReply::OnlineCounter(_n)) => {
                MessageResult(SessionReply::Ok)
            }
            ToSession::ProxyServerReply(ProxyServerReply::Cli(cli)) => {
                debug!("session reply cli = {:?}", cli);
                self.write(VisitorResponse::Cli(cli));
                MessageResult(SessionReply::Ok)
            }
            _ => MessageResult(SessionReply::Ok),
        }
    }
}

// Forward
pub struct Forward {
    visitor: Recipient<ToSession>,
    framed: Option<Framed<Vec<u8>, ForwardCodec>>,
}
impl Actor for Forward {
    type Context = Context<Self>;
    fn started(&mut self, _ctx: &mut Context<Self>) {
        trace!("forward started!");
    }
    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        trace!("forward stopped");
        self.visitor.do_send(ToSession::Stop);
    }
}
impl Forward {
    #[allow(clippy::single_match)]
    pub fn write(&mut self, data: Vec<u8>) {
        match &mut self.framed {
            Some(framed) => framed.write(data),
            _ => {}
        }
    }
    pub fn visitor(&mut self, data: Vec<u8>) {
        self.visitor.do_send(ToSession::Forward(data));
    }
}
impl Handler<ToFoward> for Forward {
    type Result = MessageResult<ToFoward>;
    fn handle(&mut self, to: ToFoward, ctx: &mut Context<Self>) -> Self::Result {
        match to {
            ToFoward::Forward(data) => self.write(data),
            ToFoward::Stop => ctx.stop(),
        }
        MessageResult(ForwardReply::Ok)
    }
}
impl actix::io::WriteHandler<io::Error> for Forward {}
impl StreamHandler<Result<Vec<u8>, io::Error>> for Forward {
    fn handle(&mut self, resp: Result<Vec<u8>, io::Error>, _ctx: &mut Context<Self>) {
        // trace!("Forward handle receive = {:?}", resp);
        if let Ok(data) = resp {
            self.visitor(data);
        }
    }
}