use crate::mailbox::{MailboxParser, MailboxReq};
use http_pool::body::VariantBody;
use http_pool::net_pool::{Pool, Pools};
use hyper::body::{Buf, Bytes, Incoming};
use hyper::{Request, Response};
use quick_protobuf::{BytesReader, MessageRead, Reader};
/// Protobuf wire-format decoder for [`MailboxReq`].
///
/// Field layout handled here (field number, wire type):
///
/// | field | target              | wire type        |
/// |-------|---------------------|------------------|
/// | 1     | `service`           | length-delimited |
/// | 2     | `key` (optional)    | length-delimited |
/// | 3     | `service_id` (opt.) | varint           |
/// | 4     | `payload`           | length-delimited |
/// | 5     | `headers` map entry | length-delimited |
impl<'a> MessageRead<'a> for MailboxReq {
    /// Reads fields until the reader is exhausted and returns the populated message.
    ///
    /// Fields with an unrecognised tag (or a recognised field number carrying an
    /// unexpected wire type) are skipped via `read_unknown`.
    ///
    /// # Errors
    ///
    /// Returns the underlying `quick_protobuf` error as soon as any tag or field
    /// value fails to decode. Errors are never swallowed: after a failed read the
    /// reader's position is no longer trustworthy, so continuing would decode garbage.
    fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> quick_protobuf::Result<Self> {
        // `next_tag` yields the raw tag varint, i.e. `(field_number << 3) | wire_type`,
        // so the match arms must compare against full tag values, not field numbers.
        const WIRE_VARINT: u32 = 0;
        const WIRE_LEN: u32 = 2;
        const TAG_SERVICE: u32 = (1 << 3) | WIRE_LEN;
        const TAG_KEY: u32 = (2 << 3) | WIRE_LEN;
        const TAG_SERVICE_ID: u32 = (3 << 3) | WIRE_VARINT;
        const TAG_PAYLOAD: u32 = (4 << 3) | WIRE_LEN;
        const TAG_HEADERS: u32 = (5 << 3) | WIRE_LEN;

        let mut msg = MailboxReq::new();
        while !r.is_eof() {
            match r.next_tag(bytes)? {
                TAG_SERVICE => msg.service = r.read_string(bytes)?.to_string(),
                TAG_KEY => msg.key = Some(r.read_string(bytes)?.to_string()),
                TAG_SERVICE_ID => msg.service_id = Some(r.read_varint32(bytes)?),
                TAG_PAYLOAD => msg.payload = r.read_string(bytes)?.to_string(),
                TAG_HEADERS => {
                    // Each occurrence of the field carries a single key/value entry.
                    let (k, v) = r.read_map(
                        bytes,
                        |r, bytes| r.read_string(bytes),
                        |r, bytes| r.read_string(bytes),
                    )?;
                    msg.headers.insert(k.to_string(), v.to_string());
                }
                // `read_unknown` derives the wire type from the raw tag and skips the value.
                other => r.read_unknown(bytes, other)?,
            }
        }
        Ok(msg)
    }
}
pub struct GrpcMailboxParser;
impl MailboxParser for GrpcMailboxParser {
type Error = quick_protobuf::Error;
fn parse(mut data: Bytes, _zip: Option<String>) -> Result<MailboxReq, Self::Error> {
if data.remaining() < 5 {
return Err(quick_protobuf::Error::UnexpectedEndOfBuffer);
}
let compressed_flag = data.get_u8();
if compressed_flag != 0 {
return Err(quick_protobuf::Error::Io(std::io::Error::other(
"compression not supported",
)));
}
let mut r = Reader::from_bytes(data.to_vec());
r.read(MailboxReq::from_reader)
}
}
/// gRPC entry point for mailbox delivery.
///
/// Delegates to the generic [`crate::mailbox::send_to_mailbox`], supplying
/// [`GrpcMailboxParser`] as the parser type parameter, and returns its result
/// unchanged.
pub async fn send_to_mailbox<P: Pool>(
    builder: &kmailbox::Builder,
    pools: Pools<P>,
    req: Request<Incoming>,
) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
    let forwarded = crate::mailbox::send_to_mailbox::<P, GrpcMailboxParser>(builder, pools, req);
    forwarded.await
}