use rama::{Context, Service};
use tansu_sans_io::{AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ApiKey};
use tracing::instrument;
use crate::{Error, Result, Storage};
/// Service that answers [`AddOffsetsToTxnRequest`] messages.
///
/// This is a unit struct: it carries no fields of its own, and the storage it
/// operates on is obtained from the request context when the service is invoked.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct AddOffsetsService;
/// Gives the service the same API key as the request type it handles.
impl ApiKey for AddOffsetsService {
// Reuse the request type's key instead of repeating a numeric literal, so the
// two cannot drift apart.
const KEY: i16 = AddOffsetsToTxnRequest::KEY;
}
/// Serves [`AddOffsetsToTxnRequest`] messages by delegating to the [`Storage`]
/// state carried in the request context.
impl<G> Service<G, AddOffsetsToTxnRequest> for AddOffsetsService
where
    G: Storage,
{
    type Response = AddOffsetsToTxnResponse;
    type Error = Error;

    /// Passes the request's transactional id, producer id, producer epoch and
    /// group id to [`Storage::txn_add_offsets`] and wraps the error code it
    /// yields in an [`AddOffsetsToTxnResponse`].
    ///
    /// # Errors
    ///
    /// Returns the error produced by `txn_add_offsets` unchanged when the
    /// storage call fails.
    #[instrument(skip(ctx, req))]
    async fn serve(
        &self,
        ctx: Context<G>,
        req: AddOffsetsToTxnRequest,
    ) -> Result<Self::Response, Self::Error> {
        let transactional_id = req.transactional_id.as_str();
        let group_id = req.group_id.as_str();

        let error_code = ctx
            .state()
            .txn_add_offsets(
                transactional_id,
                req.producer_id,
                req.producer_epoch,
                group_id,
            )
            .await?;

        // The throttle time is always reported as zero.
        let response = AddOffsetsToTxnResponse::default()
            .throttle_time_ms(0)
            .error_code(error_code.into());

        Ok(response)
    }
}