1use crate::codec::forward::ForwardCodec;
12use crate::codec::{BindStatus, DstAddress, VisitorRequest, VisitorResponse};
13use crate::messages::*;
14use crate::server::ProxyServer;
15use crate::VisitorCodec;
16use actix::prelude::*;
17use actix::{Actor, Context};
18use log::*;
19use std::io;
20use tokio::io::split;
21use tokio::io::WriteHalf;
22use tokio::net::TcpStream;
23use tokio_util::codec::FramedRead;
24pub type Framed<R, C> = actix::io::FramedWrite<R, WriteHalf<TcpStream>, C>;
25#[derive(Default)]
26pub struct VisitorSession {
27 id: usize,
29 src_address: Option<DstAddress>,
31 dst_address: Option<DstAddress>,
33 server: Option<Recipient<ToProxyServer>>,
35 forward: Option<Recipient<ToFoward>>,
37 framed: Option<Framed<VisitorResponse, VisitorCodec>>,
39}
40#[allow(dead_code)]
41impl VisitorSession {
42 pub fn new(
43 id: usize,
44 server: Recipient<ToProxyServer>,
45 src_address: DstAddress,
46 framed: Framed<VisitorResponse, VisitorCodec>,
47 ) -> Self {
48 VisitorSession {
49 id,
50 src_address: Some(src_address),
51 forward: None,
52 server: Some(server),
53 framed: Some(framed),
54 dst_address: None,
55 }
56 }
57 fn trace(&self, msg: &str) {
58 self.log("trace", msg);
59 }
60 fn debug(&self, msg: &str) {
61 self.log("debug", msg);
62 }
63 fn error(&self, msg: &str) {
64 self.log("error", msg);
65 }
66 fn info(&self, msg: &str) {
67 self.log("info", msg);
68 }
69 fn log(&self, log: &str, msg: &str) {
70 if let (Some(src), Some(dst)) = (&self.src_address, &self.dst_address) {
71 match log {
72 "info" => info!(
73 "id={}, {}:{} => {}:{} {}",
74 self.id, src.addr, src.port, dst.addr, dst.port, msg
75 ),
76 "error" => error!(
77 "id={},{}:{} => {}:{} {}",
78 self.id, dst.addr, src.addr, src.port, dst.port, msg
79 ),
80 "trace" => trace!(
81 "id={},{}:{} => {}:{} {}",
82 self.id,
83 dst.addr,
84 src.addr,
85 src.port,
86 dst.port,
87 msg
88 ),
89 "debug" => debug!(
90 "id={},{}:{} => {}:{} {}",
91 self.id, dst.addr, src.addr, src.port, dst.port, msg
92 ),
93 _ => {}
94 }
95 }
96 }
97 fn write(&mut self, resp: VisitorResponse) {
98 if let Some(framed) = &mut self.framed {
99 framed.write(resp);
100 }
101 }
102 fn forward(&mut self, data: ToFoward) {
103 if let Some(forward) = &mut self.forward {
104 forward.do_send(data);
105 }
106 }
107 fn do_send_server(&mut self, req: ToProxyServer) {
108 if let Some(server) = &mut self.server {
109 server.do_send(req);
110 }
111 }
112}
113impl Actor for VisitorSession {
114 type Context = Context<Self>;
115
116 fn started(&mut self, ctx: &mut Self::Context) {
117 self.do_send_server(ToProxyServer::Connect(ctx.address().recipient()));
118 self.info("Started");
119 }
120 fn stopped(&mut self, _ctx: &mut Self::Context) {
121 self.info("Stopped");
122 self.do_send_server(ToProxyServer::DisConnect(self.id));
123 self.forward(ToFoward::Stop);
124 }
125}
126impl StreamHandler<Result<VisitorRequest, io::Error>> for VisitorSession {
127 fn handle(&mut self, request: Result<VisitorRequest, io::Error>, ctx: &mut Context<Self>) {
128 match &request {
129 Ok(VisitorRequest::Greeting { proto, auth }) => {
130 let _ = proto;
131 self.write(VisitorResponse::Choice(ProxyServer::auth_choice(auth)));
132 }
133 Ok(VisitorRequest::Auth { id, pwd }) => {
134 if id == "admin" && pwd == "123456" {
135 self.write(VisitorResponse::AuthRespSuccess);
136 } else {
137 self.write(VisitorResponse::AuthRespError);
138 }
139 }
140 Ok(VisitorRequest::Connection { cmd, address }) => {
141 trace!("id = {}, stream handler: {:?}", self.id, request);
142 let _ = cmd;
143 self.dst_address = Some(address.clone());
144 let address = address.clone();
145 let addr = ctx.address().recipient();
146 let ip = address.addr.clone();
147 let port = address.port;
148 let t = address.t;
149 let _ = actix::spawn(async move {
151 if let Ok(stream) = TcpStream::connect(format!("{}:{}", ip.clone(), port)).await
152 {
153 Forward::create(|ctx1| {
154 let (r, w) = split(stream);
155 addr.do_send(ToSession::RemoteConnected(
156 ctx1.address().recipient(),
157 DstAddress::new(t, &ip, port),
158 ));
159 Forward::add_stream(FramedRead::new(r, ForwardCodec), ctx1);
160 Forward {
161 visitor: addr,
162 framed: Some(actix::io::FramedWrite::new(w, ForwardCodec, ctx1)),
163 }
164 });
165 } else {
166 addr.do_send(ToSession::RemoteConnectionRefuse);
167 }
168 });
169 }
170 Ok(VisitorRequest::Forward(data)) => {
171 self.forward(ToFoward::Forward(data.clone()));
172 }
173 Ok(VisitorRequest::Cli(cli)) => {
175 debug!("session recv cli = {:?}", cli);
176 self.do_send_server(ToProxyServer::Cli(self.id, cli.clone()));
177 }
178 e => {
179 self.error(format!("stream handle error = {:?}, Stop session", e).as_str());
180 ctx.stop();
181 }
182 }
183 }
184 fn finished(&mut self, _ctx: &mut Self::Context) {
185 trace!("finished");
186 }
187}
188impl actix::io::WriteHandler<io::Error> for VisitorSession {}
189
190impl Handler<ToSession> for VisitorSession {
191 type Result = MessageResult<ToSession>;
192 fn handle(&mut self, to: ToSession, ctx: &mut Context<Self>) -> Self::Result {
193 match to {
194 ToSession::Ping => MessageResult(SessionReply::Pong),
195 ToSession::Stop => {
196 ctx.stop();
197 MessageResult(SessionReply::Ok)
198 }
199 ToSession::Meta => MessageResult(SessionReply::Meta(SessionMeta(self.id))),
200 ToSession::Forward(data) => {
201 self.write(VisitorResponse::Forward(data));
202 MessageResult(SessionReply::Ok)
203 }
204 ToSession::RemoteConnected(recipient, bind) => {
205 self.info("RemoteConnected");
206 self.forward = Some(recipient);
207 self.write(VisitorResponse::BindResp {
208 status: BindStatus::Granted,
209 address: Some(bind),
210 });
211 MessageResult(SessionReply::Ok)
212 }
213 ToSession::RemoteConnectionRefuse => {
214 self.info("RemoteConnectionRefuse");
215 self.write(VisitorResponse::BindResp {
216 status: BindStatus::ConnectionRefuse,
217 address: None,
218 });
219 ctx.stop();
220 MessageResult(SessionReply::Ok)
221 }
222 ToSession::ProxyServerReply(ProxyServerReply::Id(id)) => {
223 self.id = id;
224 MessageResult(SessionReply::Ok)
225 }
226 ToSession::ProxyServerReply(ProxyServerReply::OnlineCounter(_n)) => {
227 MessageResult(SessionReply::Ok)
228 }
229 ToSession::ProxyServerReply(ProxyServerReply::Cli(cli)) => {
230 debug!("session reply cli = {:?}", cli);
231 self.write(VisitorResponse::Cli(cli));
232 MessageResult(SessionReply::Ok)
233 }
234 _ => MessageResult(SessionReply::Ok),
235 }
236 }
237}
238
239pub struct Forward {
241 visitor: Recipient<ToSession>,
242 framed: Option<Framed<Vec<u8>, ForwardCodec>>,
243}
244impl Actor for Forward {
245 type Context = Context<Self>;
246 fn started(&mut self, _ctx: &mut Context<Self>) {
247 trace!("forward started!");
248 }
249 fn stopped(&mut self, _ctx: &mut Context<Self>) {
250 trace!("forward stopped");
251 self.visitor.do_send(ToSession::Stop);
252 }
253}
254impl Forward {
255 #[allow(clippy::single_match)]
256 pub fn write(&mut self, data: Vec<u8>) {
257 match &mut self.framed {
258 Some(framed) => framed.write(data),
259 _ => {}
260 }
261 }
262 pub fn visitor(&mut self, data: Vec<u8>) {
263 self.visitor.do_send(ToSession::Forward(data));
264 }
265}
266impl Handler<ToFoward> for Forward {
267 type Result = MessageResult<ToFoward>;
268 fn handle(&mut self, to: ToFoward, ctx: &mut Context<Self>) -> Self::Result {
269 match to {
270 ToFoward::Forward(data) => self.write(data),
271 ToFoward::Stop => ctx.stop(),
272 }
273 MessageResult(ForwardReply::Ok)
274 }
275}
276impl actix::io::WriteHandler<io::Error> for Forward {}
277impl StreamHandler<Result<Vec<u8>, io::Error>> for Forward {
278 fn handle(&mut self, resp: Result<Vec<u8>, io::Error>, _ctx: &mut Context<Self>) {
279 if let Ok(data) = resp {
281 self.visitor(data);
282 }
283 }
284}