inn_network/
session.rs

1//-------------------------------------------------------------------
2// MIT License
3// Copyright (c) 2022 black-mongo
4// @author CameronYang
5// @doc
6//
7// @end
8// Created : 2022-04-14T11:19:56+08:00
9//-------------------------------------------------------------------
10
11use crate::codec::forward::ForwardCodec;
12use crate::codec::{BindStatus, DstAddress, VisitorRequest, VisitorResponse};
13use crate::messages::*;
14use crate::server::ProxyServer;
15use crate::VisitorCodec;
16use actix::prelude::*;
17use actix::{Actor, Context};
18use log::*;
19use std::io;
20use tokio::io::split;
21use tokio::io::WriteHalf;
22use tokio::net::TcpStream;
23use tokio_util::codec::FramedRead;
/// Framed writer over the write half of a TCP connection.
///
/// `R` is the item type passed to `write`, `C` is the codec handed to
/// `actix::io::FramedWrite` together with the `WriteHalf<TcpStream>`.
pub type Framed<R, C> = actix::io::FramedWrite<R, WriteHalf<TcpStream>, C>;
/// Actor state for a single visitor (client) connection.
///
/// Every handle is optional so the type can derive `Default`; `new` fills in
/// the id, source address, server recipient and framed writer, while the
/// destination address and forward recipient are set later by the handlers.
#[derive(Default)]
pub struct VisitorSession {
    /// Unique session id generated by `ProxyServer`.
    id: usize,
    /// Address of the visitor (client) side of the connection.
    src_address: Option<DstAddress>,
    /// Destination address requested by the visitor; set when a
    /// `VisitorRequest::Connection` is received.
    dst_address: Option<DstAddress>,
    /// Recipient used to send `ToProxyServer` messages to the proxy server.
    server: Option<Recipient<ToProxyServer>>,
    /// Recipient of the forward actor; set on `ToSession::RemoteConnected`.
    forward: Option<Recipient<ToFoward>>,
    /// Framed writer used to send `VisitorResponse` frames to the visitor client.
    framed: Option<Framed<VisitorResponse, VisitorCodec>>,
}
40#[allow(dead_code)]
41impl VisitorSession {
42    pub fn new(
43        id: usize,
44        server: Recipient<ToProxyServer>,
45        src_address: DstAddress,
46        framed: Framed<VisitorResponse, VisitorCodec>,
47    ) -> Self {
48        VisitorSession {
49            id,
50            src_address: Some(src_address),
51            forward: None,
52            server: Some(server),
53            framed: Some(framed),
54            dst_address: None,
55        }
56    }
57    fn trace(&self, msg: &str) {
58        self.log("trace", msg);
59    }
60    fn debug(&self, msg: &str) {
61        self.log("debug", msg);
62    }
63    fn error(&self, msg: &str) {
64        self.log("error", msg);
65    }
66    fn info(&self, msg: &str) {
67        self.log("info", msg);
68    }
69    fn log(&self, log: &str, msg: &str) {
70        if let (Some(src), Some(dst)) = (&self.src_address, &self.dst_address) {
71            match log {
72                "info" => info!(
73                    "id={}, {}:{} => {}:{} {}",
74                    self.id, src.addr, src.port, dst.addr, dst.port, msg
75                ),
76                "error" => error!(
77                    "id={},{}:{} => {}:{} {}",
78                    self.id, dst.addr, src.addr, src.port, dst.port, msg
79                ),
80                "trace" => trace!(
81                    "id={},{}:{} => {}:{} {}",
82                    self.id,
83                    dst.addr,
84                    src.addr,
85                    src.port,
86                    dst.port,
87                    msg
88                ),
89                "debug" => debug!(
90                    "id={},{}:{} => {}:{} {}",
91                    self.id, dst.addr, src.addr, src.port, dst.port, msg
92                ),
93                _ => {}
94            }
95        }
96    }
97    fn write(&mut self, resp: VisitorResponse) {
98        if let Some(framed) = &mut self.framed {
99            framed.write(resp);
100        }
101    }
102    fn forward(&mut self, data: ToFoward) {
103        if let Some(forward) = &mut self.forward {
104            forward.do_send(data);
105        }
106    }
107    fn do_send_server(&mut self, req: ToProxyServer) {
108        if let Some(server) = &mut self.server {
109            server.do_send(req);
110        }
111    }
112}
113impl Actor for VisitorSession {
114    type Context = Context<Self>;
115
116    fn started(&mut self, ctx: &mut Self::Context) {
117        self.do_send_server(ToProxyServer::Connect(ctx.address().recipient()));
118        self.info("Started");
119    }
120    fn stopped(&mut self, _ctx: &mut Self::Context) {
121        self.info("Stopped");
122        self.do_send_server(ToProxyServer::DisConnect(self.id));
123        self.forward(ToFoward::Stop);
124    }
125}
126impl StreamHandler<Result<VisitorRequest, io::Error>> for VisitorSession {
127    fn handle(&mut self, request: Result<VisitorRequest, io::Error>, ctx: &mut Context<Self>) {
128        match &request {
129            Ok(VisitorRequest::Greeting { proto, auth }) => {
130                let _ = proto;
131                self.write(VisitorResponse::Choice(ProxyServer::auth_choice(auth)));
132            }
133            Ok(VisitorRequest::Auth { id, pwd }) => {
134                if id == "admin" && pwd == "123456" {
135                    self.write(VisitorResponse::AuthRespSuccess);
136                } else {
137                    self.write(VisitorResponse::AuthRespError);
138                }
139            }
140            Ok(VisitorRequest::Connection { cmd, address }) => {
141                trace!("id = {}, stream handler: {:?}", self.id, request);
142                let _ = cmd;
143                self.dst_address = Some(address.clone());
144                let address = address.clone();
145                let addr = ctx.address().recipient();
146                let ip = address.addr.clone();
147                let port = address.port;
148                let t = address.t;
149                // Create remote connection
150                let _ = actix::spawn(async move {
151                    if let Ok(stream) = TcpStream::connect(format!("{}:{}", ip.clone(), port)).await
152                    {
153                        Forward::create(|ctx1| {
154                            let (r, w) = split(stream);
155                            addr.do_send(ToSession::RemoteConnected(
156                                ctx1.address().recipient(),
157                                DstAddress::new(t, &ip, port),
158                            ));
159                            Forward::add_stream(FramedRead::new(r, ForwardCodec), ctx1);
160                            Forward {
161                                visitor: addr,
162                                framed: Some(actix::io::FramedWrite::new(w, ForwardCodec, ctx1)),
163                            }
164                        });
165                    } else {
166                        addr.do_send(ToSession::RemoteConnectionRefuse);
167                    }
168                });
169            }
170            Ok(VisitorRequest::Forward(data)) => {
171                self.forward(ToFoward::Forward(data.clone()));
172            }
173            // cli message
174            Ok(VisitorRequest::Cli(cli)) => {
175                debug!("session recv cli = {:?}", cli);
176                self.do_send_server(ToProxyServer::Cli(self.id, cli.clone()));
177            }
178            e => {
179                self.error(format!("stream handle error = {:?}, Stop session", e).as_str());
180                ctx.stop();
181            }
182        }
183    }
184    fn finished(&mut self, _ctx: &mut Self::Context) {
185        trace!("finished");
186    }
187}
// No overrides: the session uses `WriteHandler`'s default callbacks for its framed writer.
impl actix::io::WriteHandler<io::Error> for VisitorSession {}
189
190impl Handler<ToSession> for VisitorSession {
191    type Result = MessageResult<ToSession>;
192    fn handle(&mut self, to: ToSession, ctx: &mut Context<Self>) -> Self::Result {
193        match to {
194            ToSession::Ping => MessageResult(SessionReply::Pong),
195            ToSession::Stop => {
196                ctx.stop();
197                MessageResult(SessionReply::Ok)
198            }
199            ToSession::Meta => MessageResult(SessionReply::Meta(SessionMeta(self.id))),
200            ToSession::Forward(data) => {
201                self.write(VisitorResponse::Forward(data));
202                MessageResult(SessionReply::Ok)
203            }
204            ToSession::RemoteConnected(recipient, bind) => {
205                self.info("RemoteConnected");
206                self.forward = Some(recipient);
207                self.write(VisitorResponse::BindResp {
208                    status: BindStatus::Granted,
209                    address: Some(bind),
210                });
211                MessageResult(SessionReply::Ok)
212            }
213            ToSession::RemoteConnectionRefuse => {
214                self.info("RemoteConnectionRefuse");
215                self.write(VisitorResponse::BindResp {
216                    status: BindStatus::ConnectionRefuse,
217                    address: None,
218                });
219                ctx.stop();
220                MessageResult(SessionReply::Ok)
221            }
222            ToSession::ProxyServerReply(ProxyServerReply::Id(id)) => {
223                self.id = id;
224                MessageResult(SessionReply::Ok)
225            }
226            ToSession::ProxyServerReply(ProxyServerReply::OnlineCounter(_n)) => {
227                MessageResult(SessionReply::Ok)
228            }
229            ToSession::ProxyServerReply(ProxyServerReply::Cli(cli)) => {
230                debug!("session reply cli = {:?}", cli);
231                self.write(VisitorResponse::Cli(cli));
232                MessageResult(SessionReply::Ok)
233            }
234            _ => MessageResult(SessionReply::Ok),
235        }
236    }
237}
238
/// Actor for the remote (destination) side of a proxied connection.
///
/// It is created once the TCP connection to the destination succeeds; it
/// passes data read from the remote to the visitor session and writes
/// forwarded data to the remote.
pub struct Forward {
    /// Recipient of the visitor session this forward belongs to.
    visitor: Recipient<ToSession>,
    /// Framed writer over the write half of the remote connection.
    framed: Option<Framed<Vec<u8>, ForwardCodec>>,
}
244impl Actor for Forward {
245    type Context = Context<Self>;
246    fn started(&mut self, _ctx: &mut Context<Self>) {
247        trace!("forward started!");
248    }
249    fn stopped(&mut self, _ctx: &mut Context<Self>) {
250        trace!("forward stopped");
251        self.visitor.do_send(ToSession::Stop);
252    }
253}
254impl Forward {
255    #[allow(clippy::single_match)]
256    pub fn write(&mut self, data: Vec<u8>) {
257        match &mut self.framed {
258            Some(framed) => framed.write(data),
259            _ => {}
260        }
261    }
262    pub fn visitor(&mut self, data: Vec<u8>) {
263        self.visitor.do_send(ToSession::Forward(data));
264    }
265}
266impl Handler<ToFoward> for Forward {
267    type Result = MessageResult<ToFoward>;
268    fn handle(&mut self, to: ToFoward, ctx: &mut Context<Self>) -> Self::Result {
269        match to {
270            ToFoward::Forward(data) => self.write(data),
271            ToFoward::Stop => ctx.stop(),
272        }
273        MessageResult(ForwardReply::Ok)
274    }
275}
// No overrides: the forward actor uses `WriteHandler`'s default callbacks for its framed writer.
impl actix::io::WriteHandler<io::Error> for Forward {}
277impl StreamHandler<Result<Vec<u8>, io::Error>> for Forward {
278    fn handle(&mut self, resp: Result<Vec<u8>, io::Error>, _ctx: &mut Context<Self>) {
279        // trace!("Forward handle receive = {:?}", resp);
280        if let Ok(data) = resp {
281            self.visitor(data);
282        }
283    }
284}