zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use std::collections::HashMap;
use std::str::FromStr;

use actix_web::{rt, web, HttpRequest, HttpResponse, Responder};
use harsh::Harsh;

use crate::core::{auth0::Requestor, R};

use crate::core::error2::Error;
use crate::core::error2::Result;
use crate::core::request2::Render;

use super::{
    command::Broadcast,
    connection_new,
    connection_new::Connection,
    input::{Action, Input, TalkMessage},
    output::{Level, Ty},
    service_chat, WEBCHAT_BROADCAST_CHANNEL,
};

/// Handshake and start WebSocket handler.
pub async fn ws(
    req: HttpRequest,
    stream: web::Payload,
    connection: web::Data<Connection>,
    requestor: web::ReqData<Requestor>,
) -> Result<HttpResponse> {
    let (res, session, msg_stream) = actix_ws::handle(&req, stream).map_err(Error::run_time)?;
    let connetion = (**connection).clone();
    let requestor = requestor.into_inner();

    // spawn websocket handler (and don't await it) so that the response is returned immediately
    rt::spawn(connection_new::create(
        connetion, requestor, session, msg_stream,
    ));

    Ok(res)
}

/// Reports the total connection count.
///
/// Responds with HTTP 200 and an `R::ok` payload wrapping `connection.total()`.
pub async fn total(request: HttpRequest, connection: web::Data<Connection>) -> impl Responder {
    let count = connection.total().await;

    request.json(200, R::ok(count))
}

/// Returns all users that are currently online.
///
/// Responds with HTTP 200 and an `R::ok` payload wrapping `connection.users_online()`.
/// The request body is accepted but ignored.
///
/// NOTE(review): the original note says the caller is excluded from the list; no such
/// filtering is visible in this handler — confirm against `Connection::users_online`.
pub async fn users_online(
    _body: web::Bytes,
    request: HttpRequest,
    connection: web::Data<Connection>,
) -> Result<impl Responder> {
    let online = connection.users_online().await;
    let payload = R::ok(online);

    Ok(request.json(200, payload))
}

/// Lists all connections.
///
/// Responds with HTTP 200 and an `R::ok` payload wrapping `connection.all_connections()`.
/// The request body is accepted but ignored.
pub async fn all_connections(
    _body: web::Bytes,
    request: HttpRequest,
    connection: web::Data<Connection>,
) -> Result<impl Responder> {
    let connections = connection.all_connections().await;
    let payload = R::ok(connections);

    Ok(request.json(200, payload))
}

/// closing a connection
pub async fn closing(
    body: web::Bytes,
    connection: web::Data<Connection>,
    request: HttpRequest,
) -> Result<impl Responder> {
    // 需验证权限,只有管理员能执行该操作

    let message = match crate::commons::bytes_to_string(body.to_vec()) {
        Ok(val) => val,
        Err(e) => return Ok(request.json(200, R::failed(400, e.to_string()))),
    };

    let connid = Harsh::default()
        .decode(&message)
        .map(|val| val[0])
        .map_err(|e| Error::invalid_request(format!("input={}, error={:?}", message, e)))?;

    connection.closing(connid as usize);

    Ok(request.json(200, R::ok(true)))
}

/// 广播一条消息
pub async fn broadcast(
    query: web::Query<HashMap<String, String>>,
    body: web::Bytes,
    ctx: actix_web::web::Data<crate::server::AppContext>,
    _connection: web::Data<Connection>,
    _requestor: web::ReqData<Requestor>,
    request: HttpRequest,
) -> Result<impl Responder> {
    // 需验证权限,只有管理员能执行该操作
    // TODO
    let level = query
        .get("level")
        .ok_or_else(|| Error::invalid_request("Missing query string parameter: level"))?;

    let ty = query
        .get("ty")
        .ok_or_else(|| Error::invalid_request("Missing query string parameter: ty"))?;

    let level = Level::from_str(level).map_err(|e| {
        Error::invalid_request(format!("Invalid parameter: level={}: error={}", level, e))
    })?;

    let ty = Ty::from_str(ty).map_err(|e| {
        Error::invalid_request(format!("Invalid parameter: ty={}: error={}", ty, e))
    })?;

    let msg = crate::commons::bytes_to_string(body.to_vec())
        .map_err(|e| Error::invalid_request(format!("Invalid parameter: error={}", e)))?;

    let broadcast = Broadcast { level, ty, msg };

    // connection.broadcast(message);

    ctx.rudis()
        .publishs(
            WEBCHAT_BROADCAST_CHANNEL,
            &serde_json::to_string(&broadcast)
                .map_err(|e| Error::invalid_request(format!("Invalid parameter: error={}", e)))?,
        )
        .await
        .map_err(Error::run_time)?;

    Ok(request.json(200, R::ok(true)))
}

/// 发送一条聊天信息
pub async fn talk_message(
    query: web::Query<HashMap<String, String>>,
    data: web::Json<Input>,
    _ctx: actix_web::web::Data<crate::server::AppContext>,
    connection: web::Data<Connection>,
    requestor: web::ReqData<Requestor>,
    request: HttpRequest,
) -> Result<impl Responder> {
    if Action::Talk != data.action {
        return Ok(request.json(200, R::failed(413, "invalid action")));
    }

    let connid = query
        .get("connid")
        .ok_or_else(|| Error::invalid_request("Missing query string parameter: connid"))?;

    let connid = Harsh::default()
        .decode(connid)
        .map(|val| val[0])
        .map_err(|e| Error::invalid_request(format!("input={}, error={:?}", connid, e)))?;

    let input = data.into_inner();
    let user = requestor.user();

    match TalkMessage::parse(&input.data) {
        Ok(data) => {
            connection.fire_talk_message(connid as usize, user, input.id, data.clone());

            Ok(request.json(200, R::ok(data)))
        }
        Err(e) => Ok(request.json(200, R::failed(413, format!("invalid data: error={:?}", e)))),
    }
}

