use rama::{Context, Service};
use tansu_sans_io::{ApiKey, Frame, Header, JoinGroupRequest};
use tracing::instrument;
use crate::{Error, Result, coordinator::group::Coordinator};
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct JoinGroupService;
impl ApiKey for JoinGroupService {
const KEY: i16 = JoinGroupRequest::KEY;
}
impl<C> Service<C, Frame> for JoinGroupService
where
C: Coordinator,
{
type Response = Frame;
type Error = Error;
#[instrument(skip(ctx, req))]
async fn serve(&self, mut ctx: Context<C>, req: Frame) -> Result<Self::Response, Self::Error> {
let correlation_id = req.correlation_id()?;
let coordinator = ctx.state_mut();
let client_id = req
.client_id()
.map(|client_id| client_id.map(|client_id| client_id.to_owned()))?;
let join_group = JoinGroupRequest::try_from(req.body)?;
coordinator
.join(
client_id.as_deref(),
join_group.group_id.as_str(),
join_group.session_timeout_ms,
join_group.rebalance_timeout_ms,
join_group.member_id.as_str(),
join_group.group_instance_id.as_deref(),
join_group.protocol_type.as_str(),
join_group.protocols.as_deref(),
join_group.reason.as_deref(),
)
.await
.map(|body| Frame {
size: 0,
header: Header::Response { correlation_id },
body,
})
}
}