Skip to main content

rocketmq_remoting/protocol/header/
end_transaction_request_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::rpc::rpc_request_header::RpcRequestHeader;
21
22#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
23#[serde(rename_all = "camelCase")]
24pub struct EndTransactionRequestHeader {
25    #[required]
26    pub topic: CheetahString,
27
28    #[required]
29    pub producer_group: CheetahString,
30
31    //ConsumeQueue Offset
32    #[required]
33    pub tran_state_table_offset: u64,
34
35    // Offset of the message in the CommitLog
36    #[required]
37    pub commit_log_offset: u64,
38
39    //TRANSACTION_COMMIT_TYPE,TRANSACTION_ROLLBACK_TYPE,TRANSACTION_NOT_TYPE
40    #[required]
41    pub commit_or_rollback: i32,
42
43    //Whether the check-back is initiated by the Broker
44    #[required]
45    pub from_transaction_check: bool,
46
47    #[required]
48    pub msg_id: CheetahString,
49
50    pub transaction_id: Option<CheetahString>,
51
52    #[serde(flatten)]
53    pub rpc_request_header: RpcRequestHeader,
54}
55
56#[cfg(test)]
57mod tests {
58    use super::*;
59
60    #[test]
61    fn end_transaction_request_header_default() {
62        let header = EndTransactionRequestHeader::default();
63        assert_eq!(header.topic, "");
64        assert_eq!(header.producer_group, "");
65        assert_eq!(header.tran_state_table_offset, 0);
66        assert_eq!(header.commit_log_offset, 0);
67        assert_eq!(header.commit_or_rollback, 0);
68        assert!(!header.from_transaction_check);
69        assert_eq!(header.msg_id, "");
70        assert!(header.transaction_id.is_none());
71    }
72
73    #[test]
74    fn end_transaction_request_header_clone() {
75        let header = EndTransactionRequestHeader {
76            topic: CheetahString::from("topic1"),
77            producer_group: CheetahString::from("group1"),
78            tran_state_table_offset: 123,
79            commit_log_offset: 456,
80            commit_or_rollback: 1,
81            from_transaction_check: true,
82            msg_id: CheetahString::from("msg1"),
83            transaction_id: Some(CheetahString::from("tran1")),
84            rpc_request_header: RpcRequestHeader::default(),
85        };
86        let cloned_header = header.clone();
87        assert_eq!(cloned_header.topic, "topic1");
88        assert_eq!(cloned_header.producer_group, "group1");
89        assert_eq!(cloned_header.tran_state_table_offset, 123);
90        assert_eq!(cloned_header.commit_log_offset, 456);
91        assert_eq!(cloned_header.commit_or_rollback, 1);
92        assert!(cloned_header.from_transaction_check);
93        assert_eq!(cloned_header.msg_id, "msg1");
94        assert_eq!(cloned_header.transaction_id.as_ref().unwrap(), "tran1");
95    }
96
97    #[test]
98    fn end_transaction_request_header_serialization() {
99        let header = EndTransactionRequestHeader {
100            topic: CheetahString::from("topic1"),
101            producer_group: CheetahString::from("group1"),
102            tran_state_table_offset: 123,
103            commit_log_offset: 456,
104            commit_or_rollback: 1,
105            from_transaction_check: true,
106            msg_id: CheetahString::from("msg1"),
107            transaction_id: Some(CheetahString::from("tran1")),
108            rpc_request_header: RpcRequestHeader {
109                broker_name: Some(CheetahString::from("broker1")),
110                ..Default::default()
111            },
112        };
113        let json = serde_json::to_string(&header).unwrap();
114        assert!(json.contains("\"topic\":\"topic1\""));
115        assert!(json.contains("\"producerGroup\":\"group1\""));
116        assert!(json.contains("\"tranStateTableOffset\":123"));
117        assert!(json.contains("\"commitLogOffset\":456"));
118        assert!(json.contains("\"commitOrRollback\":1"));
119        assert!(json.contains("\"fromTransactionCheck\":true"));
120        assert!(json.contains("\"msgId\":\"msg1\""));
121        assert!(json.contains("\"transactionId\":\"tran1\""));
122        assert!(json.contains("\"brokerName\":\"broker1\""));
123    }
124
125    #[test]
126    fn end_transaction_request_header_deserialization() {
127        let json = r#"{"topic":"topic1","producerGroup":"group1","tranStateTableOffset":123,"commitLogOffset":456,"commitOrRollback":1,"fromTransactionCheck":true,"msgId":"msg1","transactionId":"tran1","brokerName":"broker1"}"#;
128        let header: EndTransactionRequestHeader = serde_json::from_str(json).unwrap();
129        assert_eq!(header.topic, "topic1");
130        assert_eq!(header.producer_group, "group1");
131        assert_eq!(header.tran_state_table_offset, 123);
132        assert_eq!(header.commit_log_offset, 456);
133        assert_eq!(header.commit_or_rollback, 1);
134        assert!(header.from_transaction_check);
135        assert_eq!(header.msg_id, "msg1");
136        assert_eq!(header.transaction_id.as_ref().unwrap(), "tran1");
137        assert_eq!(header.rpc_request_header.broker_name.as_ref().unwrap(), "broker1");
138    }
139    #[test]
140    fn end_transaction_request_header_required_fields() {
141        let json = r#"{"topic":"topic1","producerGroup":"group1","tranStateTableOffset":123,"commitLogOffset":456,"commitOrRollback":1,"fromTransactionCheck":true,"msgId":"msg1"}"#;
142        let header: EndTransactionRequestHeader = serde_json::from_str(json).unwrap();
143        assert_eq!(header.topic, "topic1");
144        assert_eq!(header.producer_group, "group1");
145        assert_eq!(header.tran_state_table_offset, 123);
146        assert_eq!(header.commit_log_offset, 456);
147        assert_eq!(header.commit_or_rollback, 1);
148        assert!(header.from_transaction_check);
149        assert_eq!(header.msg_id, "msg1");
150        assert!(header.transaction_id.is_none());
151    }
152
153    #[test]
154    fn end_transaction_request_header_rpc_request_header() {
155        let header = EndTransactionRequestHeader {
156            topic: CheetahString::from("topic1"),
157            producer_group: CheetahString::from("group1"),
158            tran_state_table_offset: 123,
159            commit_log_offset: 456,
160            commit_or_rollback: 1,
161            from_transaction_check: true,
162            msg_id: CheetahString::from("msg1"),
163            transaction_id: Some(CheetahString::from("tran1")),
164            rpc_request_header: RpcRequestHeader {
165                namespace: Some(CheetahString::from("namespace1")),
166                namespaced: Some(true),
167                broker_name: Some(CheetahString::from("broker1")),
168                oneway: Some(false),
169            },
170        };
171        assert_eq!(header.rpc_request_header.namespace.as_ref().unwrap(), "namespace1");
172        assert!(header.rpc_request_header.namespaced.unwrap());
173        assert_eq!(header.rpc_request_header.broker_name.as_ref().unwrap(), "broker1");
174        assert!(!header.rpc_request_header.oneway.unwrap());
175    }
176
177    #[test]
178    fn end_transaction_request_header_to_map() {
179        use crate::protocol::command_custom_header::CommandCustomHeader;
180
181        let header = EndTransactionRequestHeader {
182            topic: CheetahString::from("topic1"),
183            producer_group: CheetahString::from("group1"),
184            tran_state_table_offset: 123,
185            commit_log_offset: 456,
186            commit_or_rollback: 1,
187            from_transaction_check: true,
188            msg_id: CheetahString::from("msg1"),
189            transaction_id: Some(CheetahString::from("tran1")),
190            rpc_request_header: RpcRequestHeader::default(),
191        };
192
193        let map = header.to_map().unwrap();
194        assert_eq!(
195            map.get(&CheetahString::from_static_str(EndTransactionRequestHeader::TOPIC))
196                .unwrap(),
197            "topic1"
198        );
199        assert_eq!(
200            map.get(&CheetahString::from_static_str(
201                EndTransactionRequestHeader::PRODUCER_GROUP
202            ))
203            .unwrap(),
204            "group1"
205        );
206        assert_eq!(
207            map.get(&CheetahString::from_static_str(
208                EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET
209            ))
210            .unwrap(),
211            "123"
212        );
213        assert_eq!(
214            map.get(&CheetahString::from_static_str(
215                EndTransactionRequestHeader::COMMIT_LOG_OFFSET
216            ))
217            .unwrap(),
218            "456"
219        );
220        assert_eq!(
221            map.get(&CheetahString::from_static_str(
222                EndTransactionRequestHeader::COMMIT_OR_ROLLBACK
223            ))
224            .unwrap(),
225            "1"
226        );
227        assert_eq!(
228            map.get(&CheetahString::from_static_str(
229                EndTransactionRequestHeader::FROM_TRANSACTION_CHECK
230            ))
231            .unwrap(),
232            "true"
233        );
234        assert_eq!(
235            map.get(&CheetahString::from_static_str(EndTransactionRequestHeader::MSG_ID))
236                .unwrap(),
237            "msg1"
238        );
239        assert_eq!(
240            map.get(&CheetahString::from_static_str(
241                EndTransactionRequestHeader::TRANSACTION_ID
242            ))
243            .unwrap(),
244            "tran1"
245        );
246    }
247
248    #[test]
249    fn end_transaction_request_header_to_map_without_transaction_id() {
250        use crate::protocol::command_custom_header::CommandCustomHeader;
251
252        let header = EndTransactionRequestHeader {
253            topic: CheetahString::from("topic1"),
254            producer_group: CheetahString::from("group1"),
255            tran_state_table_offset: 123,
256            commit_log_offset: 456,
257            commit_or_rollback: 1,
258            from_transaction_check: true,
259            msg_id: CheetahString::from("msg1"),
260            transaction_id: None,
261            rpc_request_header: RpcRequestHeader::default(),
262        };
263
264        let map = header.to_map().unwrap();
265        assert_eq!(
266            map.get(&CheetahString::from_static_str(EndTransactionRequestHeader::TOPIC))
267                .unwrap(),
268            "topic1"
269        );
270        assert!(!map.contains_key(&CheetahString::from_static_str(
271            EndTransactionRequestHeader::TRANSACTION_ID
272        )));
273    }
274
275    #[test]
276    fn end_transaction_request_header_from_map() {
277        use crate::protocol::command_custom_header::FromMap;
278        use std::collections::HashMap;
279
280        let mut map = HashMap::new();
281        map.insert(
282            CheetahString::from_static_str(EndTransactionRequestHeader::TOPIC),
283            CheetahString::from_static_str("topic1"),
284        );
285        map.insert(
286            CheetahString::from_static_str(EndTransactionRequestHeader::PRODUCER_GROUP),
287            CheetahString::from_static_str("group1"),
288        );
289        map.insert(
290            CheetahString::from_static_str(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET),
291            CheetahString::from_static_str("123"),
292        );
293        map.insert(
294            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_LOG_OFFSET),
295            CheetahString::from_static_str("456"),
296        );
297        map.insert(
298            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK),
299            CheetahString::from_static_str("1"),
300        );
301        map.insert(
302            CheetahString::from_static_str(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK),
303            CheetahString::from_static_str("true"),
304        );
305        map.insert(
306            CheetahString::from_static_str(EndTransactionRequestHeader::MSG_ID),
307            CheetahString::from_static_str("msg1"),
308        );
309        map.insert(
310            CheetahString::from_static_str(EndTransactionRequestHeader::TRANSACTION_ID),
311            CheetahString::from_static_str("tran1"),
312        );
313
314        let header = <EndTransactionRequestHeader as FromMap>::from(&map).unwrap();
315        assert_eq!(header.topic, "topic1");
316        assert_eq!(header.producer_group, "group1");
317        assert_eq!(header.tran_state_table_offset, 123);
318        assert_eq!(header.commit_log_offset, 456);
319        assert_eq!(header.commit_or_rollback, 1);
320        assert!(header.from_transaction_check);
321        assert_eq!(header.msg_id, "msg1");
322        assert_eq!(header.transaction_id.unwrap(), "tran1");
323    }
324
325    #[test]
326    fn end_transaction_request_header_from_map_without_transaction_id() {
327        use crate::protocol::command_custom_header::FromMap;
328        use std::collections::HashMap;
329
330        let mut map = HashMap::new();
331        map.insert(
332            CheetahString::from_static_str(EndTransactionRequestHeader::TOPIC),
333            CheetahString::from_static_str("topic1"),
334        );
335        map.insert(
336            CheetahString::from_static_str(EndTransactionRequestHeader::PRODUCER_GROUP),
337            CheetahString::from_static_str("group1"),
338        );
339        map.insert(
340            CheetahString::from_static_str(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET),
341            CheetahString::from_static_str("123"),
342        );
343        map.insert(
344            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_LOG_OFFSET),
345            CheetahString::from_static_str("456"),
346        );
347        map.insert(
348            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK),
349            CheetahString::from_static_str("1"),
350        );
351        map.insert(
352            CheetahString::from_static_str(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK),
353            CheetahString::from_static_str("true"),
354        );
355        map.insert(
356            CheetahString::from_static_str(EndTransactionRequestHeader::MSG_ID),
357            CheetahString::from_static_str("msg1"),
358        );
359
360        let header = <EndTransactionRequestHeader as FromMap>::from(&map).unwrap();
361        assert_eq!(header.topic, "topic1");
362        assert_eq!(header.producer_group, "group1");
363        assert_eq!(header.tran_state_table_offset, 123);
364        assert_eq!(header.commit_log_offset, 456);
365        assert_eq!(header.commit_or_rollback, 1);
366        assert!(header.from_transaction_check);
367        assert_eq!(header.msg_id, "msg1");
368        assert!(header.transaction_id.is_none());
369    }
370
371    #[test]
372    fn end_transaction_request_header_from_map_missing_required_field() {
373        use crate::protocol::command_custom_header::FromMap;
374        use std::collections::HashMap;
375
376        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
377        enum RequiredField {
378            Topic,
379            ProducerGroup,
380            TranStateTableOffset,
381            CommitLogOffset,
382            CommitOrRollback,
383            FromTransactionCheck,
384            MsgId,
385        }
386
387        impl RequiredField {
388            fn as_str(&self) -> &'static str {
389                match self {
390                    RequiredField::Topic => EndTransactionRequestHeader::TOPIC,
391                    RequiredField::ProducerGroup => EndTransactionRequestHeader::PRODUCER_GROUP,
392                    RequiredField::TranStateTableOffset => EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET,
393                    RequiredField::CommitLogOffset => EndTransactionRequestHeader::COMMIT_LOG_OFFSET,
394                    RequiredField::CommitOrRollback => EndTransactionRequestHeader::COMMIT_OR_ROLLBACK,
395                    RequiredField::FromTransactionCheck => EndTransactionRequestHeader::FROM_TRANSACTION_CHECK,
396                    RequiredField::MsgId => EndTransactionRequestHeader::MSG_ID,
397                }
398            }
399
400            fn test_value(&self) -> &'static str {
401                match self {
402                    RequiredField::Topic => "topic1",
403                    RequiredField::ProducerGroup => "group1",
404                    RequiredField::TranStateTableOffset => "123",
405                    RequiredField::CommitLogOffset => "456",
406                    RequiredField::CommitOrRollback => "1",
407                    RequiredField::FromTransactionCheck => "true",
408                    RequiredField::MsgId => "msg1",
409                }
410            }
411        }
412
413        let all_required_fields: &[RequiredField] = &[
414            RequiredField::Topic,
415            RequiredField::ProducerGroup,
416            RequiredField::TranStateTableOffset,
417            RequiredField::CommitLogOffset,
418            RequiredField::CommitOrRollback,
419            RequiredField::FromTransactionCheck,
420            RequiredField::MsgId,
421        ];
422
423        for &missing_field in all_required_fields {
424            let mut map = HashMap::new();
425            for &field in all_required_fields {
426                if field != missing_field {
427                    map.insert(
428                        CheetahString::from_static_str(field.as_str()),
429                        CheetahString::from_static_str(field.test_value()),
430                    );
431                }
432            }
433
434            let result = <EndTransactionRequestHeader as FromMap>::from(&map);
435            assert!(
436                result.is_err(),
437                "Expected failure when missing required field: {:?}",
438                missing_field
439            );
440        }
441    }
442
443    #[test]
444    fn end_transaction_request_header_from_map_invalid_numeric_field() {
445        use crate::protocol::command_custom_header::FromMap;
446        use std::collections::HashMap;
447
448        let mut map = HashMap::new();
449        map.insert(
450            CheetahString::from_static_str(EndTransactionRequestHeader::TOPIC),
451            CheetahString::from_static_str("topic1"),
452        );
453        map.insert(
454            CheetahString::from_static_str(EndTransactionRequestHeader::PRODUCER_GROUP),
455            CheetahString::from_static_str("group1"),
456        );
457        map.insert(
458            CheetahString::from_static_str(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET),
459            CheetahString::from_static_str("invalid"),
460        );
461        map.insert(
462            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_LOG_OFFSET),
463            CheetahString::from_static_str("456"),
464        );
465        map.insert(
466            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK),
467            CheetahString::from_static_str("1"),
468        );
469        map.insert(
470            CheetahString::from_static_str(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK),
471            CheetahString::from_static_str("true"),
472        );
473        map.insert(
474            CheetahString::from_static_str(EndTransactionRequestHeader::MSG_ID),
475            CheetahString::from_static_str("msg1"),
476        );
477
478        let result = <EndTransactionRequestHeader as FromMap>::from(&map);
479        assert!(result.is_err());
480    }
481
482    #[test]
483    fn end_transaction_request_header_from_map_invalid_bool_field() {
484        use crate::protocol::command_custom_header::FromMap;
485        use std::collections::HashMap;
486
487        let mut map = HashMap::new();
488        map.insert(
489            CheetahString::from_static_str(EndTransactionRequestHeader::TOPIC),
490            CheetahString::from_static_str("topic1"),
491        );
492        map.insert(
493            CheetahString::from_static_str(EndTransactionRequestHeader::PRODUCER_GROUP),
494            CheetahString::from_static_str("group1"),
495        );
496        map.insert(
497            CheetahString::from_static_str(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET),
498            CheetahString::from_static_str("123"),
499        );
500        map.insert(
501            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_LOG_OFFSET),
502            CheetahString::from_static_str("456"),
503        );
504        map.insert(
505            CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK),
506            CheetahString::from_static_str("1"),
507        );
508        map.insert(
509            CheetahString::from_static_str(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK),
510            CheetahString::from_static_str("invalid"),
511        );
512        map.insert(
513            CheetahString::from_static_str(EndTransactionRequestHeader::MSG_ID),
514            CheetahString::from_static_str("msg1"),
515        );
516
517        let result = <EndTransactionRequestHeader as FromMap>::from(&map);
518        assert!(result.is_err());
519    }
520
521    #[test]
522    fn end_transaction_request_header_from_map_transaction_state_values() {
523        use crate::protocol::command_custom_header::FromMap;
524        use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
525        use std::collections::HashMap;
526
527        struct TestCase {
528            value: i32,
529        }
530
531        let test_cases = &[
532            TestCase {
533                value: MessageSysFlag::TRANSACTION_NOT_TYPE,
534            },
535            TestCase {
536                value: MessageSysFlag::TRANSACTION_COMMIT_TYPE,
537            },
538            TestCase {
539                value: MessageSysFlag::TRANSACTION_ROLLBACK_TYPE,
540            },
541        ];
542
543        for case in test_cases {
544            let mut map = HashMap::new();
545            map.insert(
546                CheetahString::from_static_str(EndTransactionRequestHeader::TOPIC),
547                CheetahString::from_static_str("topic1"),
548            );
549            map.insert(
550                CheetahString::from_static_str(EndTransactionRequestHeader::PRODUCER_GROUP),
551                CheetahString::from_static_str("group1"),
552            );
553            map.insert(
554                CheetahString::from_static_str(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET),
555                CheetahString::from_static_str("123"),
556            );
557            map.insert(
558                CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_LOG_OFFSET),
559                CheetahString::from_static_str("456"),
560            );
561            map.insert(
562                CheetahString::from_static_str(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK),
563                CheetahString::from(case.value.to_string()),
564            );
565            map.insert(
566                CheetahString::from_static_str(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK),
567                CheetahString::from_static_str("true"),
568            );
569            map.insert(
570                CheetahString::from_static_str(EndTransactionRequestHeader::MSG_ID),
571                CheetahString::from_static_str("msg1"),
572            );
573
574            let result = <EndTransactionRequestHeader as FromMap>::from(&map);
575            assert!(
576                result.is_ok(),
577                "commit_or_rollback={} should parse successfully but got {:?}",
578                case.value,
579                result
580            );
581            let header = result.unwrap();
582            assert_eq!(
583                header.commit_or_rollback, case.value,
584                "commit_or_rollback should be {}",
585                case.value
586            );
587        }
588    }
589
590    #[test]
591    fn end_transaction_request_header_roundtrip() {
592        use crate::protocol::command_custom_header::CommandCustomHeader;
593        use crate::protocol::command_custom_header::FromMap;
594        use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
595
596        let original = EndTransactionRequestHeader {
597            topic: CheetahString::from("topic1"),
598            producer_group: CheetahString::from("group1"),
599            tran_state_table_offset: 123,
600            commit_log_offset: 456,
601            commit_or_rollback: MessageSysFlag::TRANSACTION_COMMIT_TYPE,
602            from_transaction_check: true,
603            msg_id: CheetahString::from("msg1"),
604            transaction_id: Some(CheetahString::from("tran1")),
605            rpc_request_header: RpcRequestHeader::default(),
606        };
607
608        let map = original.to_map().unwrap();
609        let restored = <EndTransactionRequestHeader as FromMap>::from(&map).unwrap();
610
611        assert_eq!(original.topic, restored.topic);
612        assert_eq!(original.producer_group, restored.producer_group);
613        assert_eq!(original.tran_state_table_offset, restored.tran_state_table_offset);
614        assert_eq!(original.commit_log_offset, restored.commit_log_offset);
615        assert_eq!(original.commit_or_rollback, restored.commit_or_rollback);
616        assert_eq!(original.from_transaction_check, restored.from_transaction_check);
617        assert_eq!(original.msg_id, restored.msg_id);
618        assert_eq!(original.transaction_id, restored.transaction_id);
619    }
620
621    #[test]
622    fn end_transaction_request_header_via_remoting_command() {
623        use crate::code::request_code::RequestCode;
624        use crate::protocol::remoting_command::RemotingCommand;
625        use bytes::BytesMut;
626        use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
627
628        let original = EndTransactionRequestHeader {
629            topic: CheetahString::from("topic1"),
630            producer_group: CheetahString::from("group1"),
631            tran_state_table_offset: 123,
632            commit_log_offset: 456,
633            commit_or_rollback: MessageSysFlag::TRANSACTION_COMMIT_TYPE,
634            from_transaction_check: true,
635            msg_id: CheetahString::from("msg1"),
636            transaction_id: Some(CheetahString::from("tran1")),
637            rpc_request_header: RpcRequestHeader::default(),
638        };
639
640        let mut command = RemotingCommand::create_request_command(RequestCode::EndTransaction, original);
641        let header_bytes = command.encode_header().expect("Failed to encode header");
642
643        let mut buf = BytesMut::from(header_bytes);
644        let decoded_command = RemotingCommand::decode(&mut buf).expect("Failed to decode command");
645        let decoded_command = decoded_command.expect("Decoded command is None");
646
647        assert_eq!(decoded_command.code(), RequestCode::EndTransaction as i32);
648
649        let decoded_header = decoded_command
650            .decode_command_custom_header::<EndTransactionRequestHeader>()
651            .expect("Failed to decode header");
652
653        assert_eq!(decoded_header.topic, "topic1");
654        assert_eq!(decoded_header.producer_group, "group1");
655        assert_eq!(decoded_header.tran_state_table_offset, 123);
656        assert_eq!(decoded_header.commit_log_offset, 456);
657        assert_eq!(
658            decoded_header.commit_or_rollback,
659            MessageSysFlag::TRANSACTION_COMMIT_TYPE
660        );
661        assert!(decoded_header.from_transaction_check);
662        assert_eq!(decoded_header.msg_id, "msg1");
663        assert_eq!(decoded_header.transaction_id.unwrap(), "tran1");
664    }
665}