use rama::{Context, Service};
use tansu_sans_io::{ApiKey, TxnOffsetCommitResponse};
use tracing::instrument;
use crate::{Error, Result, Storage};
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct OffsetCommitService;
impl ApiKey for OffsetCommitService {
const KEY: i16 = tansu_sans_io::TxnOffsetCommitRequest::KEY;
}
impl<G> Service<G, tansu_sans_io::TxnOffsetCommitRequest> for OffsetCommitService
where
G: Storage,
{
type Response = TxnOffsetCommitResponse;
type Error = Error;
#[instrument(skip(ctx, req))]
async fn serve(
&self,
ctx: Context<G>,
req: tansu_sans_io::TxnOffsetCommitRequest,
) -> Result<Self::Response, Self::Error> {
let responses = ctx
.state()
.txn_offset_commit(crate::TxnOffsetCommitRequest {
transaction_id: req.transactional_id.to_owned(),
group_id: req.group_id.to_owned(),
producer_id: req.producer_id,
producer_epoch: req.producer_epoch,
generation_id: req.generation_id,
member_id: req.member_id,
group_instance_id: req.group_instance_id,
topics: req.topics.unwrap_or_default(),
})
.await?;
Ok(TxnOffsetCommitResponse::default()
.throttle_time_ms(0)
.topics(Some(responses)))
}
}