sysmonk/routes/
websocket.rs1use crate::{constant, resources, routes, squire};
2use actix;
3use actix_web::{rt, web, Error, HttpRequest, HttpResponse};
4use actix_ws::AggregatedMessage;
5use fernet::Fernet;
6use futures::future;
7use futures::stream::StreamExt;
8use std::sync::Arc;
9use std::time::Duration;
10
11async fn send_system_resources(request: HttpRequest, mut session: actix_ws::Session) {
17 let host = request.connection_info().host().to_string();
18 let disk_stats = resources::stream::get_disk_stats();
19 loop {
20 let mut system_resources = resources::stream::system_resources();
21 system_resources.insert("disk_info".to_string(), disk_stats.clone());
22 let serialized = serde_json::to_string(&system_resources).unwrap();
23 match session.text(serialized).await {
24 Ok(_) => (),
25 Err(err) => {
26 log::info!("Connection from '{}' has been {}", host, err.to_string().to_lowercase());
27 break;
28 }
29 }
30 rt::time::sleep(Duration::from_secs(1)).await;
32 }
33}
34
35async fn receive_messages(
54 mut session: actix_ws::Session,
55 mut stream: impl futures::Stream<Item=Result<AggregatedMessage, actix_ws::ProtocolError>> + Unpin,
56) {
57 while let Some(msg) = stream.next().await {
58 match msg {
59 Ok(AggregatedMessage::Text(text)) => {
60 session.text(text).await.unwrap();
62 }
63 Ok(AggregatedMessage::Binary(bin)) => {
64 session.binary(bin).await.unwrap();
66 }
67 Ok(AggregatedMessage::Ping(msg)) => {
68 session.pong(&msg).await.unwrap();
70 }
71 _ => {}
72 }
73 }
74}
75
76async fn session_handler(session: actix_ws::Session, duration: i64) {
83 let session = session.clone();
84 actix::spawn(async move {
85 rt::time::sleep(Duration::from_secs(duration as u64)).await;
86 let _ = session.close(None).await;
87 });
88}
89
90#[route("/ws/system", method = "GET")]
104async fn echo(
105 request: HttpRequest,
106 fernet: web::Data<Arc<Fernet>>,
107 session_info: web::Data<Arc<constant::Session>>,
108 config: web::Data<Arc<squire::settings::Config>>,
109 stream: web::Payload,
110) -> Result<HttpResponse, Error> {
111 log::info!("Websocket connection initiated");
112 let auth_response = squire::authenticator::verify_token(&request, &config, &fernet, &session_info);
113 if !auth_response.ok {
114 return Ok(routes::auth::failed_auth(auth_response));
115 }
116 let (response, session, stream) = match actix_ws::handle(&request, stream) {
117 Ok(result) => result,
118 Err(_) => {
119 return Ok(HttpResponse::ServiceUnavailable().finish());
120 }
121 };
122 let stream = stream
123 .aggregate_continuations();
124 rt::spawn(async move {
125 log::warn!("Connection established");
126 let send_task = send_system_resources(request.clone(), session.clone());
127 let receive_task = receive_messages(session.clone(), stream);
128 let session_task = session_handler(session.clone(), config.session_duration);
129 future::join3(send_task, receive_task, session_task).await;
130 });
131 Ok(response)
132}