1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
use dbui_core::{RequestMessage, ResponseMessage, Result}; use dbui_service::handler::MessageHandler; use dbui_service::AppConfig; use actix::{Actor, StreamHandler}; use actix_session::Session; use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; struct DbuiWebSocket { handler: MessageHandler } impl Actor for DbuiWebSocket { type Context = ws::WebsocketContext<Self>; fn started(&mut self, wsc: &mut Self::Context) { for msg in self.handler.on_open() { match self.send(msg, wsc) { Ok(_) => (), Err(e) => slog::warn!(self.handler.log(), "Unable to send initial open messages: {:?}", e) } } } } impl DbuiWebSocket { fn handler(&self) -> &MessageHandler { &self.handler } fn handle_text(&self, s: String, wsc: &mut ws::WebsocketContext<Self>) -> Result<()> { let req = RequestMessage::from_json(&s)?; self.handle_message(req, wsc) } fn handle_message(&self, req: RequestMessage, wsc: &mut ws::WebsocketContext<Self>) -> Result<()> { for msg in self.handler.on_message(req)? { self.send(msg, wsc)?; } Ok(()) } fn send(&self, rsp: ResponseMessage, wsc: &mut ws::WebsocketContext<Self>) -> Result<()> { wsc.text(rsp.to_json()?); Ok(()) } } impl StreamHandler<ws::Message, ws::ProtocolError> for DbuiWebSocket { fn handle(&mut self, msg: ws::Message, wsc: &mut Self::Context) { match msg { ws::Message::Ping(msg) => wsc.pong(&msg), ws::Message::Text(text) => match &self.handle_text(text, wsc) { Ok(_) => (), Err(e) => { slog::warn!(&self.handler().log(), "Error handling message: {:?}", e); let msg = ResponseMessage::ServerError { reason: format!("{}", e), content: "Error handling message".into() }; match self.send(msg, wsc) { Ok(_) => (), Err(e) => slog::warn!(&self.handler().log(), "Error sending server error message: {:?}", e) } } }, ws::Message::Binary(bin) => wsc.binary(bin), _ => () } } } pub fn connect( session: Session, cfg: web::Data<AppConfig>, project: web::Path<String>, req: HttpRequest, stream: web::Payload ) -> std::result::Result<HttpResponse, Error> { let ctx = crate::req_context(&session, &cfg, &req, "connect"); let projects = ctx.app().projects(); let p = projects.read().expect("Can't lock project service!").get_project(&project); match p { Ok(p) => { let handler = MessageHandler::new(p, ctx); let socket = DbuiWebSocket { handler }; ws::start(socket, &req, stream) } Err(e) => { slog::warn!(ctx.log(), "Unable to load project [{}]: {:?}", project, e); panic!("Cant load Project") } } }