rocketmq_remoting/protocol/header/check_transaction_state_request_header.rs
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::rpc::rpc_request_header::RpcRequestHeader;
23
24#[derive(Serialize, Deserialize, Debug, Default, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct CheckTransactionStateRequestHeader {
27 pub topic: Option<CheetahString>,
28 #[required]
29 pub tran_state_table_offset: i64,
30 #[required]
31 pub commit_log_offset: i64,
32 pub msg_id: Option<CheetahString>,
33 pub transaction_id: Option<CheetahString>,
34 pub offset_msg_id: Option<CheetahString>,
35 #[serde(flatten)]
36 pub rpc_request_header: Option<RpcRequestHeader>,
37}
38
/*impl CheckTransactionStateRequestHeader {
    pub const TOPIC: &'static str = "topic";
    pub const TRAN_STATE_TABLE_OFFSET: &'static str = "tranStateTableOffset";
    pub const COMMIT_LOG_OFFSET: &'static str = "commitLogOffset";
    pub const MSG_ID: &'static str = "msgId";
    pub const TRANSACTION_ID: &'static str = "transactionId";
    pub const OFFSET_MSG_ID: &'static str = "offsetMsgId";
}

impl CommandCustomHeader for CheckTransactionStateRequestHeader {
    fn to_map(&self) -> Option<std::collections::HashMap<CheetahString, CheetahString>> {
        let mut map = std::collections::HashMap::new();
        if let Some(value) = self.topic.as_ref() {
            map.insert(CheetahString::from_static_str(Self::TOPIC), value.clone());
        }
        map.insert(
            CheetahString::from_static_str(Self::TRAN_STATE_TABLE_OFFSET),
            CheetahString::from_string(self.tran_state_table_offset.to_string()),
        );

        map.insert(
            CheetahString::from_static_str(Self::COMMIT_LOG_OFFSET),
            CheetahString::from_string(self.commit_log_offset.to_string()),
        );
        if let Some(value) = self.msg_id.as_ref() {
            map.insert(CheetahString::from_static_str(Self::MSG_ID), value.clone());
        }
        if let Some(value) = self.transaction_id.as_ref() {
            map.insert(
                CheetahString::from_static_str(Self::TRANSACTION_ID),
                value.clone(),
            );
        }
        if let Some(value) = self.offset_msg_id.as_ref() {
            map.insert(
                CheetahString::from_static_str(Self::OFFSET_MSG_ID),
                value.clone(),
            );
        }
        if let Some(value) = self.rpc_request_header.as_ref() {
            if let Some(value) = value.to_map() {
                map.extend(value);
            }
        }
        Some(map)
    }
}

impl FromMap for CheckTransactionStateRequestHeader {
    type Error = rocketmq_error::RocketMQError;

    type Target = Self;

    fn from(
        map: &std::collections::HashMap<CheetahString, CheetahString>,
    ) -> Result<Self::Target, Self::Error> {
        Ok(CheckTransactionStateRequestHeader {
            topic: map
                .get(&CheetahString::from_static_str(Self::TOPIC))
                .cloned(),
            tran_state_table_offset: map
                .get(&CheetahString::from_static_str(
                    Self::TRAN_STATE_TABLE_OFFSET,
                ))
                .and_then(|v| v.parse().ok())
                .unwrap_or_default(),
            commit_log_offset: map
                .get(&CheetahString::from_static_str(Self::COMMIT_LOG_OFFSET))
                .and_then(|v| v.parse().ok())
                .unwrap_or_default(),
            msg_id: map
                .get(&CheetahString::from_static_str(Self::MSG_ID))
                .cloned(),
            transaction_id: map
                .get(&CheetahString::from_static_str(Self::TRANSACTION_ID))
                .cloned(),
            offset_msg_id: map
                .get(&CheetahString::from_static_str(Self::OFFSET_MSG_ID))
                .cloned(),
            rpc_request_header: Some(<RpcRequestHeader as FromMap>::from(map)?),
        })
    }
}
*/
123
/// Unit tests for the string-map codec of
/// [`CheckTransactionStateRequestHeader`] (`to_map` and `FromMap::from`).
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;

    /// Every populated field must appear in the encoded map under its
    /// camelCase key, with integers rendered as decimal strings.
    #[test]
    fn check_transaction_state_request_header_serializes_correctly() {
        let header = CheckTransactionStateRequestHeader {
            topic: Some(CheetahString::from_static_str("test_topic")),
            tran_state_table_offset: 123,
            commit_log_offset: 456,
            msg_id: Some(CheetahString::from_static_str("test_msg_id")),
            transaction_id: Some(CheetahString::from_static_str("test_transaction_id")),
            offset_msg_id: Some(CheetahString::from_static_str("test_offset_msg_id")),
            rpc_request_header: None,
        };
        let map = header.to_map().unwrap();
        assert_eq!(
            map.get(&CheetahString::from_static_str("topic")).unwrap(),
            "test_topic"
        );
        assert_eq!(
            map.get(&CheetahString::from_static_str("tranStateTableOffset"))
                .unwrap(),
            "123"
        );
        assert_eq!(
            map.get(&CheetahString::from_static_str("commitLogOffset"))
                .unwrap(),
            "456"
        );
        assert_eq!(
            map.get(&CheetahString::from_static_str("msgId")).unwrap(),
            "test_msg_id"
        );
        assert_eq!(
            map.get(&CheetahString::from_static_str("transactionId"))
                .unwrap(),
            "test_transaction_id"
        );
        assert_eq!(
            map.get(&CheetahString::from_static_str("offsetMsgId"))
                .unwrap(),
            "test_offset_msg_id"
        );
    }

    /// A map carrying every key must decode into a fully populated header.
    #[test]
    fn check_transaction_state_request_header_deserializes_correctly() {
        let mut map = HashMap::new();
        map.insert(
            CheetahString::from_static_str("topic"),
            CheetahString::from_static_str("test_topic"),
        );
        map.insert(
            CheetahString::from_static_str("tranStateTableOffset"),
            CheetahString::from_static_str("123"),
        );
        map.insert(
            CheetahString::from_static_str("commitLogOffset"),
            CheetahString::from_static_str("456"),
        );
        map.insert(
            CheetahString::from_static_str("msgId"),
            CheetahString::from_static_str("test_msg_id"),
        );
        map.insert(
            CheetahString::from_static_str("transactionId"),
            CheetahString::from_static_str("test_transaction_id"),
        );
        map.insert(
            CheetahString::from_static_str("offsetMsgId"),
            CheetahString::from_static_str("test_offset_msg_id"),
        );

        let header = <CheckTransactionStateRequestHeader as FromMap>::from(&map).unwrap();
        assert_eq!(header.topic.unwrap(), "test_topic");
        assert_eq!(header.tran_state_table_offset, 123);
        assert_eq!(header.commit_log_offset, 456);
        assert_eq!(header.msg_id.unwrap(), "test_msg_id");
        assert_eq!(header.transaction_id.unwrap(), "test_transaction_id");
        assert_eq!(header.offset_msg_id.unwrap(), "test_offset_msg_id");
    }

    /// Only the two `#[required]` offsets are supplied; every optional
    /// field must decode to `None`.
    #[test]
    fn check_transaction_state_request_header_handles_missing_optional_fields() {
        let mut map = HashMap::new();
        map.insert(
            CheetahString::from_static_str("tranStateTableOffset"),
            CheetahString::from_static_str("123"),
        );
        map.insert(
            CheetahString::from_static_str("commitLogOffset"),
            CheetahString::from_static_str("456"),
        );

        let header = <CheckTransactionStateRequestHeader as FromMap>::from(&map).unwrap();
        assert!(header.topic.is_none());
        assert_eq!(header.tran_state_table_offset, 123);
        assert_eq!(header.commit_log_offset, 456);
        assert!(header.msg_id.is_none());
        assert!(header.transaction_id.is_none());
        assert!(header.offset_msg_id.is_none());
    }

    /// Non-numeric values for the required offsets must be rejected with an
    /// error rather than silently defaulting.
    #[test]
    fn check_transaction_state_request_header_handles_invalid_data() {
        let mut map = HashMap::new();
        map.insert(
            CheetahString::from_static_str("tranStateTableOffset"),
            CheetahString::from_static_str("invalid"),
        );
        map.insert(
            CheetahString::from_static_str("commitLogOffset"),
            CheetahString::from_static_str("invalid"),
        );

        let result = <CheckTransactionStateRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }
}
249}