zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use std::time::Instant;

use futures_util::{
    future::{select, Either},
    StreamExt as _,
};

use tokio::sync::mpsc;
use tokio::{pin, time::interval};

use actix_ws::Message;

use crate::core::websocat::echo::output::{Level, OutputMessage, Ty};

use crate::core::{
    auth0::Requestor,
    websocat::echo::{CLIENT_TIMEOUT, CLIENT_TIMEOUT_WARNNING, HEARTBEAT_INTERVAL},
};

use super::{command::Command, UserInfo};

/// Handle and command sender for chat server.
///
/// Reduces boilerplate of setting up response channels in WebSocket handlers.
#[derive(Clone)]
pub struct Connection {
    pub cmd_tx: mpsc::UnboundedSender<Command>,
}

/// Echo text & binary messages received from the client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn create(
    connection: Connection,
    requestor: Requestor,
    mut session: actix_ws::Session,
    mut msg_stream: actix_ws::MessageStream,
) {
    let user_info: UserInfo = if let Some(user) = requestor.get_user() {
        (
            user.user_id.to_owned(),
            user.user_name.to_owned(),
            user.user_role.to_owned(),
        )
    } else {
        (
            format!("guest_{}", uuid::Uuid::now_v7()),
            "guest_user".to_string(),
            "guest".to_string(),
        )
    };

    let mut last_heartbeat = Instant::now();
    let mut interval = interval(HEARTBEAT_INTERVAL);
    let mut should_be_closed = false;

    let (conn_tx, mut conn_rx) = mpsc::unbounded_channel();

    // unwrap: chat server is not dropped before the HTTP server
    let conn_id = connection.connect(conn_tx, user_info).await;

    let curr_user = requestor.get_user();

    let close_reason = loop {
        // most of the futures we process need to be stack-pinned to work with select()

        let tick = interval.tick();

        pin!(tick);

        let msg_rx = conn_rx.recv();

        pin!(msg_rx);

        // TODO: nested select is pretty gross for readability on the match
        let messages = select(msg_stream.next(), msg_rx);

        pin!(messages);

        match select(messages, tick).await {
            // commands & messages received from client
            Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => {
                match msg {
                    // 客户端上报心跳(ping), 回复一个 pong.
                    Message::Ping(bytes) => {
                        if !should_be_closed {
                            last_heartbeat = Instant::now();
                            // unwrap :(
                            session.pong(&bytes).await.unwrap();
                        }
                    }
                    // 客户端响应心跳(ping), 此处更新心跳时间
                    Message::Pong(_) => {
                        if !should_be_closed {
                            last_heartbeat = Instant::now();
                        }
                    }
                    // 在文本消息上,(这个我们会做最多的!)将其发送到大厅。大厅将负责将其代理到需要去的地方。
                    Message::Text(text) => {
                        connection.process_message(conn_id, curr_user, text.trim());
                    }
                    // 客户端发送的二进制消息,我们将把它发送到 WebSocket 上下文,WebSocket 上下文会弄清楚如何处理它。
                    // 实际上,这应该永远不会被触发。
                    Message::Binary(_bin) => {
                        log::warn!("unexpected binary message");
                    }
                    // 我们不会响应连续帧(简而言之,这些是无法适应一个消息的 WebSocket 消息)。
                    Message::Continuation(_) => {}
                    // 在 nop 时执行 nop(无操作)。
                    Message::Nop => {}
                    // 客户端主动断开连接。
                    Message::Close(reason) => {
                        break reason;
                    }
                }
            }

            // client WebSocket stream error
            Either::Left((Either::Left((Some(Err(err)), _)), _)) => {
                log::error!("{}", err);
                break None;
            }

            // client WebSocket stream ended
            Either::Left((Either::Left((None, _)), _)) => break None,

            // chat messages received from other room participants
            Either::Left((Either::Right((Some(msg), _)), _)) => {
                if msg != "Close" {
                    match session.text(&*msg).await {
                        Ok(_) => {}
                        Err(e) => {
                            log::error!("msg={}, error={}", &msg, e);
                        }
                    }
                } else {
                    // 当前连接将要被关闭, 停止更新心跳
                    log::info!("当前连接将要被关闭, 停止更新心跳: connid={}", conn_id);
                    should_be_closed = true;
                }
            }

            // all connection's message senders were dropped
            Either::Left((Either::Right((None, _)), _)) => unreachable!(
                "all connection message senders were dropped; chat server may have panicked"
            ),

            // heartbeat internal tick
            Either::Right((_inst, _)) => {
                // if no heartbeat ping/pong received recently, close the connection
                if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
                    log::info!(
                        "client has not sent heartbeat in over ({CLIENT_TIMEOUT:?}), connid={}",
                        conn_id
                    );

                    break None;
                } else if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT_WARNNING {
                    #[rustfmt::skip]
                    let msg = format!(
                        "Your conneciton will be closed in {}ms",
                        CLIENT_TIMEOUT.as_millis() - Instant::now().duration_since(last_heartbeat).as_millis()
                    );

                    let out = OutputMessage {
                        id: None,
                        level: Level::Warn,
                        ty: Ty::Disconnect,
                        msg: Some(msg),
                        data: None,
                    };

                    if let Some(message) = out.message() {
                        connection.fire_message(conn_id, message);
                    }
                }

                // send heartbeat ping
                let _ = session.ping(b"Are you there?").await;
            }
        };
    };

    connection.disconnect(conn_id);

    // attempt to close connection gracefully
    let _ = session.close(close_reason).await;
}