inn_network/
ws.rs

1//-------------------------------------------------------------------
2// MIT License
3// Copyright (c) 2022 black-mongo
4// @author CameronYang
5// @doc
6//
7// @end
8// Created : 2022-06-07T10:39:36+08:00
9//-------------------------------------------------------------------
10use actix::{Actor, Addr, AsyncContext, Handler, MessageResult, StreamHandler};
11use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
12use actix_web_actors::ws;
13use log::{error, info};
14
15use crate::{
16    server::ProxyServer, MsgName, ProxyServerReply, SessionReply, ToProxyServer, ToSession, WsMsg,
17};
18
19/// Define HTTP actor
20pub struct Ws {
21    id: usize,
22    server: Addr<ProxyServer>,
23}
24
25impl Actor for Ws {
26    type Context = ws::WebsocketContext<Self>;
27    fn started(&mut self, ctx: &mut Self::Context) {
28        self.server
29            .do_send(ToProxyServer::Connect(ctx.address().recipient()));
30    }
31    fn stopped(&mut self, _ctx: &mut Self::Context) {
32        self.server.do_send(ToProxyServer::DisConnect(self.id));
33    }
34}
35impl Handler<ToSession> for Ws {
36    type Result = MessageResult<ToSession>;
37    fn handle(&mut self, to: ToSession, ctx: &mut Self::Context) -> Self::Result {
38        match to {
39            ToSession::ProxyServerReply(ProxyServerReply::Id(id)) => {
40                self.id = id;
41                MessageResult(SessionReply::Ok)
42            }
43            ToSession::ProxyServerReply(ProxyServerReply::OnlineCounter(_n)) => {
44                MessageResult(SessionReply::Ok)
45            }
46            ToSession::ProxyServerReply(ProxyServerReply::HttpReq(req)) => {
47                let ws_msg = WsMsg {
48                    msg_name: MsgName::HttpReq,
49                    msg: req,
50                };
51                let payload = serde_json::to_string(&ws_msg).unwrap();
52                ctx.text(payload);
53                MessageResult(SessionReply::Ok)
54            }
55            _ => MessageResult(SessionReply::Ok),
56        }
57    }
58}
59
60/// Handler for ws::Message message
61impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Ws {
62    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
63        match msg {
64            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
65            Ok(ws::Message::Text(text)) => ctx.text(text),
66            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
67            _ => (),
68        }
69    }
70}
71
72async fn index(
73    req: HttpRequest,
74    stream: web::Payload,
75    server: web::Data<Addr<ProxyServer>>,
76) -> Result<HttpResponse, Error> {
77    let resp = ws::start(
78        Ws {
79            id: 0,
80            server: server.get_ref().clone(),
81        },
82        &req,
83        stream,
84    );
85    println!("{:?}", resp);
86    resp
87}
88
89pub async fn run(ip: String, port: u16, server: Addr<ProxyServer>) -> std::io::Result<()> {
90    info!("Listening on ws://{}:{}/ws/", ip.clone(), port);
91    let rs = HttpServer::new(move || {
92        App::new()
93            .app_data(web::Data::new(server.clone()))
94            .route("/ws/", web::get().to(index))
95    })
96    .bind((ip.clone(), port))?
97    .run()
98    .await;
99    match &rs {
100        Ok(()) => {}
101        Err(e) => {
102            error!("start ws://{}:{} error:{}", ip, port, e)
103        }
104    }
105    rs
106}