use rama::{Context, Service};
use tansu_sans_io::{
ApiKey, DeleteTopicsRequest, DeleteTopicsResponse, delete_topics_response::DeletableTopicResult,
};
use tracing::instrument;
use crate::{Error, Result, Storage};
/// Stateless handler for [`DeleteTopicsRequest`].
///
/// The type carries no data of its own; the [`Service`] implementation in
/// this file obtains the [`Storage`] backend from the request context state
/// and asks it to delete each requested topic.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct DeleteTopicsService;
/// Gives the service the same API key as the request type it serves.
impl ApiKey for DeleteTopicsService {
// Forwarded from `DeleteTopicsRequest` rather than hard-coded, so the two
// cannot drift apart.
// NOTE(review): presumably this key is what a dispatcher uses to select this
// service for incoming frames — confirm against the routing code.
const KEY: i16 = DeleteTopicsRequest::KEY;
}
impl<G> Service<G, DeleteTopicsRequest> for DeleteTopicsService
where
    G: Storage,
{
    type Response = DeleteTopicsResponse;
    type Error = Error;

    /// Deletes every topic named in `req` using the storage held in `ctx`.
    ///
    /// Entries in `req.topics` are processed first, followed by the plain
    /// names in `req.topic_names`; either list being absent is treated as
    /// empty. Topics are deleted one at a time, in request order, and one
    /// [`DeletableTopicResult`] is produced per entry carrying the error code
    /// returned by the storage together with that code's `Display` text as
    /// the error message.
    ///
    /// # Errors
    ///
    /// If the storage itself returns an `Err` for any topic, that error is
    /// propagated immediately and results gathered so far are discarded.
    #[instrument(skip(ctx, req))]
    async fn serve(
        &self,
        ctx: Context<G>,
        req: DeleteTopicsRequest,
    ) -> Result<Self::Response, Self::Error> {
        let storage = ctx.state();

        let described = req.topics.unwrap_or_default();
        let named = req.topic_names.unwrap_or_default();

        let mut results = Vec::with_capacity(described.len() + named.len());

        // Entries that carry both an optional name and a topic id.
        for entry in described {
            let outcome = storage.delete_topic(&entry.clone().into()).await?;

            let result = DeletableTopicResult::default()
                .name(entry.name.clone())
                .topic_id(Some(entry.topic_id))
                .error_code(i16::from(outcome))
                .error_message(Some(outcome.to_string()));

            results.push(result);
        }

        // Entries identified by name only, so no topic id is reported back.
        for topic_name in named {
            let outcome = storage.delete_topic(&topic_name.clone().into()).await?;

            let result = DeletableTopicResult::default()
                .name(Some(topic_name))
                .topic_id(None)
                .error_code(i16::from(outcome))
                .error_message(Some(outcome.to_string()));

            results.push(result);
        }

        let response = DeleteTopicsResponse::default()
            .throttle_time_ms(Some(0))
            .responses(Some(results));

        Ok(response)
    }
}