tansu_storage/service/txn/
add_partitions.rs1use rama::{Context, Service};
16use tansu_sans_io::{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, ErrorCode};
17use tracing::instrument;
18
19use crate::{Error, Result, Storage, TxnAddPartitionsRequest, TxnAddPartitionsResponse};
20
/// Stateless, zero-sized service type for `AddPartitionsToTxnRequest` handling.
///
/// The type carries no fields, so every value of it is equal to every other.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AddPartitionService;
24
25impl ApiKey for AddPartitionService {
26 const KEY: i16 = AddPartitionsToTxnRequest::KEY;
27}
28
29impl<G> Service<G, AddPartitionsToTxnRequest> for AddPartitionService
30where
31 G: Storage,
32{
33 type Response = AddPartitionsToTxnResponse;
34 type Error = Error;
35
36 #[instrument(skip(ctx, req))]
37 async fn serve(
38 &self,
39 ctx: Context<G>,
40 req: AddPartitionsToTxnRequest,
41 ) -> Result<Self::Response, Self::Error> {
42 let req = TxnAddPartitionsRequest::try_from(req)?;
43
44 match ctx.state().txn_add_partitions(req).await? {
45 TxnAddPartitionsResponse::VersionZeroToThree(results_by_topic_v_3_and_below) => {
46 Ok(AddPartitionsToTxnResponse::default()
47 .throttle_time_ms(0)
48 .error_code(Some(ErrorCode::None.into()))
49 .results_by_transaction(Some([].into()))
50 .results_by_topic_v_3_and_below(Some(results_by_topic_v_3_and_below)))
51 }
52
53 TxnAddPartitionsResponse::VersionFourPlus(results_by_transaction) => {
54 Ok(AddPartitionsToTxnResponse::default()
55 .throttle_time_ms(0)
56 .error_code(Some(ErrorCode::None.into()))
57 .results_by_transaction(Some(results_by_transaction))
58 .results_by_topic_v_3_and_below(Some([].into())))
59 }
60 }
61 }
62}