ping_viewer_next/server/protocols/v1/
websocket.rs

1use actix::{
2    dev::ContextFutureSpawner, fut, Actor, ActorFutureExt, Addr, AsyncContext, Handler, Message,
3    StreamHandler, WrapFuture,
4};
5use actix_web::HttpRequest;
6use actix_web_actors::ws;
7use lazy_static::lazy_static;
8use paperclip::actix::{
9    api_v2_operation, get,
10    web::{self, HttpResponse},
11    Apiv2Schema,
12};
13use regex::Regex;
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::sync::{Arc, Mutex};
17use tracing::info;
18use uuid::Uuid;
19
20use crate::device::manager::ManagerActorHandler;
21
22pub struct StringMessage(String);
23
24impl Message for StringMessage {
25    type Result = ();
26}
27
28#[derive(Serialize, Debug)]
29pub struct WebsocketError {
30    pub error: String,
31}
32
33#[derive(Debug)]
34pub struct WebsocketActorContent {
35    pub actor: Addr<WebsocketActor>,
36    pub re: Option<Regex>,
37    pub device_number: Option<Uuid>,
38}
39
40#[derive(Debug, Default)]
41pub struct WebsocketManager {
42    pub clients: Vec<WebsocketActorContent>,
43}
44
45impl WebsocketManager {
46    pub fn send(&self, value: &serde_json::Value, name: &str, device_number: Option<Uuid>) {
47        if self.clients.is_empty() {
48            return;
49        }
50
51        let string = serde_json::to_string(value).unwrap();
52        for client in &self.clients {
53            // check client list was subscribed or subscribed to all
54            if client.device_number.is_none() || client.device_number == device_number {
55                let is_match = client.re.as_ref().map_or(false, |regx| regx.is_match(name));
56                if is_match {
57                    client.actor.do_send(StringMessage(string.clone()));
58                }
59            }
60        }
61    }
62}
63
64lazy_static! {
65    pub static ref MANAGER: Arc<Mutex<WebsocketManager>> =
66        Arc::new(Mutex::new(WebsocketManager::default()));
67}
68
69pub fn send_to_websockets(message: Value, device: Option<Uuid>) {
70    MANAGER
71        .lock()
72        .unwrap()
73        .send(&message, &message.to_string(), device);
74}
75
76pub struct WebsocketActor {
77    server: Arc<Mutex<WebsocketManager>>,
78    pub filter: String,
79    pub device_number: Option<Uuid>,
80    pub manager_handler: web::Data<ManagerActorHandler>,
81}
82
83impl WebsocketActor {
84    pub fn new(
85        message_filter: String,
86        device_number: Option<Uuid>,
87        manager_handler: web::Data<ManagerActorHandler>,
88    ) -> Self {
89        Self {
90            server: MANAGER.clone(),
91            filter: message_filter,
92            device_number,
93            manager_handler,
94        }
95    }
96}
97
98impl Handler<StringMessage> for WebsocketActor {
99    type Result = ();
100
101    fn handle(&mut self, message: StringMessage, context: &mut Self::Context) {
102        context.text(message.0);
103    }
104}
105
106impl Actor for WebsocketActor {
107    type Context = ws::WebsocketContext<Self>;
108}
109
110impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebsocketActor {
111    fn started(&mut self, ctx: &mut Self::Context) {
112        info!("ServerManager: Starting websocket client, add itself in manager.");
113        self.server
114            .lock()
115            .unwrap()
116            .clients
117            .push(WebsocketActorContent {
118                actor: ctx.address(),
119                re: Regex::new(&self.filter).ok(),
120                device_number: (self.device_number),
121            });
122    }
123
124    fn finished(&mut self, ctx: &mut Self::Context) {
125        info!("ServerManager: Finishing websocket, remove itself from manager.");
126        self.server
127            .lock()
128            .unwrap()
129            .clients
130            .retain(|x| x.actor != ctx.address());
131    }
132
133    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
134        match msg {
135            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
136            Ok(ws::Message::Text(text)) => {
137                let manager_requests: Vec<crate::ModuleType> = match serde_json::from_str(&text) {
138                    Ok(requests) => requests,
139                    Err(err) => match serde_json::from_str(&text) {
140                        Ok(request) => vec![request],
141                        Err(_) => {
142                            let error_msg = format!("Error: {}", err);
143                            ctx.text(error_msg);
144                            return;
145                        }
146                    },
147                };
148
149                for request in manager_requests {
150                    match request {
151                        crate::ModuleType::DeviceManager(request) => {
152                            let manager_handler = self.manager_handler.clone();
153
154                            let future =
155                                async move { manager_handler.send(request).await }.into_actor(self);
156
157                            future
158                                .then(|res, _, ctx| {
159                                    match &res {
160                                        Ok(result) => {
161                                            crate::server::protocols::v1::websocket::send_to_websockets(
162                                                json!(result),
163                                                None,
164                                            );
165                                        }
166                                        Err(err) => {
167                                            ctx.text(serde_json::to_string_pretty(err).unwrap());
168                                        }
169                                    }
170                                    fut::ready(())
171                                })
172                                .wait(ctx);
173                        }
174                    }
175                }
176            }
177            Ok(ws::Message::Close(msg)) => ctx.close(msg),
178            _ => (),
179        }
180    }
181}
182
183#[api_v2_operation(skip)]
184#[get("ws")]
185pub async fn websocket(
186    req: HttpRequest,
187    query: web::Query<WebsocketQuery>,
188    stream: web::Payload,
189    manager_handler: web::Data<ManagerActorHandler>,
190) -> Result<HttpResponse, actix_web::Error> {
191    let query_inner = query.into_inner();
192
193    let filter = match query_inner.filter {
194        Some(filter) => filter,
195        _ => ".*".to_owned(),
196    };
197    let device_number = query_inner.device_number;
198
199    if let Some(device_number) = device_number {
200        let request = crate::device::manager::Request::Info(crate::device::manager::UuidWrapper {
201            uuid: device_number,
202        });
203        match manager_handler.send(request).await {
204            Ok(response) => {
205                info!(
206                    "ServerManager: Received websocket request connection for device: {response:?}"
207                );
208            }
209            Err(err) => {
210                return Ok(HttpResponse::InternalServerError().json(json!(err)));
211            }
212        }
213    }
214
215    ws::start(
216        WebsocketActor::new(filter, device_number, manager_handler.clone()),
217        &req,
218        stream,
219    )
220}
221
222#[derive(Deserialize, Apiv2Schema, Clone)]
223pub struct WebsocketQuery {
224    /// Regex filter to select the desired incoming messages
225    filter: Option<String>,
226    device_number: Option<Uuid>,
227}