use rama::{Context, Service};
use tansu_sans_io::{ApiKey, Frame, Header, OffsetCommitRequest};
use tracing::instrument;
use crate::{
Error, Result,
coordinator::group::{Coordinator, OffsetCommit},
};
/// Stateless service that answers [`OffsetCommitRequest`] frames.
///
/// The type carries no data of its own; everything it needs is taken from the
/// request frame and from the state held by the service context.
#[derive(
    Clone,
    Copy,
    Debug,
    Default,
    Eq,
    Hash,
    Ord,
    PartialEq,
    PartialOrd,
)]
pub struct OffsetCommitService;
// Associates this service with an API key so it can be identified by the
// kind of request it serves.
// NOTE(review): presumably used by a router/dispatcher to select the service
// for an incoming frame — confirm against the call sites of `ApiKey::KEY`.
impl ApiKey for OffsetCommitService {
// Same key as the request type this service decodes in `serve`.
const KEY: i16 = OffsetCommitRequest::KEY;
}
impl<C> Service<C, Frame> for OffsetCommitService
where
    C: Coordinator,
{
    type Response = Frame;
    type Error = Error;

    /// Decodes an offset-commit request from `req`, forwards it to the
    /// [`Coordinator`] held in the context state, and wraps the coordinator's
    /// reply in a response [`Frame`] carrying the request's correlation id.
    ///
    /// A negative `retention_time_ms` is treated as absent: the coordinator
    /// receives `None` in that case.
    ///
    /// # Errors
    ///
    /// Fails if the correlation id cannot be read from `req`, if the frame
    /// body cannot be converted into an [`OffsetCommitRequest`], or if the
    /// coordinator itself returns an error.
    #[instrument(skip(ctx, req))]
    async fn serve(&self, mut ctx: Context<C>, req: Frame) -> Result<Self::Response, Self::Error> {
        // Read the correlation id first: `req.body` is moved out just below.
        let correlation_id = req.correlation_id()?;
        let request = OffsetCommitRequest::try_from(req.body)?;

        // Keep only non-negative retention values; anything else becomes `None`.
        let retention_time_ms = request
            .retention_time_ms
            .filter(|millis| !millis.is_negative());

        // Borrowed view over the decoded request, as expected by the coordinator.
        let commit = OffsetCommit {
            group_id: request.group_id.as_str(),
            generation_id_or_member_epoch: request.generation_id_or_member_epoch,
            member_id: request.member_id.as_deref(),
            group_instance_id: request.group_instance_id.as_deref(),
            retention_time_ms,
            topics: request.topics.as_deref(),
        };

        let body = ctx.state_mut().offset_commit(commit).await?;

        // `size` is left at 0 here.
        // NOTE(review): presumably filled in when the frame is encoded — confirm.
        Ok(Frame {
            size: 0,
            header: Header::Response { correlation_id },
            body,
        })
    }
}