ping_viewer_next/server/protocols/v1/
websocket.rs1use actix::{
2 dev::ContextFutureSpawner, fut, Actor, ActorFutureExt, Addr, AsyncContext, Handler, Message,
3 StreamHandler, WrapFuture,
4};
5use actix_web::HttpRequest;
6use actix_web_actors::ws;
7use lazy_static::lazy_static;
8use paperclip::actix::{
9 api_v2_operation, get,
10 web::{self, HttpResponse},
11 Apiv2Schema,
12};
13use regex::Regex;
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::sync::{Arc, Mutex};
17use tracing::info;
18use uuid::Uuid;
19
20use crate::device::manager::ManagerActorHandler;
21
/// Text payload addressed to a [`WebsocketActor`]; the actor relays the wrapped
/// string to its websocket client as a text frame.
pub struct StringMessage(String);
23
24impl Message for StringMessage {
25 type Result = ();
26}
27
28#[derive(Serialize, Debug)]
29pub struct WebsocketError {
30 pub error: String,
31}
32
33#[derive(Debug)]
34pub struct WebsocketActorContent {
35 pub actor: Addr<WebsocketActor>,
36 pub re: Option<Regex>,
37 pub device_number: Option<Uuid>,
38}
39
40#[derive(Debug, Default)]
41pub struct WebsocketManager {
42 pub clients: Vec<WebsocketActorContent>,
43}
44
45impl WebsocketManager {
46 pub fn send(&self, value: &serde_json::Value, name: &str, device_number: Option<Uuid>) {
47 if self.clients.is_empty() {
48 return;
49 }
50
51 let string = serde_json::to_string(value).unwrap();
52 for client in &self.clients {
53 if client.device_number.is_none() || client.device_number == device_number {
55 let is_match = client.re.as_ref().map_or(false, |regx| regx.is_match(name));
56 if is_match {
57 client.actor.do_send(StringMessage(string.clone()));
58 }
59 }
60 }
61 }
62}
63
64lazy_static! {
65 pub static ref MANAGER: Arc<Mutex<WebsocketManager>> =
66 Arc::new(Mutex::new(WebsocketManager::default()));
67}
68
69pub fn send_to_websockets(message: Value, device: Option<Uuid>) {
70 MANAGER
71 .lock()
72 .unwrap()
73 .send(&message, &message.to_string(), device);
74}
75
76pub struct WebsocketActor {
77 server: Arc<Mutex<WebsocketManager>>,
78 pub filter: String,
79 pub device_number: Option<Uuid>,
80 pub manager_handler: web::Data<ManagerActorHandler>,
81}
82
83impl WebsocketActor {
84 pub fn new(
85 message_filter: String,
86 device_number: Option<Uuid>,
87 manager_handler: web::Data<ManagerActorHandler>,
88 ) -> Self {
89 Self {
90 server: MANAGER.clone(),
91 filter: message_filter,
92 device_number,
93 manager_handler,
94 }
95 }
96}
97
98impl Handler<StringMessage> for WebsocketActor {
99 type Result = ();
100
101 fn handle(&mut self, message: StringMessage, context: &mut Self::Context) {
102 context.text(message.0);
103 }
104}
105
106impl Actor for WebsocketActor {
107 type Context = ws::WebsocketContext<Self>;
108}
109
110impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebsocketActor {
111 fn started(&mut self, ctx: &mut Self::Context) {
112 info!("ServerManager: Starting websocket client, add itself in manager.");
113 self.server
114 .lock()
115 .unwrap()
116 .clients
117 .push(WebsocketActorContent {
118 actor: ctx.address(),
119 re: Regex::new(&self.filter).ok(),
120 device_number: (self.device_number),
121 });
122 }
123
124 fn finished(&mut self, ctx: &mut Self::Context) {
125 info!("ServerManager: Finishing websocket, remove itself from manager.");
126 self.server
127 .lock()
128 .unwrap()
129 .clients
130 .retain(|x| x.actor != ctx.address());
131 }
132
133 fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
134 match msg {
135 Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
136 Ok(ws::Message::Text(text)) => {
137 let manager_requests: Vec<crate::ModuleType> = match serde_json::from_str(&text) {
138 Ok(requests) => requests,
139 Err(err) => match serde_json::from_str(&text) {
140 Ok(request) => vec![request],
141 Err(_) => {
142 let error_msg = format!("Error: {}", err);
143 ctx.text(error_msg);
144 return;
145 }
146 },
147 };
148
149 for request in manager_requests {
150 match request {
151 crate::ModuleType::DeviceManager(request) => {
152 let manager_handler = self.manager_handler.clone();
153
154 let future =
155 async move { manager_handler.send(request).await }.into_actor(self);
156
157 future
158 .then(|res, _, ctx| {
159 match &res {
160 Ok(result) => {
161 crate::server::protocols::v1::websocket::send_to_websockets(
162 json!(result),
163 None,
164 );
165 }
166 Err(err) => {
167 ctx.text(serde_json::to_string_pretty(err).unwrap());
168 }
169 }
170 fut::ready(())
171 })
172 .wait(ctx);
173 }
174 }
175 }
176 }
177 Ok(ws::Message::Close(msg)) => ctx.close(msg),
178 _ => (),
179 }
180 }
181}
182
183#[api_v2_operation(skip)]
184#[get("ws")]
185pub async fn websocket(
186 req: HttpRequest,
187 query: web::Query<WebsocketQuery>,
188 stream: web::Payload,
189 manager_handler: web::Data<ManagerActorHandler>,
190) -> Result<HttpResponse, actix_web::Error> {
191 let query_inner = query.into_inner();
192
193 let filter = match query_inner.filter {
194 Some(filter) => filter,
195 _ => ".*".to_owned(),
196 };
197 let device_number = query_inner.device_number;
198
199 if let Some(device_number) = device_number {
200 let request = crate::device::manager::Request::Info(crate::device::manager::UuidWrapper {
201 uuid: device_number,
202 });
203 match manager_handler.send(request).await {
204 Ok(response) => {
205 info!(
206 "ServerManager: Received websocket request connection for device: {response:?}"
207 );
208 }
209 Err(err) => {
210 return Ok(HttpResponse::InternalServerError().json(json!(err)));
211 }
212 }
213 }
214
215 ws::start(
216 WebsocketActor::new(filter, device_number, manager_handler.clone()),
217 &req,
218 stream,
219 )
220}
221
222#[derive(Deserialize, Apiv2Schema, Clone)]
223pub struct WebsocketQuery {
224 filter: Option<String>,
226 device_number: Option<Uuid>,
227}