use super::mailbox::{ErrorCode, WriteMailRequest, WriteMailResponse};
use crate::util::get_pool_and_meta;
use anyhow::anyhow;
use detcd::ServiceState;
use http_pool::net_pool::{Pool, Pools};
pub(super) async fn write_to_mailbox<P: Pool>(
builder: &kmailbox::Builder,
ns: Option<&String>,
pools: Pools<P>,
mut request: WriteMailRequest,
) -> (WriteMailResponse, Option<anyhow::Error>) {
let mut response = WriteMailResponse::new();
let mut err = None;
loop {
if request.payload.is_empty() {
response.set_code(ErrorCode::EmptyPayload);
break;
}
if request.service.is_empty() {
response.set_code(ErrorCode::NoTarget);
break;
}
let (pool, meta) = match get_pool_and_meta(&pools, &request.service) {
Ok((p, m)) => (p, m),
Err(e) => {
err = Some(e);
response.set_code(ErrorCode::NoTarget);
break;
}
};
let partition = match meta.state {
Some(ServiceState::Fixed) => {
if request.service_id.is_none() || request.service_id.unwrap() >= meta.instances {
response.set_code(ErrorCode::InvalidServiceId);
break;
}
request.service_id
}
Some(ServiceState::Stateful) => {
let key = request.key.as_ref().map_or("", |k| k);
if key.is_empty() {
response.set_code(ErrorCode::EmptyKey);
break;
}
match pool.get_backend(key) {
Some(bs) => bs.id(),
None => {
response.set_code(ErrorCode::TargetNotReady);
break;
}
}
}
_ => None,
};
request.service_id = partition;
match write_mail(builder, ns, &request).await {
Err(e) => {
err = Some(anyhow!("{:?}", e));
response.set_code(ErrorCode::ErrSystem);
break;
}
_ => {}
}
break;
}
(response, err)
}
/// Builds a `kmailbox::SendMail` from `request` and writes it through a
/// writer obtained from `builder`.
///
/// The mail carries the request's key, payload, headers and, when
/// `request.service_id` is set, an explicit partition. The destination name
/// passed to the writer is `"<ns>.<service>"` when `ns` is present and
/// non-empty, otherwise just the service name.
///
/// # Errors
///
/// Returns the `kmailbox::KafkaError` produced either while obtaining the
/// writer or by the write itself.
async fn write_mail(
    builder: &kmailbox::Builder,
    ns: Option<&String>,
    request: &WriteMailRequest,
) -> Result<(), kmailbox::KafkaError> {
    let writer = builder.writer()?;

    // NOTE(review): `as i32` silently wraps if the id does not fit in an
    // `i32`; presumably ids are small instance indexes - confirm with callers.
    let mut sm = kmailbox::SendMail::new()
        .key(request.key.as_ref())
        .payload(Some(&request.payload))
        .partition(request.service_id.map(|id| id as i32));
    for (header_key, header_value) in request.headers.iter() {
        sm = sm.add_header(header_key, Some(header_value));
    }

    // Only allocate the plain service name when it is actually the result;
    // a missing and an empty namespace are treated the same way.
    let name = match ns {
        Some(ns) if !ns.is_empty() => format!("{}.{}", ns, request.service),
        _ => request.service.clone(),
    };
    writer.write(&name, sm).await
}