zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use std::str::FromStr;
use std::time::{Duration, Instant};

use actix::prelude::*;
use actix_web_actors::ws;
use actix_web_actors::ws::Message::Text;

use crate::websocket::devstreams::Connect;
use crate::websocket::devstreams::Disconnect;
use crate::websocket::devstreams::Subscribe;
use crate::websocket::devstreams::SubscribeChannel;

use crate::websocket::devstreams::InputMessage;
use crate::websocket::devstreams::InputMessageType;
use crate::websocket::devstreams::OutputMessage;
use crate::websocket::devstreams::OutputMessageType;
use crate::websocket::devstreams::TypedMessage;

use crate::websocket::devstreams::Server;

use crate::websocket::devstreams::ConnectId;

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

pub struct Connection {
    pub id: ConnectId,
    pub hb: Instant,
    pub addr: actix::Addr<Server>,
}

impl Connection {
    pub fn new(addr: actix::Addr<Server>) -> Self {
        Self {
            // id: crate::commons::ytid(),
            id: crate::commons::base64_uuid(),
            hb: Instant::now(),
            addr,
        }
    }

    fn hb(&self, ctx: &mut <Self as Actor>::Context) {
        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
            if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
                ctx.stop();
                return;
            }

            ctx.ping(b"Are you there?");
        });
    }
}

impl Actor for Connection {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        self.hb(ctx);

        let addr = ctx.address();

        self.addr
            // send (asynchronously), or do_send (同步)
            .send(
                // 新的连接
                Connect {
                    addr: addr.recipient(),
                    id: self.id.to_owned(),
                },
            )
            .into_actor(self)
            .then(|res, _, ctx| {
                match res {
                    Ok(_res) => (),
                    _ => ctx.stop(),
                }
                fut::ready(())
            })
            .wait(ctx);
    }

    fn stopping(&mut self, _: &mut Self::Context) -> Running {
        self.addr.do_send(
            // 停止时,连接断开
            Disconnect {
                id: self.id.to_owned(),
            },
        );

        Running::Stop
    }
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            // 客户端上报心跳(ping), 回复一个 pong.
            Ok(ws::Message::Ping(msg)) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            // 客户端响应心跳(ping), 此处更新存活时间
            Ok(ws::Message::Pong(_)) => {
                self.hb = Instant::now();
            }
            // 客户端发送的二进制消息,我们将把它发送到 WebSocket 上下文,WebSocket 上下文会弄清楚如何处理它。
            // 实际上,这应该永远不会被触发。
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            // 客户端主动断开连接。
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            // 我们不会响应连续帧(简而言之,这些是无法适应一个消息的 WebSocket 消息)。
            Ok(ws::Message::Continuation(_)) => {
                ctx.stop();
            }
            // 在 nop 时执行 nop(无操作)。
            Ok(ws::Message::Nop) => (),
            // 在文本消息上,(这个我们会做最多的!)将其发送到大厅。大厅将负责将其代理到需要去的地方。
            Ok(Text(s)) => match serde_json::from_str::<InputMessage>(&s) {
                Ok(content) => ctx.notify(content),
                Err(err) => ctx.text(OutputMessage::err(err.to_string(), &self.id)),
            },
            // 在出现错误时,打印日志。你可能想合理地实现在这里该做什么。
            Err(e) => {
                log::error!("error={:?}", e);
            }
        }
    }
}

impl Handler<InputMessage> for Connection {
    type Result = ();

    fn handle(&mut self, msg: InputMessage, ctx: &mut Self::Context) -> Self::Result {
        let val = msg.data;

        let out = match msg.ty {
            InputMessageType::Msg => {
                let out = OutputMessage {
                    ty: OutputMessageType::Msg,
                    msg: TypedMessage::TextMessage(val.as_str().unwrap().to_string()),
                    id: self.id.to_owned(),
                };

                out
            }
            InputMessageType::Subscribe => {
                match crate::commons::get_value_as_string(&val, "channel") {
                    Ok(Some(c)) => match SubscribeChannel::from_str(&c) {
                        Ok(channel) => {
                            self.addr.do_send(Subscribe {
                                id: self.id.to_owned(),
                                channel,
                            });

                            OutputMessage {
                                ty: OutputMessageType::Msg,
                                msg: TypedMessage::TextMessage(format!(
                                    "subscribe success: channel={}",
                                    c
                                )),
                                id: self.id.to_owned(),
                            }
                        }
                        Err(_) => OutputMessage {
                            ty: OutputMessageType::Err,
                            msg: TypedMessage::TextMessage("invalid channel".to_string()),
                            id: self.id.to_owned(),
                        },
                    },
                    Ok(None) => OutputMessage {
                        ty: OutputMessageType::Err,
                        msg: TypedMessage::TextMessage("channel is required".to_string()),
                        id: self.id.to_owned(),
                    },
                    Err(e) => OutputMessage {
                        ty: OutputMessageType::Err,
                        msg: TypedMessage::TextMessage(format!("invalid channel: error={:?}", e)),
                        id: self.id.to_owned(),
                    },
                }
            }
        };

        ctx.notify(out)
    }
}

impl Handler<OutputMessage> for Connection {
    type Result = ();

    fn handle(&mut self, msg: OutputMessage, ctx: &mut Self::Context) {
        ctx.text(msg.output());
    }
}