use rama::{Context, Service};
use tansu_sans_io::{ApiKey, MetadataRequest, MetadataResponse};
use tracing::{error, instrument};
use crate::{Error, Result, Storage, TopicId};
/// Stateless handler for metadata requests.
///
/// The type carries no fields; everything it needs at request time is taken
/// from the context passed to the service call.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MetadataService;
/// Gives the service the same API key as the request type it handles.
impl ApiKey for MetadataService {
// Reuse the key declared on `MetadataRequest` rather than repeating a literal,
// so the two cannot drift apart.
const KEY: i16 = MetadataRequest::KEY;
}
impl<G> Service<G, MetadataRequest> for MetadataService
where
G: Storage,
{
type Response = MetadataResponse;
type Error = Error;
#[instrument(skip(ctx, req))]
async fn serve(
&self,
ctx: Context<G>,
req: MetadataRequest,
) -> Result<Self::Response, Self::Error> {
let topics = req
.topics
.map(|topics| topics.iter().map(TopicId::from).collect::<Vec<_>>());
let response = ctx
.state()
.metadata(topics.as_deref())
.await
.inspect_err(|err| error!(?err))?;
let brokers = Some(response.brokers().to_owned());
let cluster_id = response.cluster().map(|s| s.into());
let controller_id = response.controller();
let topics = Some(response.topics().to_owned());
let cluster_authorized_operations = Some(-1);
let throttle_time_ms = Some(0);
Ok(MetadataResponse::default()
.throttle_time_ms(throttle_time_ms)
.brokers(brokers)
.cluster_id(cluster_id)
.controller_id(controller_id)
.topics(topics)
.cluster_authorized_operations(cluster_authorized_operations))
}
}