use rama::{Context, Service};
use tansu_sans_io::{
ApiKey, CreateTopicsRequest, CreateTopicsResponse, ErrorCode, NULL_TOPIC_ID,
create_topics_response::CreatableTopicResult,
};
use tracing::{debug, instrument};
use crate::{Error, Result, Storage};
/// Service that answers [`CreateTopicsRequest`]s with a [`CreateTopicsResponse`].
///
/// This is a field-less unit struct: the storage used to create topics is
/// taken from the request context rather than held by the service itself.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct CreateTopicsService;
/// Associates the service with the API key of the request type it serves.
impl ApiKey for CreateTopicsService {
// Reuse the key declared on `CreateTopicsRequest` so the two cannot drift apart.
const KEY: i16 = CreateTopicsRequest::KEY;
}
impl<G> Service<G, CreateTopicsRequest> for CreateTopicsService
where
G: Storage,
{
type Response = CreateTopicsResponse;
type Error = Error;
#[instrument(skip(ctx, req))]
async fn serve(
&self,
ctx: Context<G>,
req: CreateTopicsRequest,
) -> Result<Self::Response, Self::Error> {
let mut topics = vec![];
for mut topic in req.topics.unwrap_or_default() {
let name = topic.name.clone();
let num_partitions = Some(match topic.num_partitions {
-1 => {
topic.num_partitions = 3;
topic.num_partitions
}
otherwise => otherwise,
});
let replication_factor = Some(match topic.replication_factor {
-1 => {
topic.replication_factor = 1;
topic.replication_factor
}
otherwise => otherwise,
});
match ctx
.state()
.create_topic(topic, req.validate_only.unwrap_or_default())
.await
{
Ok(topic_id) => {
debug!(?topic_id);
topics.push(
CreatableTopicResult::default()
.name(name)
.topic_id(Some(topic_id.into_bytes()))
.error_code(ErrorCode::None.into())
.error_message(None)
.topic_config_error_code(Some(ErrorCode::None.into()))
.num_partitions(num_partitions)
.replication_factor(replication_factor)
.configs(Some([].into())),
);
}
Err(Error::Api(error_code)) => topics.push(
CreatableTopicResult::default()
.name(name)
.topic_id(Some(NULL_TOPIC_ID))
.error_code(error_code.into())
.error_message(Some(error_code.to_string()))
.topic_config_error_code(None)
.num_partitions(num_partitions)
.replication_factor(replication_factor)
.configs(Some([].into())),
),
Err(error) => {
debug!(?error);
topics.push(
CreatableTopicResult::default()
.name(name)
.topic_id(Some(NULL_TOPIC_ID))
.error_code(ErrorCode::UnknownServerError.into())
.error_message(None)
.topic_config_error_code(None)
.num_partitions(None)
.replication_factor(None)
.configs(Some([].into())),
)
}
}
}
Ok(CreateTopicsResponse::default()
.topics(Some(topics))
.throttle_time_ms(Some(0)))
}
}