tansu_storage/service/txn/
offset_commit.rs1use rama::{Context, Service};
16use tansu_sans_io::{ApiKey, TxnOffsetCommitResponse};
17use tracing::instrument;
18
19use crate::{Error, Result, Storage};
20
21#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
23pub struct OffsetCommitService;
24
25impl ApiKey for OffsetCommitService {
26 const KEY: i16 = tansu_sans_io::TxnOffsetCommitRequest::KEY;
27}
28
29impl<G> Service<G, tansu_sans_io::TxnOffsetCommitRequest> for OffsetCommitService
30where
31 G: Storage,
32{
33 type Response = TxnOffsetCommitResponse;
34 type Error = Error;
35
36 #[instrument(skip(ctx, req))]
37 async fn serve(
38 &self,
39 ctx: Context<G>,
40 req: tansu_sans_io::TxnOffsetCommitRequest,
41 ) -> Result<Self::Response, Self::Error> {
42 let responses = ctx
43 .state()
44 .txn_offset_commit(crate::TxnOffsetCommitRequest {
45 transaction_id: req.transactional_id.to_owned(),
46 group_id: req.group_id.to_owned(),
47 producer_id: req.producer_id,
48 producer_epoch: req.producer_epoch,
49 generation_id: req.generation_id,
50 member_id: req.member_id,
51 group_instance_id: req.group_instance_id,
52 topics: req.topics.unwrap_or_default(),
53 })
54 .await?;
55
56 Ok(TxnOffsetCommitResponse::default()
57 .throttle_time_ms(0)
58 .topics(Some(responses)))
59 }
60}