/// Sends a chat message by saving it through `service_chat::save_chat_message2`.
///
/// The recipient comes from the `to` query parameter and the message text from the request
/// body; the sender is the requesting user's id. The message is stored with role
/// `"normal_user"`, type `"text"` and a freshly generated UUID v4 as its id.
///
/// Responds with HTTP 200 and `R::ok(<save result>)`, `R::failed(413, ..)` when `to` is
/// missing, or `R::failed(400, ..)` when `bytes_to_string` rejects the body.
///
/// # Errors
/// Propagates any error returned by `service_chat::save_chat_message2`.
pub async fn send_message(
    query: web::Query<HashMap<String, String>>,
    message: web::Bytes,
    context: actix_web::web::Data<crate::server::AppContext>,
    requestor: web::ReqData<Requestor>,
    request: HttpRequest,
) -> Result<impl Responder> {
    let recipient = match query.get("to") {
        Some(to) => to.clone(),
        None => {
            return Ok(request.json(200, R::failed(413, "Missing query string parameter: to")))
        }
    };

    let text = match crate::commons::bytes_to_string(message.to_vec()) {
        Ok(val) => val,
        Err(e) => return Ok(request.json(200, R::failed(400, e.to_string()))),
    };

    let sender: String = requestor.user().user_id();

    let chat = TalkMessage {
        to: recipient,
        to_role: "normal_user".to_owned(),
        msg: text,
        msg_type: "text".to_owned(),
    };

    let message_id = Some(uuid::Uuid::new_v4().to_string());
    let saved = service_chat::save_chat_message2(&context, message_id, &sender, chat).await?;

    Ok(request.json(200, R::ok(saved)))
}

/// Requests the chat history with another user.
///
/// The peer comes from the `to` form field and is passed to
/// `connection.fire_history_message`. Responds with HTTP 200 and `R::ok(true)`, or
/// `R::failed(413, ..)` when `to` is missing.
///
/// NOTE(review): the first two arguments to `fire_history_message` are fixed
/// (`Some("1")` and `1`) and the caller's user id is computed but never passed on; this
/// looks like placeholder wiring — confirm the intended values.
pub async fn history_message(
    data: web::Form<HashMap<String, String>>,
    _ctx: actix_web::web::Data<crate::server::AppContext>,
    connection: web::Data<Connection>,
    _requestor: web::ReqData<Requestor>,
    request: HttpRequest,
) -> Result<impl Responder> {
    let peer = match data.get("to") {
        Some(to) => to.clone(),
        None => return Ok(request.json(200, R::failed(413, "Missing POST parameter: to"))),
    };

    // Still evaluated, as in the original flow, although the value is not used below.
    let _from: String = _requestor.user().user_id();

    connection.fire_history_message(Some("1".to_string()), 1, peer);

    Ok(request.json(200, R::ok(true)))
}

/// Queries the chat history between the requesting user and another user.
///
/// The peer comes from the `to` query parameter. Responds with HTTP 200 and
/// `R::ok(<result of service_chat::query_history_msg>)`, or `R::failed(413, ..)` when `to`
/// is missing.
///
/// # Errors
/// Propagates any error returned by `service_chat::query_history_msg`.
pub async fn query_history_msg(
    data: web::Query<HashMap<String, String>>,
    context: actix_web::web::Data<crate::server::AppContext>,
    _requestor: web::ReqData<Requestor>,
    request: HttpRequest,
) -> Result<impl Responder> {
    let peer = match data.get("to") {
        Some(to) => to.clone(),
        None => {
            return Ok(request.json(200, R::failed(413, "Missing query string parameter: to")))
        }
    };

    let caller_id: String = _requestor.user().user_id();
    let history = service_chat::query_history_msg(&context, &caller_id, &peer).await?;

    Ok(request.json(200, R::ok(history)))
}

/// Queries the last chat message exchanged between the requesting user and another user.
///
/// The peer comes from the `to` query parameter. Responds with HTTP 200 and
/// `R::ok(<result of service_chat::query_last_message>)`, or `R::failed(413, ..)` when `to`
/// is missing.
///
/// # Errors
/// Propagates any error returned by `service_chat::query_last_message`.
pub async fn query_last_message(
    data: web::Query<HashMap<String, String>>,
    context: actix_web::web::Data<crate::server::AppContext>,
    _requestor: web::ReqData<Requestor>,
    request: HttpRequest,
) -> Result<impl Responder> {
    let peer = match data.get("to") {
        Some(to) => to.clone(),
        None => {
            return Ok(request.json(200, R::failed(413, "Missing query string parameter: to")))
        }
    };

    let caller_id: String = _requestor.user().user_id();
    let last = service_chat::query_last_message(&context, &caller_id, &peer).await?;

    Ok(request.json(200, R::ok(last)))
}

/// 查询历史对话(返回所有对话人的user_id)
pub async fn history_talk(
    _ctx: actix_web::web::Data<crate::server::AppContext>,
    _requestor: web::ReqData<Requestor>,
    request: HttpRequest,
) -> Result<impl Responder> {
    let _from: String = _requestor.user().user_id();

    let result = service_chat::query_history_talk(&_ctx, &_from)
        .await
        .map_err(Error::UnexpectedError)?;

    Ok(request.json(200, R::ok(result)))
}