use nisshi_sans_io::{
ApiKey, ErrorCode, MetadataRequest, MetadataResponse, create_topics_request::CreatableTopic,
};
use rama::{Context, Service};
use tracing::{debug, error, instrument};
use crate::{Error, Result, Storage, TopicId};
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MetadataService;
const AUTO_CREATE_NUM_PARTITIONS: i32 = 4;
const AUTO_CREATE_REPLICATION_FACTOR: i16 = 1;
fn is_valid_topic_name(name: &str) -> bool {
!name.is_empty()
&& name != "."
&& name != ".."
&& name.len() <= 249
&& name
.bytes()
.all(|b| b.is_ascii_alphanumeric() || b == b'.' || b == b'_' || b == b'-')
}
impl ApiKey for MetadataService {
const KEY: i16 = MetadataRequest::KEY;
}
impl<G> Service<G, MetadataRequest> for MetadataService
where
G: Storage,
{
type Response = MetadataResponse;
type Error = Error;
#[instrument(skip(ctx, req))]
async fn serve(
&self,
ctx: Context<G>,
req: MetadataRequest,
) -> Result<Self::Response, Self::Error> {
let topics = req
.topics
.map(|topics| topics.iter().map(TopicId::from).collect::<Vec<_>>());
let mut response = ctx
.state()
.metadata(topics.as_deref())
.await
.inspect_err(|err| error!(?err))?;
if req.allow_auto_topic_creation.unwrap_or(true) {
let unknown = response
.topics()
.iter()
.filter(|topic| topic.error_code == i16::from(ErrorCode::UnknownTopicOrPartition))
.filter_map(|topic| topic.name.clone())
.filter(|name| is_valid_topic_name(name))
.collect::<Vec<_>>();
let mut created = false;
for name in unknown {
match ctx
.state()
.create_topic(
CreatableTopic::default()
.name(name)
.num_partitions(AUTO_CREATE_NUM_PARTITIONS)
.replication_factor(AUTO_CREATE_REPLICATION_FACTOR)
.assignments(Some([].into()))
.configs(Some([].into())),
false,
)
.await
{
Ok(topic_id) => {
debug!(?topic_id);
created = true;
}
Err(Error::Api(ErrorCode::TopicAlreadyExists)) => created = true,
Err(err) => error!(?err),
}
}
if created {
response = ctx
.state()
.metadata(topics.as_deref())
.await
.inspect_err(|err| error!(?err))?;
}
}
let brokers = Some(response.brokers().to_owned());
let cluster_id = response.cluster().map(|s| s.into());
let controller_id = response.controller();
let topics = Some(
response
.topics()
.iter()
.map(|topic| {
if topic.error_code == i16::from(ErrorCode::UnknownTopicOrPartition)
&& topic
.name
.as_deref()
.is_some_and(|name| !is_valid_topic_name(name))
{
topic
.clone()
.error_code(ErrorCode::InvalidTopicException.into())
} else {
topic.clone()
}
})
.collect::<Vec<_>>(),
);
let cluster_authorized_operations = Some(-1);
let throttle_time_ms = Some(0);
Ok(MetadataResponse::default()
.throttle_time_ms(throttle_time_ms)
.brokers(brokers)
.cluster_id(cluster_id)
.controller_id(controller_id)
.topics(topics)
.cluster_authorized_operations(cluster_authorized_operations))
}
}