use crate::http1_backend::get_pool_and_meta;
use anyhow::anyhow;
use detcd::ServiceState;
use http_body_util::BodyExt;
use http_pool::body::VariantBody;
use http_pool::net_pool::{Pool, Pools};
use hyper::body::{Bytes, Incoming};
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Result codes reported in [`MailboxRsp::code`].
///
/// Each variant has an explicit discriminant because the value is cast to
/// `i32` and sent back to the client; the numbers must stay stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ErrorCode {
    /// The mail was accepted.
    Ok = 0,
    /// The request named no service, or the pool/metadata lookup for the
    /// service failed.
    NoTarget = 1,
    /// The request payload was empty.
    EmptyPayload = 2,
    /// A stateful service was addressed without a (non-empty) key.
    EmptyKey = 3,
    /// A fixed service was addressed without a service id, or with an id
    /// that is not below the service's instance count.
    InvalidServiceId = 4,
    /// The request body could not be parsed.
    ErrDecode = 5,
    /// NOTE(review): not set by any code visible in this file; presumably
    /// reserved for encoding failures — confirm against other users.
    ErrEncode = 6,
    /// Writing the mail failed.
    ErrSystem = 7,
    /// No backend was available for the key of a stateful service.
    TargetNotReady = 8,
}
/// A mail to be delivered to a service's mailbox.
///
/// Deserialized from client input by a [`MailboxParser`] and consumed by
/// [`write_mail`]. `Debug` and `Clone` are derived so the request can be
/// logged and copied like any other plain data type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MailboxReq {
    /// Name of the target service; `write_mail` passes it to the writer as
    /// the destination of the mail.
    pub service: String,
    /// Optional mail key; also used to pick a backend for stateful services.
    pub key: Option<String>,
    /// Optional service instance id; `write_mail` uses it (cast to `i32`)
    /// as the partition of the mail.
    pub service_id: Option<u32>,
    /// Mail body.
    pub payload: String,
    /// Extra headers; every entry is attached to the mail as a header.
    pub headers: HashMap<String, String>,
}
impl MailboxReq {
pub fn new() -> Self {
Self {
service: String::new(),
key: None,
service_id: None,
payload: String::new(),
headers: HashMap::new(),
}
}
}
/// Response body returned to the client after a mailbox submission.
///
/// `Debug` and `Clone` are derived so the response can be logged and copied
/// like any other plain data type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MailboxRsp {
    /// Numeric value of an [`ErrorCode`]; `0` means success.
    pub code: i32,
    /// Optional human-readable detail.
    pub msg: Option<String>,
}
impl MailboxRsp {
    /// Creates a success response: `code` is [`ErrorCode::Ok`] and `msg` is
    /// `None`.
    pub fn new() -> Self {
        Self {
            code: ErrorCode::Ok as i32,
            msg: None,
        }
    }
}

impl Default for MailboxRsp {
    /// Same value as [`MailboxRsp::new`]; delegating keeps the default tied
    /// to [`ErrorCode::Ok`] instead of to a bare `0`.
    fn default() -> Self {
        Self::new()
    }
}
/// Strategy for decoding a raw request body into a [`MailboxReq`].
///
/// Implementors are used purely as type parameters: `parse` is an associated
/// function and never receives a `self`.
pub trait MailboxParser
where
    Self: Sized,
{
    /// Error produced when the body cannot be decoded. The bounds allow it
    /// to be wrapped in an `anyhow::Error`.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Decodes `data` into a request.
    ///
    /// NOTE(review): `zip` presumably names a compression scheme applied to
    /// `data`; the caller in this file always passes `None` — confirm.
    fn parse(data: Bytes, zip: Option<String>) -> Result<MailboxReq, Self::Error>;
}
/// Builds a mail from `req` and writes it through a writer obtained from
/// `builder`.
///
/// The mail carries the request's key, payload, and every header; the
/// partition is the request's `service_id` cast to `i32` (or unset when the
/// id is `None`). The request's `service` is handed to the writer as the
/// destination.
///
/// # Errors
///
/// Returns the `KafkaError` raised either when obtaining the writer or when
/// writing the mail.
pub async fn write_mail(
    builder: &kmailbox::Builder,
    req: &MailboxReq,
) -> Result<(), kmailbox::KafkaError> {
    // Acquire the writer first so a failure here skips building the mail.
    let writer = builder.writer()?;

    let mut outgoing = kmailbox::SendMail::new()
        .key(req.key.as_ref())
        .payload(Some(&req.payload))
        .partition(req.service_id.map(|id| id as i32));

    for (name, value) in &req.headers {
        outgoing = outgoing.add_header(name, Some(value));
    }

    writer.write(&req.service, outgoing).await
}
pub async fn send_to_mailbox<P: Pool, E: MailboxParser>(
builder: &kmailbox::Builder,
pools: Pools<P>,
req: Request<Incoming>,
) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
let body_data = req.collect().await.map_err(|e| anyhow::Error::new(e))?;
let mut mb_rsp = MailboxRsp::new();
let mut err: Option<anyhow::Error> = None;
let process = async || {
let mut mb_req = match E::parse(body_data.to_bytes(), None) {
Ok(r) => r,
Err(e) => {
err = Some(anyhow::Error::new(e));
mb_rsp.code = ErrorCode::ErrDecode as i32;
return;
}
};
if mb_req.payload.is_empty() {
mb_rsp.code = ErrorCode::EmptyPayload as i32;
return;
}
if mb_req.service.is_empty() {
mb_rsp.code = ErrorCode::NoTarget as i32;
return;
}
let (pool, meta) = match get_pool_and_meta(&pools, &mb_req.service) {
Ok((p, m)) => (p, m),
Err(e) => {
err = Some(e);
mb_rsp.code = ErrorCode::NoTarget as i32;
return;
}
};
let partition = match meta.state {
Some(ServiceState::Fixed) => {
if mb_req.service_id.is_none() || mb_req.service_id.unwrap() >= meta.instances {
mb_rsp.code = ErrorCode::InvalidServiceId as i32;
return;
}
mb_req.service_id
}
Some(ServiceState::Stateful) => {
let key = mb_req.key.as_ref().map_or("", |k| k);
if key.is_empty() {
mb_rsp.code = ErrorCode::EmptyKey as i32;
return;
}
match pool.get_backend(key) {
Some(bs) => bs.id(),
None => {
mb_rsp.code = ErrorCode::TargetNotReady as i32;
return;
}
}
}
_ => None,
};
mb_req.service_id = partition;
match write_mail(builder, &mb_req).await {
Err(e) => {
err = Some(anyhow!("{:?}", e));
mb_rsp.code = ErrorCode::ErrSystem as i32;
return;
}
_ => {}
}
};
process().await;
let resp = serde_json::to_string(&mb_rsp).unwrap();
Ok((
super::make_response::response(
StatusCode::OK,
Some(resp),
super::make_response::JSON_CONTENT_TYPE,
),
err,
))
}