tansu_storage/service/txn/
add_offsets.rs1use rama::{Context, Service};
16use tansu_sans_io::{AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ApiKey};
17use tracing::instrument;
18
19use crate::{Error, Result, Storage};
20
21#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
23pub struct AddOffsetsService;
24
25impl ApiKey for AddOffsetsService {
26 const KEY: i16 = AddOffsetsToTxnRequest::KEY;
27}
28
29impl<G> Service<G, AddOffsetsToTxnRequest> for AddOffsetsService
30where
31 G: Storage,
32{
33 type Response = AddOffsetsToTxnResponse;
34 type Error = Error;
35
36 #[instrument(skip(ctx, req))]
37 async fn serve(
38 &self,
39 ctx: Context<G>,
40 req: AddOffsetsToTxnRequest,
41 ) -> Result<Self::Response, Self::Error> {
42 ctx.state()
43 .txn_add_offsets(
44 req.transactional_id.as_str(),
45 req.producer_id,
46 req.producer_epoch,
47 req.group_id.as_str(),
48 )
49 .await
50 .map(|error_code| {
51 AddOffsetsToTxnResponse::default()
52 .throttle_time_ms(0)
53 .error_code(error_code.into())
54 })
55 }
56}