route_util/
grpc_mailbox.rs1use crate::mailbox::{MailboxParser, MailboxReq};
2use http_pool::body::VariantBody;
3use http_pool::net_pool::{Pool, Pools};
4use hyper::body::{Buf, Bytes, Incoming};
5use hyper::{Request, Response};
6use quick_protobuf::{BytesReader, MessageRead, Reader};
7
8impl<'a> MessageRead<'a> for MailboxReq {
9 fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> quick_protobuf::Result<Self> {
10 let mut msg = MailboxReq::new();
11 while !r.is_eof() {
12 match r.next_tag(bytes)? {
13 1 => msg.service = r.read_string(bytes)?.to_string(),
14 2 => msg.key = r.read_string(bytes).ok().map(|s| s.to_string()),
15 3 => msg.service_id = r.read_varint32(bytes).ok(),
16 4 => msg.payload = r.read_string(bytes)?.to_string(),
17 5 => {
18 let (k, v) = r.read_map(
19 bytes,
20 |r, bytes| r.read_string(bytes),
21 |r, bytes| r.read_string(bytes),
22 )?;
23 msg.headers.insert(k.to_string(), v.to_string());
24 }
25 other => r.read_unknown(bytes, other)?,
26 }
27 }
28 Ok(msg)
29 }
30}
31
/// Stateless marker type selecting the gRPC-framed protobuf decoding of a
/// mailbox request.
///
/// It carries no data; it exists only to be named as a type parameter.
#[derive(Debug, Clone, Copy, Default)]
pub struct GrpcMailboxParser;
34
35impl MailboxParser for GrpcMailboxParser {
36 type Error = quick_protobuf::Error;
38
39 fn parse(mut data: Bytes, _zip: Option<String>) -> Result<MailboxReq, Self::Error> {
40 if data.remaining() < 5 {
41 return Err(quick_protobuf::Error::UnexpectedEndOfBuffer);
43 }
44
45 let compressed_flag = data.get_u8();
47 if compressed_flag != 0 {
48 return Err(quick_protobuf::Error::Io(std::io::Error::other(
49 "compression not supported",
50 )));
51 }
52
53 let mut r = Reader::from_bytes(data.to_vec());
54 r.read(MailboxReq::from_reader)
55 }
56}
57
58pub async fn send_to_mailbox<P: Pool>(
59 builder: &kmailbox::Builder,
60 pools: Pools<P>,
61 req: Request<Incoming>,
62) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
63 crate::mailbox::send_to_mailbox::<P, GrpcMailboxParser>(builder, pools, req).await
64}