zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use lazy_static::lazy_static;

use serde_json::json;

use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use actix::prelude::*;
use actix_web::web::Bytes;
use actix_web_actors::ws;

use crate::core::auth0::Requestor;
use crate::websocket::webchat::actions::ConnectionWarnning;
use crate::websocket::webchat::actions::Disconnect;
use crate::websocket::webchat::input::subscribes::SubscribeAction;
use crate::websocket::webchat::input::subscribes::UnsubscribeAction;
use crate::websocket::webchat::input::Action;
use crate::websocket::webchat::input::Input;
use crate::websocket::webchat::output::Level;
use crate::websocket::webchat::output::OutputMessage;
use crate::websocket::webchat::output::OutputMessageType;
use crate::websocket::webchat::output::TypedMessage;
use crate::websocket::webchat::server::Server;
use crate::websocket::webchat::ConnectId;

use crate::websocket::webchat::ctx::TIMER_INTERVAL;

use super::input::chats::SendMessageAction;

lazy_static! {
    /// Process-wide registry of live connections: connection id -> UTC timestamp
    /// (milliseconds) recorded when the connection was established.
    #[rustfmt::skip]
    pub static ref ALL_CONNECTIONS_LINE: Mutex<HashMap<String, i64>> = Mutex::new(HashMap::new());
    /// Connections flagged for removal: connection id -> UTC timestamp
    /// (milliseconds) recorded when the connection was first flagged.
    pub static ref CONNECTION_TO_REMOVE: Mutex<HashMap<String, i64>> = Mutex::new(HashMap::new());
}

/// How often the heartbeat timer fires (and a ping frame is sent to the client).
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(2);
/// Silence longer than this makes the heartbeat timer send a `ConnectionWarnning`
/// to the server actor.
const CLIENT_TIMEOUT_WARNNING: Duration = Duration::from_secs(10);
/// Silence longer than this makes the heartbeat timer send `Disconnect` to the
/// server actor and stop the connection.
const CLIENT_TIMEOUT_REMOVE: Duration = Duration::from_secs(30);

/// State of a single WebSocket client connection actor.
pub struct Connection {
    /// Identifier of this connection; also the key used in the global
    /// connection registries.
    pub id: ConnectId,
    /// Instant of the last heartbeat (ping or pong) accepted from the client.
    pub hb: Instant,
    /// Address of the lobby (server actor) this socket belongs to. It is used to
    /// send data to the lobby.
    /// Sending a message to the lobby might look like: self.addr.do_send('hi!').
    pub addr: actix::Addr<Server>,
    /// Requestor supplied when the connection was established
    /// (presumably the authenticated caller -- confirm against `core::auth0`).
    pub requestor: Requestor,
}

impl Handler<OutputMessage> for Connection {
    type Result = ();

    /// Writes an outgoing message to the client as a text frame.
    ///
    /// Nothing is sent when `OutputMessage::output` yields `None`.
    fn handle(&mut self, msg: OutputMessage, ctx: &mut Self::Context) {
        match msg.output() {
            Some(frame) => ctx.text(frame),
            None => (),
        }
    }
}

impl Connection {
    /// Builds the state for a freshly accepted connection and registers it.
    ///
    /// A new id is generated with `crate::commons::base64_uuid()` and stored in
    /// `ALL_CONNECTIONS_LINE` together with the current UTC time in milliseconds.
    /// The heartbeat timestamp starts at "now".
    ///
    /// # Panics
    ///
    /// Panics if the `ALL_CONNECTIONS_LINE` mutex is poisoned.
    pub fn established(addr: actix::Addr<Server>, requestor: Requestor) -> Self {
        let connection_id = crate::commons::base64_uuid();

        // The guard is a temporary, so the lock is released right after the insert.
        ALL_CONNECTIONS_LINE.lock().unwrap().insert(
            connection_id.to_owned(),
            chrono::Utc::now().timestamp_millis(),
        );

        Self {
            id: connection_id,
            hb: Instant::now(),
            addr,
            requestor,
        }
    }

