zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use std::time::{Duration, Instant};

use actix_web::web;
use actix_ws::Message;
use futures_util::StreamExt as _;

use tokio::{select, sync::broadcast, time::interval};

/// How often heartbeat pings are sent.
///
/// Should be half (or less) of the acceptable client timeout.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// How long before lack of client response causes a timeout.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn echo_broadcast_ws(
    mut session: actix_ws::Session,
    mut msg_stream: actix_ws::MessageStream,
    mut rx: broadcast::Receiver<web::Bytes>,
) {
    log::info!("connected");

    let mut last_heartbeat = Instant::now();
    let mut interval = interval(HEARTBEAT_INTERVAL);

    let reason = loop {
        // waits for either `msg_stream` to receive a message from the client, the broadcast channel
        // to send a message, or the heartbeat interval timer to tick, yielding the value of
        // whichever one is ready first
        select! {
            broadcast_msg = rx.recv() => {
                let msg = match broadcast_msg {
                    Ok(msg) => msg,
                    Err(broadcast::error::RecvError::Closed) => break None,
                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
                };

                let res = match std::str::from_utf8(&msg) {
                    Ok(val) => session.text(val).await,
                    Err(_) => session.binary(msg).await,
                };

                if let Err(err) = res {
                    log::error!("{err}");
                    break None;
                }
            }

            // heartbeat interval ticked
            _tick = interval.tick() => {
                // if no heartbeat ping/pong received recently, close the connection
                if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
                    log::info!(
                        "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
                    );

                    break None;
                }

                // send heartbeat ping
                let _ = session.ping(b"").await;
            },

            msg = msg_stream.next() => {
                let msg = match msg {
                    // received message from WebSocket client
                    Some(Ok(msg)) => msg,

                    // client WebSocket stream error
                    Some(Err(err)) => {
                        log::error!("{err}");
                        break None;
                    }

                    // client WebSocket stream ended
                    None => break None
                };

                log::debug!("msg: {msg:?}");

                match msg {
                    Message::Text(_) => {
                        // drop client's text messages
                    }

                    Message::Binary(_) => {
                        // drop client's binary messages
                    }

                    Message::Close(reason) => {
                        break reason;
                    }

                    Message::Ping(bytes) => {
                        last_heartbeat = Instant::now();
                        let _ = session.pong(&bytes).await;
                    }

                    Message::Pong(_) => {
                        last_heartbeat = Instant::now();
                    }

                    Message::Continuation(_) => {
                        log::warn!("no support for continuation frames");
                    }

                    // no-op; ignore
                    Message::Nop => {}
                };
            }
        }
    };

    // attempt to close connection gracefully
    let _ = session.close(reason).await;

    log::info!("disconnected");
}