// route_util/grpc_mailbox.rs

1use crate::mailbox::{MailboxParser, MailboxReq};
2use http_pool::body::VariantBody;
3use http_pool::net_pool::{Pool, Pools};
4use hyper::body::{Buf, Bytes, Incoming};
5use hyper::{Request, Response};
6use quick_protobuf::{BytesReader, MessageRead, Reader};
7
8impl<'a> MessageRead<'a> for MailboxReq {
9    fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> quick_protobuf::Result<Self> {
10        let mut msg = MailboxReq::new();
11        while !r.is_eof() {
12            match r.next_tag(bytes)? {
13                1 => msg.service = r.read_string(bytes)?.to_string(),
14                2 => msg.key = r.read_string(bytes).ok().map(|s| s.to_string()),
15                3 => msg.service_id = r.read_varint32(bytes).ok(),
16                4 => msg.payload = r.read_string(bytes)?.to_string(),
17                5 => {
18                    let (k, v) = r.read_map(
19                        bytes,
20                        |r, bytes| r.read_string(bytes),
21                        |r, bytes| r.read_string(bytes),
22                    )?;
23                    msg.headers.insert(k.to_string(), v.to_string());
24                }
25                other => r.read_unknown(bytes, other)?,
26            }
27        }
28        Ok(msg)
29    }
30}
31
32/// grpc解析器
33pub struct GrpcMailboxParser;
34
35impl MailboxParser for GrpcMailboxParser {
36    /// 偷懒不定义新错误类型
37    type Error = quick_protobuf::Error;
38
39    fn parse(mut data: Bytes, _zip: Option<String>) -> Result<MailboxReq, Self::Error> {
40        if data.remaining() < 5 {
41            // 数据不够长
42            return Err(quick_protobuf::Error::UnexpectedEndOfBuffer);
43        }
44
45        // 暂不支持解析被压缩的格式, 因为不知道是什么格式
46        let compressed_flag = data.get_u8();
47        if compressed_flag != 0 {
48            return Err(quick_protobuf::Error::Io(std::io::Error::other(
49                "compression not supported",
50            )));
51        }
52
53        let mut r = Reader::from_bytes(data.to_vec());
54        r.read(MailboxReq::from_reader)
55    }
56}
57
58pub async fn send_to_mailbox<P: Pool>(
59    builder: &kmailbox::Builder,
60    pools: Pools<P>,
61    req: Request<Incoming>,
62) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
63    crate::mailbox::send_to_mailbox::<P, GrpcMailboxParser>(builder, pools, req).await
64}