// route-util 0.1.0
//
// route utils
// Documentation
use crate::http1_backend::get_pool_and_meta;
use anyhow::anyhow;
use detcd::ServiceState;
use http_body_util::BodyExt;
use http_pool::body::VariantBody;
use http_pool::net_pool::{Pool, Pools};
use hyper::body::{Bytes, Incoming};
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Result codes for mailbox requests.
///
/// The numeric value of a variant is what gets stored in `MailboxRsp::code`
/// (via `as i32`).
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ErrorCode {
    /// Success.
    Ok = 0,
    /// The target service does not exist.
    NoTarget = 1,
    /// The payload is empty.
    EmptyPayload = 2,
    /// The key is empty.
    EmptyKey = 3,
    /// The service id is invalid.
    InvalidServiceId = 4,
    /// Decoding (unpacking) failed.
    ErrDecode = 5,
    /// Encoding (packing) failed.
    ErrEncode = 6,
    /// System error.
    ErrSystem = 7,
    /// The target service has not registered yet.
    TargetNotReady = 8,
}

/// Mailbox request packet.
#[derive(Serialize, Deserialize)]
pub struct MailboxReq {
    /// Target service.
    pub service: String,
    /// Key, used by stateful services.
    pub key: Option<String>,
    /// Service id, used by fixed services.
    pub service_id: Option<u32>,
    /// Payload.
    pub payload: String,
    /// Headers, as name/value pairs.
    pub headers: HashMap<String, String>,
}

impl MailboxReq {
    pub fn new() -> Self {
        Self {
            service: String::new(),
            key: None,
            service_id: None,
            payload: String::new(),
            headers: HashMap::new(),
        }
    }
}

/// Mailbox response packet.
#[derive(Serialize, Deserialize)]
pub struct MailboxRsp {
    /// Numeric value of an `ErrorCode` variant.
    pub code: i32,
    /// Optional descriptive message.
    pub msg: Option<String>,
}

impl MailboxRsp {
    /// Creates a success response: `code` is `ErrorCode::Ok` and `msg` is unset.
    pub fn new() -> Self {
        Self {
            code: ErrorCode::Ok as i32,
            msg: None,
        }
    }
}

impl Default for MailboxRsp {
    /// Same value as [`MailboxRsp::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Parser that turns a raw request body into a [`MailboxReq`].
pub trait MailboxParser: Sized {
    /// Error produced when parsing fails; must be a thread-safe, `'static`
    /// standard error.
    type Error: std::error::Error + Send + Sync + 'static;
    /// Parses `data` into a [`MailboxReq`].
    ///
    /// NOTE(review): `zip` presumably names a compression scheme applied to
    /// `data` — confirm against the implementors.
    fn parse(data: Bytes, zip: Option<String>) -> Result<MailboxReq, Self::Error>;
}

/// Sends one request to the mailbox.
///
/// A writer is obtained from `builder`, then a mail is assembled from the
/// request: its key, its payload, `service_id` (cast to `i32`) as the
/// partition, and every entry of `req.headers` as a mail header. The mail is
/// written under the name held in `req.service`.
///
/// # Errors
/// Returns the `kmailbox::KafkaError` raised while obtaining the writer or
/// while writing the mail.
pub async fn write_mail(
    builder: &kmailbox::Builder,
    req: &MailboxReq,
) -> Result<(), kmailbox::KafkaError> {
    // Acquire the writer first so a failure here short-circuits before any mail is built.
    let writer = builder.writer()?;

    // Base mail: key, payload and target partition.
    let partition = req.service_id.map(|id| id as i32);
    let base = kmailbox::SendMail::new()
        .key(req.key.as_ref())
        .payload(Some(&req.payload))
        .partition(partition);

    // Attach every request header to the mail.
    let mail = req
        .headers
        .iter()
        .fold(base, |mail, (name, value)| mail.add_header(name, Some(value)));

    writer.write(&req.service, mail).await
}

pub async fn send_to_mailbox<P: Pool, E: MailboxParser>(
    builder: &kmailbox::Builder,
    pools: Pools<P>,
    req: Request<Incoming>,
) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
    // 获取完整的body
    let body_data = req.collect().await.map_err(|e| anyhow::Error::new(e))?;

    // 回复结构
    let mut mb_rsp = MailboxRsp::new();
    let mut err: Option<anyhow::Error> = None;

    let process = async || {
        // 解析body
        let mut mb_req = match E::parse(body_data.to_bytes(), None) {
            Ok(r) => r,
            Err(e) => {
                err = Some(anyhow::Error::new(e));
                mb_rsp.code = ErrorCode::ErrDecode as i32;
                return;
            }
        };

        // 校验mb_req是否合法
        if mb_req.payload.is_empty() {
            mb_rsp.code = ErrorCode::EmptyPayload as i32;
            return;
        }
        if mb_req.service.is_empty() {
            mb_rsp.code = ErrorCode::NoTarget as i32;
            return;
        }

        // 获取服务类型
        let (pool, meta) = match get_pool_and_meta(&pools, &mb_req.service) {
            Ok((p, m)) => (p, m),
            Err(e) => {
                err = Some(e);
                mb_rsp.code = ErrorCode::NoTarget as i32;
                return;
            }
        };

        // 获取分区
        let partition = match meta.state {
            Some(ServiceState::Fixed) => {
                // 校验service id是否合法
                if mb_req.service_id.is_none() || mb_req.service_id.unwrap() >= meta.instances {
                    mb_rsp.code = ErrorCode::InvalidServiceId as i32;
                    return;
                }
                mb_req.service_id
            }
            Some(ServiceState::Stateful) => {
                // 校验key是否合法
                let key = mb_req.key.as_ref().map_or("", |k| k);
                if key.is_empty() {
                    mb_rsp.code = ErrorCode::EmptyKey as i32;
                    return;
                }
                // 为了保证邮件的路由规则与消息包的路由规则一致, 这里要根据mb_req.key判断邮件去到哪个目标
                match pool.get_backend(key) {
                    Some(bs) => bs.id(),
                    None => {
                        mb_rsp.code = ErrorCode::TargetNotReady as i32;
                        return;
                    }
                }
            }
            _ => None,
        };

        // 写邮件
        mb_req.service_id = partition;
        match write_mail(builder, &mb_req).await {
            Err(e) => {
                err = Some(anyhow!("{:?}", e));
                mb_rsp.code = ErrorCode::ErrSystem as i32;
                return;
            }
            _ => {}
        }
    };

    process().await;

    let resp = serde_json::to_string(&mb_rsp).unwrap();
    Ok((
        super::make_response::response(
            StatusCode::OK,
            Some(resp),
            super::make_response::JSON_CONTENT_TYPE,
        ),
        err,
    ))
}