    /// Starts the heartbeat timer, firing every `HEARTBEAT_INTERVAL`.
    ///
    /// On each tick, depending on how long the client has been silent:
    /// * more than `CLIENT_TIMEOUT_REMOVE`: `Disconnect` is sent to the server
    ///   actor, the connection is unregistered and the actor is stopped;
    /// * more than `CLIENT_TIMEOUT_WARNNING`: a `ConnectionWarnning` carrying the
    ///   remaining time (in milliseconds) is sent to the server actor;
    /// * in every non-terminal case a ping frame is sent to the client.
    pub fn ping(&self, ctx: &mut <Self as Actor>::Context) {
        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
            // Read the clock once: the threshold checks and the reported remaining
            // time must be based on the same instant, otherwise the unsigned
            // subtraction below could underflow right at the removal boundary.
            let idle = Instant::now().duration_since(act.hb);

            if idle > CLIENT_TIMEOUT_REMOVE {
                act.addr.do_send(Disconnect {
                    id: act.id.to_owned(),
                });

                act.clear_connection();
                ctx.stop();

                // The actor is shutting down; pinging the client is pointless.
                return;
            }

            if idle > CLIENT_TIMEOUT_WARNNING {
                let remaining_ms = CLIENT_TIMEOUT_REMOVE
                    .as_millis()
                    .saturating_sub(idle.as_millis());

                // The server's reply is deliberately ignored.
                act.addr
                    .send(ConnectionWarnning {
                        id: act.id.to_owned(),
                        at: format!("{}ms", remaining_ms),
                    })
                    .into_actor(act)
                    .then(|_, _, _| fut::ready(()))
                    .wait(ctx);
            }

            ctx.ping(b"Are you there?");
        });
    }

    /// Starts a timer that, every `TIMER_INTERVAL`, pushes the current UTC time
    /// to the client as a binary frame formatted like `2024-01-31T12:34:56.789Z`.
    pub fn server_time(&self, ctx: &mut <Self as Actor>::Context) {
        ctx.run_interval(*TIMER_INTERVAL, |_, ctx| {
            let s = chrono::Utc::now()
                .format("%Y-%m-%dT%H:%M:%S.%3f")
                .to_string()
                + "Z";

            ctx.binary(Bytes::from(s));
        });
    }

    /// Returns `true` when `connection_id` is present in `ALL_CONNECTIONS_LINE`.
    ///
    /// # Panics
    ///
    /// Panics if the `ALL_CONNECTIONS_LINE` mutex is poisoned.
    pub fn has_connection(connection_id: &str) -> bool {
        ALL_CONNECTIONS_LINE
            .lock()
            .unwrap()
            .contains_key(connection_id)
    }

    /// Flags a registered connection for removal.
    ///
    /// Unknown ids are ignored. The timestamp of the first flagging is kept if
    /// the connection is flagged again.
    ///
    /// # Panics
    ///
    /// Panics if either registry mutex is poisoned.
    pub fn pending_remove(connection_id: &str) {
        if !Self::has_connection(connection_id) {
            return;
        }

        // Entry API: a single lookup that leaves an existing timestamp untouched.
        CONNECTION_TO_REMOVE
            .lock()
            .unwrap()
            .entry(connection_id.to_owned())
            .or_insert_with(|| chrono::Utc::now().timestamp_millis());
    }

    /// Returns `true` when this connection has been flagged via
    /// [`Connection::pending_remove`].
    ///
    /// # Panics
    ///
    /// Panics if the `CONNECTION_TO_REMOVE` mutex is poisoned.
    pub fn should_be_remove(&self) -> bool {
        CONNECTION_TO_REMOVE.lock().unwrap().contains_key(&self.id)
    }

    /// Removes this connection from both global registries.
    ///
    /// Calling it more than once is harmless.
    ///
    /// # Panics
    ///
    /// Panics if either registry mutex is poisoned.
    pub fn clear_connection(&self) {
        // Each guard is a statement-scoped temporary, so the two global locks are
        // never held at the same time.
        CONNECTION_TO_REMOVE.lock().unwrap().remove(&self.id);
        ALL_CONNECTIONS_LINE.lock().unwrap().remove(&self.id);
    }
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
    /// Handles one incoming WebSocket frame (or protocol error) from the client.
    ///
    /// Heartbeat frames refresh `hb` unless the connection has been flagged for
    /// removal. Text frames are parsed as `Input` and dispatched to this actor.
    /// `Close` and `Continuation` frames unregister the connection and stop the
    /// actor.
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            // Heartbeat sent by the client (ping): answer with a pong.
            Ok(ws::Message::Ping(msg)) => {
                if !self.should_be_remove() {
                    self.hb = Instant::now();
                    ctx.pong(&msg);
                }
            }
            // The client answered our ping: refresh the liveness timestamp.
            Ok(ws::Message::Pong(_)) => {
                if !self.should_be_remove() {
                    self.hb = Instant::now();
                }
            }
            // Binary message from the client: it is written straight back through
            // the WebSocket context. In practice this should never be triggered.
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            // The client closed the connection.
            Ok(ws::Message::Close(reason)) => {
                self.clear_connection();
                ctx.close(reason);
                ctx.stop();
            }
            // Continuation frames (messages that do not fit into a single frame)
            // are not supported; the connection is dropped.
            Ok(ws::Message::Continuation(_)) => {
                // Unregister before stopping, like every other stop path here,
                // so the global registries do not keep a stale entry.
                self.clear_connection();
                ctx.stop();
            }
            // No-op frame: nothing to do.
            Ok(ws::Message::Nop) => (),
            // Text message (by far the most common case): parse it and hand the
            // request to this actor's `Input` handler.
            Ok(ws::Message::Text(msg)) => match serde_json::from_str::<Input>(&msg) {
                Ok(content) => ctx.notify(content),
                Err(err) => {
                    if msg.starts_with('{') && msg.ends_with('}') {
                        // Looks like a JSON object but is not a valid `Input`:
                        // report the parse error to the client.
                        ctx.text(OutputMessage::err(
                            OutputMessageType::Failed,
                            &err.to_string(),
                            None,
                        ))
                    } else {
                        // Anything else is echoed back as a binary frame.
                        ctx.binary(Bytes::from(msg.to_string()));
                    }
                }
            },
            // Protocol error: only logged; the connection is left as it is.
            // NOTE(review): decide whether the actor should be stopped here.
            Err(e) => {
                log::error!("connection_id={}, error={:?}", self.id, e);
            }
        }
    }
}

