zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use harsh::Harsh;
use tokio::sync::mpsc;

use crate::{
    commons::parse_json_string,
    core::websocat::echo::{command::Command, connection_new::Connection},
};

use super::{command::Broadcast, server::EchoServer, WEBCHAT_BROADCAST_CHANNEL};

impl EchoServer {
    /// Creates the echo server together with a [`Connection`] handle for it.
    ///
    /// An unbounded command channel is created: the server keeps the receiving
    /// half (drained by [`Self::run`]) and the returned `Connection` wraps the
    /// sending half. Before returning, `start_server_time` is called and a
    /// subscription to `WEBCHAT_BROADCAST_CHANNEL` is registered, both using
    /// the new connection.
    ///
    /// The server starts with a visitor count of zero and empty `sessions`,
    /// `users` and `subscribes` maps.
    pub async fn new(ctx: actix_web::web::Data<crate::server::AppContext>) -> (Self, Connection) {
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();

        let this = Self {
            cmd_rx,
            ctx,
            visitor_count: Arc::new(AtomicUsize::new(0)),
            sessions: HashMap::new(),
            users: HashMap::new(),
            subscribes: HashMap::new(),
        };

        let connection = Connection { cmd_tx };

        // Push server time.
        this.start_server_time(&connection);

        // Push broadcast notifications: every message received on the broadcast
        // channel that parses as a JSON `Broadcast` is forwarded through the
        // connection; messages that fail to parse are silently dropped.
        this.subscribe_channel(
            &connection,
            WEBCHAT_BROADCAST_CHANNEL.to_string(),
            |conn, _channel, msg| {
                if let Ok(message) = parse_json_string::<Broadcast>(msg) {
                    conn.broadcast(message.level, message.ty, message.msg);
                }
            },
        );

        (this, connection)
    }

    /// Runs the server's command loop.
    ///
    /// Commands are received from the command channel one at a time and each
    /// one is fully handled (its handler is awaited) before the next is
    /// received. The loop ends once `recv` yields `None`.
    ///
    /// Replies sent through a command's `res_tx` are best-effort: a failed
    /// send is ignored (presumably the requester has stopped waiting).
    ///
    /// # Errors
    ///
    /// This implementation never produces an error; it always returns `Ok(())`
    /// when the loop ends.
    pub async fn run(mut self) -> std::io::Result<()> {
        while let Some(cmd) = self.cmd_rx.recv().await {
            match cmd {
                Command::Connect {
                    conn_tx,
                    res_tx,
                    user_info,
                } => {
                    let conn_id = self.handle_connect(conn_tx, user_info).await;
                    let _ = res_tx.send(conn_id);
                }
                Command::Autorization { id, conn, token } => {
                    self.handle_autorization(id, conn, token).await;
                }
                Command::Disconnect { conn } => {
                    self.handle_disconnect(conn).await;
                }
                Command::Message { conn, msg } => {
                    self.handle_message(conn, msg).await;
                }
                Command::Total { res_tx } => {
                    // Reading the counter only needs an atomic load, not a
                    // read-modify-write such as `fetch_or(0, ..)`.
                    let count = self.visitor_count.load(Ordering::SeqCst);
                    let _ = res_tx.send(count);
                }
                Command::OnlineUsers { res_tx } => {
                    // Return at most 10000 keys of the `users` map. Iterate by
                    // reference so only the returned keys are cloned.
                    let users: Vec<String> = self.users.keys().take(10000).cloned().collect();

                    let _ = res_tx.send(users);
                }
                Command::AllConnections { res_tx } => {
                    // Build the encoder once instead of once per entry.
                    let harsh = Harsh::default();

                    // Return at most 10000 entries. Each entry is
                    // (Harsh-encoded `val.1`, `val.0.1`, map key); only the
                    // fields of the returned entries are cloned.
                    let conns: Vec<(String, String, String)> = self
                        .users
                        .iter()
                        .take(10000)
                        .map(|(key, val)| {
                            (
                                harsh.encode(&[val.1 as u64]),
                                val.0 .1.clone(),
                                key.clone(),
                            )
                        })
                        .collect();

                    let _ = res_tx.send(conns);
                }
                Command::Closed { conn } => {
                    // Send "Close" to the session's sender if the session is
                    // still registered; a failed send is ignored.
                    if let Some((tx, _)) = self.sessions.get(&conn) {
                        _ = tx.send("Close".into());
                    }
                }
                Command::Broadcast { level, ty, msg } => {
                    self.handle_broadcast(level, ty, msg).await;
                }
                Command::Servertime { msg } => {
                    self.handle_servertime(msg).await;
                }
                Command::Subscribe { id, conn, channel } => {
                    self.handle_subscribe(id, conn, channel).await;
                }
                Command::Unsubscribe { id, conn, channel } => {
                    self.handle_unsubscribe(id, conn, channel).await;
                }
                Command::TalkMsg {
                    id,
                    conn,
                    curr_user,
                    msg,
                } => {
                    self.handle_chat_message(id, conn, &curr_user, msg).await;
                }
                Command::HistoryMsg { id, conn, to } => {
                    self.handle_history_msg(id, conn, &to).await;
                }
                Command::HistoryTalk { id, conn } => {
                    self.handle_history_talk(id, conn).await;
                }
                Command::Location { id, conn, msg } => {
                    self.handle_location_message(id, conn, msg).await;
                }
            }
        }

        Ok(())
    }
}