sysmonk/routes/
websocket.rs

1use crate::{constant, resources, routes, squire};
2use actix;
3use actix_web::{rt, web, Error, HttpRequest, HttpResponse};
4use actix_ws::AggregatedMessage;
5use fernet::Fernet;
6use futures::future;
7use futures::stream::StreamExt;
8use std::sync::Arc;
9use std::time::Duration;
10
11/// Streams system resources via websocket through a loop.
12///
13/// # Arguments
14///
15/// * `request` - A reference to the Actix web `HttpRequest` object.
16async fn send_system_resources(request: HttpRequest, mut session: actix_ws::Session) {
17    let host = request.connection_info().host().to_string();
18    let disk_stats = resources::stream::get_disk_stats();
19    loop {
20        let mut system_resources = resources::stream::system_resources();
21        system_resources.insert("disk_info".to_string(), disk_stats.clone());
22        let serialized = serde_json::to_string(&system_resources).unwrap();
23        match session.text(serialized).await {
24            Ok(_) => (),
25            Err(err) => {
26                log::info!("Connection from '{}' has been {}", host, err.to_string().to_lowercase());
27                break;
28            }
29        }
30        // 500ms / 0.5s delay
31        rt::time::sleep(Duration::from_secs(1)).await;
32    }
33}
34
35/// Receives messages from the client and sends them back.
36///
37/// # Summary
38///
39/// Handles text, binary, and ping messages from the client.
40///
41/// # References
42///
43/// * [AggregatedMessage](https://docs.rs/actix-web/4.0.0-beta.8/actix_web/websocket/struct.AggregatedMessage.html)
44/// * [ProtocolError](https://docs.rs/actix-web/4.0.0-beta.8/actix_web/websocket/enum.ProtocolError.html)
45/// * [Session](https://docs.rs/actix-web/4.0.0-beta.8/actix_web/websocket/struct.Session.html)
46/// * [Stream](https://docs.rs/futures/0.3.17/futures/stream/trait.Stream.html)
47/// * [Unpin](https://doc.rust-lang.org/std/marker/trait.Unpin.html)
48///
49/// # Arguments
50///
51/// * `session` - A reference to the Actix web `Session` object.
52/// * `stream` - A stream of `AggregatedMessage` objects.
53async fn receive_messages(
54    mut session: actix_ws::Session,
55    mut stream: impl futures::Stream<Item=Result<AggregatedMessage, actix_ws::ProtocolError>> + Unpin,
56) {
57    while let Some(msg) = stream.next().await {
58        match msg {
59            Ok(AggregatedMessage::Text(text)) => {
60                // echo text message
61                session.text(text).await.unwrap();
62            }
63            Ok(AggregatedMessage::Binary(bin)) => {
64                // echo binary message
65                session.binary(bin).await.unwrap();
66            }
67            Ok(AggregatedMessage::Ping(msg)) => {
68                // respond to PING frame with PONG frame
69                session.pong(&msg).await.unwrap();
70            }
71            _ => {}
72        }
73    }
74}
75
76/// Handles the session by closing it after a certain duration.
77///
78/// # Arguments
79///
80/// * `session` - A reference to the Actix web `Session` object.
81/// * `duration` - Duration in seconds to keep the session alive.
82async fn session_handler(session: actix_ws::Session, duration: i64) {
83    let session = session.clone();
84    actix::spawn(async move {
85        rt::time::sleep(Duration::from_secs(duration as u64)).await;
86        let _ = session.close(None).await;
87    });
88}
89
90/// Handles the WebSocket endpoint for system resources.
91///
92/// # Arguments
93///
94/// * `request` - A reference to the Actix web `HttpRequest` object.
95/// * `fernet` - Fernet object to encrypt the auth payload that will be set as `session_token` cookie.
96/// * `session_info` - Session struct that holds the `session_mapping` to handle sessions.
97/// * `config` - Configuration data for the application.
98/// * `stream` - A stream of `Payload` objects.
99///
100/// # Returns
101///
102/// Returns an `HttpResponse` with the appropriate status code.
103#[route("/ws/system", method = "GET")]
104async fn echo(
105    request: HttpRequest,
106    fernet: web::Data<Arc<Fernet>>,
107    session_info: web::Data<Arc<constant::Session>>,
108    config: web::Data<Arc<squire::settings::Config>>,
109    stream: web::Payload,
110) -> Result<HttpResponse, Error> {
111    log::info!("Websocket connection initiated");
112    let auth_response = squire::authenticator::verify_token(&request, &config, &fernet, &session_info);
113    if !auth_response.ok {
114        return Ok(routes::auth::failed_auth(auth_response));
115    }
116    let (response, session, stream) = match actix_ws::handle(&request, stream) {
117        Ok(result) => result,
118        Err(_) => {
119            return Ok(HttpResponse::ServiceUnavailable().finish());
120        }
121    };
122    let stream = stream
123        .aggregate_continuations();
124    rt::spawn(async move {
125        log::warn!("Connection established");
126        let send_task = send_system_resources(request.clone(), session.clone());
127        let receive_task = receive_messages(session.clone(), stream);
128        let session_task = session_handler(session.clone(), config.session_duration);
129        future::join3(send_task, receive_task, session_task).await;
130    });
131    Ok(response)
132}