rocketmq_remoting/protocol/header/
check_transaction_state_request_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::rpc::rpc_request_header::RpcRequestHeader;
23
24#[derive(Serialize, Deserialize, Debug, Default, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct CheckTransactionStateRequestHeader {
27    pub topic: Option<CheetahString>,
28    #[required]
29    pub tran_state_table_offset: i64,
30    #[required]
31    pub commit_log_offset: i64,
32    pub msg_id: Option<CheetahString>,
33    pub transaction_id: Option<CheetahString>,
34    pub offset_msg_id: Option<CheetahString>,
35    #[serde(flatten)]
36    pub rpc_request_header: Option<RpcRequestHeader>,
37}
38
39/*impl CheckTransactionStateRequestHeader {
40    pub const TOPIC: &'static str = "topic";
41    pub const TRAN_STATE_TABLE_OFFSET: &'static str = "tranStateTableOffset";
42    pub const COMMIT_LOG_OFFSET: &'static str = "commitLogOffset";
43    pub const MSG_ID: &'static str = "msgId";
44    pub const TRANSACTION_ID: &'static str = "transactionId";
45    pub const OFFSET_MSG_ID: &'static str = "offsetMsgId";
46}
47
48impl CommandCustomHeader for CheckTransactionStateRequestHeader {
49    fn to_map(&self) -> Option<std::collections::HashMap<CheetahString, CheetahString>> {
50        let mut map = std::collections::HashMap::new();
51        if let Some(value) = self.topic.as_ref() {
52            map.insert(CheetahString::from_static_str(Self::TOPIC), value.clone());
53        }
54        map.insert(
55            CheetahString::from_static_str(Self::TRAN_STATE_TABLE_OFFSET),
56            CheetahString::from_string(self.tran_state_table_offset.to_string()),
57        );
58
59        map.insert(
60            CheetahString::from_static_str(Self::COMMIT_LOG_OFFSET),
61            CheetahString::from_string(self.commit_log_offset.to_string()),
62        );
63        if let Some(value) = self.msg_id.as_ref() {
64            map.insert(CheetahString::from_static_str(Self::MSG_ID), value.clone());
65        }
66        if let Some(value) = self.transaction_id.as_ref() {
67            map.insert(
68                CheetahString::from_static_str(Self::TRANSACTION_ID),
69                value.clone(),
70            );
71        }
72        if let Some(value) = self.offset_msg_id.as_ref() {
73            map.insert(
74                CheetahString::from_static_str(Self::OFFSET_MSG_ID),
75                value.clone(),
76            );
77        }
78        if let Some(value) = self.rpc_request_header.as_ref() {
79            if let Some(value) = value.to_map() {
80                map.extend(value);
81            }
82        }
83        Some(map)
84    }
85}
86
87impl FromMap for CheckTransactionStateRequestHeader {
88    type Error = rocketmq_error::RocketMQError;
89
90    type Target = Self;
91
92    fn from(
93        map: &std::collections::HashMap<CheetahString, CheetahString>,
94    ) -> Result<Self::Target, Self::Error> {
95        Ok(CheckTransactionStateRequestHeader {
96            topic: map
97                .get(&CheetahString::from_static_str(Self::TOPIC))
98                .cloned(),
99            tran_state_table_offset: map
100                .get(&CheetahString::from_static_str(
101                    Self::TRAN_STATE_TABLE_OFFSET,
102                ))
103                .and_then(|v| v.parse().ok())
104                .unwrap_or_default(),
105            commit_log_offset: map
106                .get(&CheetahString::from_static_str(Self::COMMIT_LOG_OFFSET))
107                .and_then(|v| v.parse().ok())
108                .unwrap_or_default(),
109            msg_id: map
110                .get(&CheetahString::from_static_str(Self::MSG_ID))
111                .cloned(),
112            transaction_id: map
113                .get(&CheetahString::from_static_str(Self::TRANSACTION_ID))
114                .cloned(),
115            offset_msg_id: map
116                .get(&CheetahString::from_static_str(Self::OFFSET_MSG_ID))
117                .cloned(),
118            rpc_request_header: Some(<RpcRequestHeader as FromMap>::from(map)?),
119        })
120    }
121}
122*/
123
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Builds a header map from static `(key, value)` string pairs.
    fn header_map(
        entries: &[(&'static str, &'static str)],
    ) -> HashMap<CheetahString, CheetahString> {
        entries
            .iter()
            .map(|&(key, value)| {
                (
                    CheetahString::from_static_str(key),
                    CheetahString::from_static_str(value),
                )
            })
            .collect()
    }

    /// `to_map` must emit every populated field under its camelCase key.
    #[test]
    fn check_transaction_state_request_header_serializes_correctly() {
        let header = CheckTransactionStateRequestHeader {
            topic: Some(CheetahString::from_static_str("test_topic")),
            tran_state_table_offset: 123,
            commit_log_offset: 456,
            msg_id: Some(CheetahString::from_static_str("test_msg_id")),
            transaction_id: Some(CheetahString::from_static_str("test_transaction_id")),
            offset_msg_id: Some(CheetahString::from_static_str("test_offset_msg_id")),
            rpc_request_header: None,
        };
        let map = header.to_map().unwrap();

        let expected: [(&'static str, &'static str); 6] = [
            ("topic", "test_topic"),
            ("tranStateTableOffset", "123"),
            ("commitLogOffset", "456"),
            ("msgId", "test_msg_id"),
            ("transactionId", "test_transaction_id"),
            ("offsetMsgId", "test_offset_msg_id"),
        ];
        for &(key, value) in expected.iter() {
            assert_eq!(
                map.get(&CheetahString::from_static_str(key)).unwrap(),
                value,
                "unexpected value for key `{}`",
                key
            );
        }
    }

    /// A fully populated map must decode into a header with every field set.
    #[test]
    fn check_transaction_state_request_header_deserializes_correctly() {
        let map = header_map(&[
            ("topic", "test_topic"),
            ("tranStateTableOffset", "123"),
            ("commitLogOffset", "456"),
            ("msgId", "test_msg_id"),
            ("transactionId", "test_transaction_id"),
            ("offsetMsgId", "test_offset_msg_id"),
        ]);

        let header = <CheckTransactionStateRequestHeader as FromMap>::from(&map).unwrap();
        assert_eq!(header.topic.unwrap(), "test_topic");
        assert_eq!(header.tran_state_table_offset, 123);
        assert_eq!(header.commit_log_offset, 456);
        assert_eq!(header.msg_id.unwrap(), "test_msg_id");
        assert_eq!(header.transaction_id.unwrap(), "test_transaction_id");
        assert_eq!(header.offset_msg_id.unwrap(), "test_offset_msg_id");
    }

    /// With only the two offsets present, decoding succeeds and optional fields stay `None`.
    #[test]
    fn check_transaction_state_request_header_handles_missing_optional_fields() {
        let map = header_map(&[
            ("tranStateTableOffset", "123"),
            ("commitLogOffset", "456"),
        ]);

        let header = <CheckTransactionStateRequestHeader as FromMap>::from(&map).unwrap();
        assert!(header.topic.is_none());
        assert_eq!(header.tran_state_table_offset, 123);
        assert_eq!(header.commit_log_offset, 456);
        assert!(header.msg_id.is_none());
        assert!(header.transaction_id.is_none());
        assert!(header.offset_msg_id.is_none());
    }

    /// Non-numeric offsets must be reported as a decode error.
    #[test]
    fn check_transaction_state_request_header_handles_invalid_data() {
        let map = header_map(&[
            ("tranStateTableOffset", "invalid"),
            ("commitLogOffset", "invalid"),
        ]);

        let result = <CheckTransactionStateRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }
}