impl Handler<Input> for Connection {
    type Result = ();

    fn handle(&mut self, msg: Input, ctx: &mut Self::Context) -> Self::Result {
        let val = msg.data;

        let out = match msg.action {
            Action::Subscribe => match SubscribeAction::parse(&self.id, &val) {
                Ok(subscribe) => {
                    let out = OutputMessage {
                        id: msg.id,
                        level: Level::Info,
                        ty: OutputMessageType::Success,
                        msg: TypedMessage::JsonMessage(
                            json! ({"action": Action::Subscribe, "channel": subscribe.channel}),
                        ),
                    };

                    self.addr.do_send(subscribe);

                    out
                }
                Err(err) => OutputMessage {
                    id: msg.id,
                    level: Level::Err,
                    ty: OutputMessageType::Failed,
                    msg: TypedMessage::JsonMessage(
                        json! ({"action": Action::Subscribe, "description": err}),
                    ),
                },
            },
            Action::Unsubscribe => match UnsubscribeAction::parse(&self.id, &val) {
                Ok(unsubscribe) => {
                    let out = OutputMessage {
                        id: msg.id,
                        level: Level::Info,
                        ty: OutputMessageType::Success,
                        msg: TypedMessage::JsonMessage(
                            json! ({"action": Action::Unsubscribe, "channel": unsubscribe.channel}),
                        ),
                    };

                    self.addr.do_send(unsubscribe);

                    out
                }
                Err(err) => OutputMessage {
                    id: msg.id,
                    level: Level::Err,
                    ty: OutputMessageType::Failed,
                    msg: TypedMessage::JsonMessage(
                        json! ({"action": Action::Unsubscribe, "description": err}),
                    ),
                },
            },
            Action::SendMessage => match SendMessageAction::parse(&self.id, &val) {
                Ok(message) => {
                    self.addr.do_send(message);

                    OutputMessage {
                        id: msg.id,
                        level: Level::Info,
                        ty: OutputMessageType::Accept,
                        msg: TypedMessage::JsonMessage(json!(true)),
                    }
                }
                Err(err) => OutputMessage {
                    id: msg.id,
                    level: Level::Err,
                    ty: OutputMessageType::Failed,
                    msg: TypedMessage::JsonMessage(
                        json! ({"action": Action::SendMessage, "description": err}),
                    ),
                },
            },
        };

        ctx.notify(out)
    }
}