1use actix::{Actor, Addr, AsyncContext, Handler, MessageResult, StreamHandler};
11use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
12use actix_web_actors::ws;
13use log::{error, info};
14
15use crate::{
16 server::ProxyServer, MsgName, ProxyServerReply, SessionReply, ToProxyServer, ToSession, WsMsg,
17};
18
19pub struct Ws {
21 id: usize,
22 server: Addr<ProxyServer>,
23}
24
25impl Actor for Ws {
26 type Context = ws::WebsocketContext<Self>;
27 fn started(&mut self, ctx: &mut Self::Context) {
28 self.server
29 .do_send(ToProxyServer::Connect(ctx.address().recipient()));
30 }
31 fn stopped(&mut self, _ctx: &mut Self::Context) {
32 self.server.do_send(ToProxyServer::DisConnect(self.id));
33 }
34}
35impl Handler<ToSession> for Ws {
36 type Result = MessageResult<ToSession>;
37 fn handle(&mut self, to: ToSession, ctx: &mut Self::Context) -> Self::Result {
38 match to {
39 ToSession::ProxyServerReply(ProxyServerReply::Id(id)) => {
40 self.id = id;
41 MessageResult(SessionReply::Ok)
42 }
43 ToSession::ProxyServerReply(ProxyServerReply::OnlineCounter(_n)) => {
44 MessageResult(SessionReply::Ok)
45 }
46 ToSession::ProxyServerReply(ProxyServerReply::HttpReq(req)) => {
47 let ws_msg = WsMsg {
48 msg_name: MsgName::HttpReq,
49 msg: req,
50 };
51 let payload = serde_json::to_string(&ws_msg).unwrap();
52 ctx.text(payload);
53 MessageResult(SessionReply::Ok)
54 }
55 _ => MessageResult(SessionReply::Ok),
56 }
57 }
58}
59
60impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Ws {
62 fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
63 match msg {
64 Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
65 Ok(ws::Message::Text(text)) => ctx.text(text),
66 Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
67 _ => (),
68 }
69 }
70}
71
72async fn index(
73 req: HttpRequest,
74 stream: web::Payload,
75 server: web::Data<Addr<ProxyServer>>,
76) -> Result<HttpResponse, Error> {
77 let resp = ws::start(
78 Ws {
79 id: 0,
80 server: server.get_ref().clone(),
81 },
82 &req,
83 stream,
84 );
85 println!("{:?}", resp);
86 resp
87}
88
89pub async fn run(ip: String, port: u16, server: Addr<ProxyServer>) -> std::io::Result<()> {
90 info!("Listening on ws://{}:{}/ws/", ip.clone(), port);
91 let rs = HttpServer::new(move || {
92 App::new()
93 .app_data(web::Data::new(server.clone()))
94 .route("/ws/", web::get().to(index))
95 })
96 .bind((ip.clone(), port))?
97 .run()
98 .await;
99 match &rs {
100 Ok(()) => {}
101 Err(e) => {
102 error!("start ws://{}:{} error:{}", ip, port, e)
103 }
104 }
105 rs
106}