Skip to main content

route_util/
mailbox.rs

1use crate::http1_backend::get_pool_and_meta;
2use anyhow::anyhow;
3use detcd::ServiceState;
4use http_body_util::BodyExt;
5use http_pool::body::VariantBody;
6use http_pool::net_pool::{Pool, Pools};
7use hyper::body::{Bytes, Incoming};
8use hyper::{Request, Response, StatusCode};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
/// Result codes reported to mailbox clients.
///
/// The numeric discriminants are what ends up in the `code` field of the
/// reply, so they must stay stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ErrorCode {
    /// Success.
    Ok = 0,
    /// The target service does not exist.
    NoTarget = 1,
    /// The payload is empty.
    EmptyPayload = 2,
    /// The key is empty.
    EmptyKey = 3,
    /// The service id is invalid.
    InvalidServiceId = 4,
    /// Decoding (unpacking) failed.
    ErrDecode = 5,
    /// Encoding (packing) failed.
    ErrEncode = 6,
    /// System error.
    ErrSystem = 7,
    /// The target service has not registered yet.
    TargetNotReady = 8,
}
33
34/// 邮箱请求包
35#[derive(Serialize, Deserialize)]
36pub struct MailboxReq {
37    /// 目标服务
38    pub service: String,
39    /// key, 状态服用
40    pub key: Option<String>,
41    /// 服务id, 固定服用
42    pub service_id: Option<u32>,
43    /// 负载
44    pub payload: String,
45    /// 头部
46    pub headers: HashMap<String, String>,
47}
48
49impl MailboxReq {
50    pub fn new() -> Self {
51        Self {
52            service: String::new(),
53            key: None,
54            service_id: None,
55            payload: String::new(),
56            headers: HashMap::new(),
57        }
58    }
59}
60
61/// 邮箱回得包
62#[derive(Serialize, Deserialize)]
63pub struct MailboxRsp {
64    /// 与MailboxErrorCode对应
65    pub code: i32,
66    /// 描述信息
67    pub msg: Option<String>,
68}
69
70impl MailboxRsp {
71    pub fn new() -> Self {
72        Self {
73            code: ErrorCode::Ok as i32,
74            msg: None,
75        }
76    }
77}
78
79/// 解析器
80pub trait MailboxParser: Sized {
81    type Error: std::error::Error + Send + Sync + 'static;
82    fn parse(data: Bytes, zip: Option<String>) -> Result<MailboxReq, Self::Error>;
83}
84
85/// 发送到邮箱
86pub async fn write_mail(
87    builder: &kmailbox::Builder,
88    req: &MailboxReq,
89) -> Result<(), kmailbox::KafkaError> {
90    let writer = builder.writer()?;
91
92    // 构建邮箱结构
93    let mut mail = kmailbox::SendMail::new()
94        .key(req.key.as_ref())
95        .payload(Some(&req.payload))
96        .partition(req.service_id.map(|id| id as i32));
97
98    // 填充头部
99    for kv in req.headers.iter() {
100        mail = mail.add_header(kv.0, Some(kv.1));
101    }
102
103    writer.write(&req.service, mail).await
104}
105
106pub async fn send_to_mailbox<P: Pool, E: MailboxParser>(
107    builder: &kmailbox::Builder,
108    pools: Pools<P>,
109    req: Request<Incoming>,
110) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
111    // 获取完整的body
112    let body_data = req.collect().await.map_err(|e| anyhow::Error::new(e))?;
113
114    // 回复结构
115    let mut mb_rsp = MailboxRsp::new();
116    let mut err: Option<anyhow::Error> = None;
117
118    let process = async || {
119        // 解析body
120        let mut mb_req = match E::parse(body_data.to_bytes(), None) {
121            Ok(r) => r,
122            Err(e) => {
123                err = Some(anyhow::Error::new(e));
124                mb_rsp.code = ErrorCode::ErrDecode as i32;
125                return;
126            }
127        };
128
129        // 校验mb_req是否合法
130        if mb_req.payload.is_empty() {
131            mb_rsp.code = ErrorCode::EmptyPayload as i32;
132            return;
133        }
134        if mb_req.service.is_empty() {
135            mb_rsp.code = ErrorCode::NoTarget as i32;
136            return;
137        }
138
139        // 获取服务类型
140        let (pool, meta) = match get_pool_and_meta(&pools, &mb_req.service) {
141            Ok((p, m)) => (p, m),
142            Err(e) => {
143                err = Some(e);
144                mb_rsp.code = ErrorCode::NoTarget as i32;
145                return;
146            }
147        };
148
149        // 获取分区
150        let partition = match meta.state {
151            Some(ServiceState::Fixed) => {
152                // 校验service id是否合法
153                if mb_req.service_id.is_none() || mb_req.service_id.unwrap() >= meta.instances {
154                    mb_rsp.code = ErrorCode::InvalidServiceId as i32;
155                    return;
156                }
157                mb_req.service_id
158            }
159            Some(ServiceState::Stateful) => {
160                // 校验key是否合法
161                let key = mb_req.key.as_ref().map_or("", |k| k);
162                if key.is_empty() {
163                    mb_rsp.code = ErrorCode::EmptyKey as i32;
164                    return;
165                }
166                // 为了保证邮件的路由规则与消息包的路由规则一致, 这里要根据mb_req.key判断邮件去到哪个目标
167                match pool.get_backend(key) {
168                    Some(bs) => bs.id(),
169                    None => {
170                        mb_rsp.code = ErrorCode::TargetNotReady as i32;
171                        return;
172                    }
173                }
174            }
175            _ => None,
176        };
177
178        // 写邮件
179        mb_req.service_id = partition;
180        match write_mail(builder, &mb_req).await {
181            Err(e) => {
182                err = Some(anyhow!("{:?}", e));
183                mb_rsp.code = ErrorCode::ErrSystem as i32;
184                return;
185            }
186            _ => {}
187        }
188    };
189
190    process().await;
191
192    let resp = serde_json::to_string(&mb_rsp).unwrap();
193    Ok((
194        super::make_response::response(
195            StatusCode::OK,
196            Some(resp),
197            super::make_response::JSON_CONTENT_TYPE,
198        ),
199        err,
200    ))
201}