route-util 0.1.0

route utils
Documentation
use crate::mailbox::{MailboxParser, MailboxReq};
use http_pool::body::VariantBody;
use http_pool::net_pool::{Pool, Pools};
use hyper::body::{Buf, Bytes, Incoming};
use hyper::{Request, Response};
use quick_protobuf::{BytesReader, MessageRead, Reader};

impl<'a> MessageRead<'a> for MailboxReq {
    fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> quick_protobuf::Result<Self> {
        let mut msg = MailboxReq::new();
        while !r.is_eof() {
            match r.next_tag(bytes)? {
                1 => msg.service = r.read_string(bytes)?.to_string(),
                2 => msg.key = r.read_string(bytes).ok().map(|s| s.to_string()),
                3 => msg.service_id = r.read_varint32(bytes).ok(),
                4 => msg.payload = r.read_string(bytes)?.to_string(),
                5 => {
                    let (k, v) = r.read_map(
                        bytes,
                        |r, bytes| r.read_string(bytes),
                        |r, bytes| r.read_string(bytes),
                    )?;
                    msg.headers.insert(k.to_string(), v.to_string());
                }
                other => r.read_unknown(bytes, other)?,
            }
        }
        Ok(msg)
    }
}

/// grpc解析器
pub struct GrpcMailboxParser;

impl MailboxParser for GrpcMailboxParser {
    /// 偷懒不定义新错误类型
    type Error = quick_protobuf::Error;

    fn parse(mut data: Bytes, _zip: Option<String>) -> Result<MailboxReq, Self::Error> {
        if data.remaining() < 5 {
            // 数据不够长
            return Err(quick_protobuf::Error::UnexpectedEndOfBuffer);
        }

        // 暂不支持解析被压缩的格式, 因为不知道是什么格式
        let compressed_flag = data.get_u8();
        if compressed_flag != 0 {
            return Err(quick_protobuf::Error::Io(std::io::Error::other(
                "compression not supported",
            )));
        }

        let mut r = Reader::from_bytes(data.to_vec());
        r.read(MailboxReq::from_reader)
    }
}

pub async fn send_to_mailbox<P: Pool>(
    builder: &kmailbox::Builder,
    pools: Pools<P>,
    req: Request<Incoming>,
) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
    crate::mailbox::send_to_mailbox::<P, GrpcMailboxParser>(builder, pools, req).await
}