1use crate::http1_backend::get_pool_and_meta;
2use anyhow::anyhow;
3use detcd::ServiceState;
4use http_body_util::BodyExt;
5use http_pool::body::VariantBody;
6use http_pool::net_pool::{Pool, Pools};
7use hyper::body::{Bytes, Incoming};
8use hyper::{Request, Response, StatusCode};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12#[derive(Debug, Copy, Clone, Eq, PartialEq)]
13pub enum ErrorCode {
14 Ok = 0,
16 NoTarget = 1,
18 EmptyPayload = 2,
20 EmptyKey = 3,
22 InvalidServiceId = 4,
24 ErrDecode = 5,
26 ErrEncode = 6,
28 ErrSystem = 7,
30 TargetNotReady = 8,
32}
33
34#[derive(Serialize, Deserialize)]
36pub struct MailboxReq {
37 pub service: String,
39 pub key: Option<String>,
41 pub service_id: Option<u32>,
43 pub payload: String,
45 pub headers: HashMap<String, String>,
47}
48
49impl MailboxReq {
50 pub fn new() -> Self {
51 Self {
52 service: String::new(),
53 key: None,
54 service_id: None,
55 payload: String::new(),
56 headers: HashMap::new(),
57 }
58 }
59}
60
61#[derive(Serialize, Deserialize)]
63pub struct MailboxRsp {
64 pub code: i32,
66 pub msg: Option<String>,
68}
69
70impl MailboxRsp {
71 pub fn new() -> Self {
72 Self {
73 code: ErrorCode::Ok as i32,
74 msg: None,
75 }
76 }
77}
78
79pub trait MailboxParser: Sized {
81 type Error: std::error::Error + Send + Sync + 'static;
82 fn parse(data: Bytes, zip: Option<String>) -> Result<MailboxReq, Self::Error>;
83}
84
85pub async fn write_mail(
87 builder: &kmailbox::Builder,
88 req: &MailboxReq,
89) -> Result<(), kmailbox::KafkaError> {
90 let writer = builder.writer()?;
91
92 let mut mail = kmailbox::SendMail::new()
94 .key(req.key.as_ref())
95 .payload(Some(&req.payload))
96 .partition(req.service_id.map(|id| id as i32));
97
98 for kv in req.headers.iter() {
100 mail = mail.add_header(kv.0, Some(kv.1));
101 }
102
103 writer.write(&req.service, mail).await
104}
105
106pub async fn send_to_mailbox<P: Pool, E: MailboxParser>(
107 builder: &kmailbox::Builder,
108 pools: Pools<P>,
109 req: Request<Incoming>,
110) -> Result<(Response<VariantBody>, Option<anyhow::Error>), anyhow::Error> {
111 let body_data = req.collect().await.map_err(|e| anyhow::Error::new(e))?;
113
114 let mut mb_rsp = MailboxRsp::new();
116 let mut err: Option<anyhow::Error> = None;
117
118 let process = async || {
119 let mut mb_req = match E::parse(body_data.to_bytes(), None) {
121 Ok(r) => r,
122 Err(e) => {
123 err = Some(anyhow::Error::new(e));
124 mb_rsp.code = ErrorCode::ErrDecode as i32;
125 return;
126 }
127 };
128
129 if mb_req.payload.is_empty() {
131 mb_rsp.code = ErrorCode::EmptyPayload as i32;
132 return;
133 }
134 if mb_req.service.is_empty() {
135 mb_rsp.code = ErrorCode::NoTarget as i32;
136 return;
137 }
138
139 let (pool, meta) = match get_pool_and_meta(&pools, &mb_req.service) {
141 Ok((p, m)) => (p, m),
142 Err(e) => {
143 err = Some(e);
144 mb_rsp.code = ErrorCode::NoTarget as i32;
145 return;
146 }
147 };
148
149 let partition = match meta.state {
151 Some(ServiceState::Fixed) => {
152 if mb_req.service_id.is_none() || mb_req.service_id.unwrap() >= meta.instances {
154 mb_rsp.code = ErrorCode::InvalidServiceId as i32;
155 return;
156 }
157 mb_req.service_id
158 }
159 Some(ServiceState::Stateful) => {
160 let key = mb_req.key.as_ref().map_or("", |k| k);
162 if key.is_empty() {
163 mb_rsp.code = ErrorCode::EmptyKey as i32;
164 return;
165 }
166 match pool.get_backend(key) {
168 Some(bs) => bs.id(),
169 None => {
170 mb_rsp.code = ErrorCode::TargetNotReady as i32;
171 return;
172 }
173 }
174 }
175 _ => None,
176 };
177
178 mb_req.service_id = partition;
180 match write_mail(builder, &mb_req).await {
181 Err(e) => {
182 err = Some(anyhow!("{:?}", e));
183 mb_rsp.code = ErrorCode::ErrSystem as i32;
184 return;
185 }
186 _ => {}
187 }
188 };
189
190 process().await;
191
192 let resp = serde_json::to_string(&mb_rsp).unwrap();
193 Ok((
194 super::make_response::response(
195 StatusCode::OK,
196 Some(resp),
197 super::make_response::JSON_CONTENT_TYPE,
198 ),
199 err,
200 ))
201}