1use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::rpc::rpc_request_header::RpcRequestHeader;
21
/// Custom header carried by an end-transaction request (the tests in this file
/// send it with `RequestCode::EndTransaction`).
///
/// serde (de)serializes the fields under camelCase keys (`producerGroup`,
/// `tranStateTableOffset`, ...). Fields tagged `#[required]` are consumed by the
/// `RequestHeaderCodecV2` derive; the tests in this file expect `FromMap::from`
/// to fail when any one of them is absent from the input map.
// NOTE(review): the `TOPIC`, `PRODUCER_GROUP`, ... key constants used by the
// tests are not declared in this file; presumably they are generated by the
// `RequestHeaderCodecV2` derive - confirm against the macro.
#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
#[serde(rename_all = "camelCase")]
pub struct EndTransactionRequestHeader {
    /// Topic name (required).
    #[required]
    pub topic: CheetahString,

    /// Producer group name (required).
    #[required]
    pub producer_group: CheetahString,

    /// Required unsigned offset.
    // NOTE(review): presumably the position in the transaction state table -
    // confirm against the broker-side handler.
    #[required]
    pub tran_state_table_offset: u64,

    /// Required unsigned offset.
    // NOTE(review): presumably the commit-log offset of the message being
    // finalized - confirm against the broker-side handler.
    #[required]
    pub commit_log_offset: u64,

    /// Required transaction outcome flag. The tests exercise the
    /// `MessageSysFlag::TRANSACTION_NOT_TYPE`, `TRANSACTION_COMMIT_TYPE` and
    /// `TRANSACTION_ROLLBACK_TYPE` values.
    #[required]
    pub commit_or_rollback: i32,

    /// Required boolean flag.
    // NOTE(review): presumably set when the request answers a transaction
    // check-back rather than the original send - confirm.
    #[required]
    pub from_transaction_check: bool,

    /// Message id (required).
    #[required]
    pub msg_id: CheetahString,

    /// Optional transaction id. The tests expect `to_map` to leave its key out
    /// when this is `None`, and `FromMap::from` to yield `None` when the key is
    /// missing.
    pub transaction_id: Option<CheetahString>,

    /// Common RPC header. `#[serde(flatten)]` merges its fields (for example
    /// `brokerName`) into the same JSON object as the fields above.
    #[serde(flatten)]
    pub rpc_request_header: RpcRequestHeader,
}
55
#[cfg(test)]
mod tests {
    //! Unit tests for [`EndTransactionRequestHeader`]: defaults, cloning, JSON
    //! (de)serialization, map encoding/decoding and a full remoting-command
    //! round trip.
    //!
    //! Every test works from the same fixture values, defined once below, so a
    //! change to the header's field set only needs to be reflected in one place.

    use std::collections::HashMap;

    use bytes::BytesMut;
    use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;

    use super::*;
    use crate::code::request_code::RequestCode;
    use crate::protocol::command_custom_header::CommandCustomHeader;
    use crate::protocol::command_custom_header::FromMap;
    use crate::protocol::remoting_command::RemotingCommand;

    /// Map key of every `#[required]` field, paired with a value that parses
    /// cleanly for that field's type. The values mirror [`sample_header`].
    const REQUIRED_ENTRIES: [(&str, &str); 7] = [
        (EndTransactionRequestHeader::TOPIC, "topic1"),
        (EndTransactionRequestHeader::PRODUCER_GROUP, "group1"),
        (EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET, "123"),
        (EndTransactionRequestHeader::COMMIT_LOG_OFFSET, "456"),
        (EndTransactionRequestHeader::COMMIT_OR_ROLLBACK, "1"),
        (EndTransactionRequestHeader::FROM_TRANSACTION_CHECK, "true"),
        (EndTransactionRequestHeader::MSG_ID, "msg1"),
    ];

    /// Wraps a static key name in the string type used by header maps.
    fn key(name: &'static str) -> CheetahString {
        CheetahString::from_static_str(name)
    }

    /// Builds a map holding a valid value for every required field and nothing
    /// else (no `transactionId`).
    fn required_map() -> HashMap<CheetahString, CheetahString> {
        REQUIRED_ENTRIES
            .iter()
            .map(|&(name, value)| (key(name), CheetahString::from_static_str(value)))
            .collect()
    }

    /// Builds the fully populated header used as the baseline by most tests.
    /// Tests that need a variation override single fields with struct-update
    /// syntax.
    fn sample_header() -> EndTransactionRequestHeader {
        EndTransactionRequestHeader {
            topic: CheetahString::from("topic1"),
            producer_group: CheetahString::from("group1"),
            tran_state_table_offset: 123,
            commit_log_offset: 456,
            commit_or_rollback: 1,
            from_transaction_check: true,
            msg_id: CheetahString::from("msg1"),
            transaction_id: Some(CheetahString::from("tran1")),
            rpc_request_header: RpcRequestHeader::default(),
        }
    }

    /// Asserts that the seven required fields hold the fixture values, with
    /// `commit_or_rollback` supplied by the caller because it varies per test.
    fn assert_required_fields(header: &EndTransactionRequestHeader, commit_or_rollback: i32) {
        assert_eq!(header.topic, "topic1");
        assert_eq!(header.producer_group, "group1");
        assert_eq!(header.tran_state_table_offset, 123);
        assert_eq!(header.commit_log_offset, 456);
        assert_eq!(header.commit_or_rollback, commit_or_rollback);
        assert!(header.from_transaction_check);
        assert_eq!(header.msg_id, "msg1");
    }

    #[test]
    fn end_transaction_request_header_default() {
        let header = EndTransactionRequestHeader::default();
        assert_eq!(header.topic, "");
        assert_eq!(header.producer_group, "");
        assert_eq!(header.tran_state_table_offset, 0);
        assert_eq!(header.commit_log_offset, 0);
        assert_eq!(header.commit_or_rollback, 0);
        assert!(!header.from_transaction_check);
        assert_eq!(header.msg_id, "");
        assert!(header.transaction_id.is_none());
    }

    #[test]
    fn end_transaction_request_header_clone() {
        let header = sample_header();
        let cloned_header = header.clone();
        assert_required_fields(&cloned_header, 1);
        assert_eq!(cloned_header.transaction_id.as_ref().unwrap(), "tran1");
    }

    #[test]
    fn end_transaction_request_header_serialization() {
        let header = EndTransactionRequestHeader {
            rpc_request_header: RpcRequestHeader {
                broker_name: Some(CheetahString::from("broker1")),
                ..Default::default()
            },
            ..sample_header()
        };
        let json = serde_json::to_string(&header).unwrap();

        // Key order in the output is not asserted, only the presence of each pair.
        let expected_fragments = [
            "\"topic\":\"topic1\"",
            "\"producerGroup\":\"group1\"",
            "\"tranStateTableOffset\":123",
            "\"commitLogOffset\":456",
            "\"commitOrRollback\":1",
            "\"fromTransactionCheck\":true",
            "\"msgId\":\"msg1\"",
            "\"transactionId\":\"tran1\"",
            "\"brokerName\":\"broker1\"",
        ];
        for fragment in expected_fragments.iter() {
            assert!(json.contains(fragment), "missing {} in serialized header {}", fragment, json);
        }
    }

    #[test]
    fn end_transaction_request_header_deserialization() {
        let json = r#"{"topic":"topic1","producerGroup":"group1","tranStateTableOffset":123,"commitLogOffset":456,"commitOrRollback":1,"fromTransactionCheck":true,"msgId":"msg1","transactionId":"tran1","brokerName":"broker1"}"#;
        let header: EndTransactionRequestHeader = serde_json::from_str(json).unwrap();
        assert_required_fields(&header, 1);
        assert_eq!(header.transaction_id.as_ref().unwrap(), "tran1");
        assert_eq!(header.rpc_request_header.broker_name.as_ref().unwrap(), "broker1");
    }

    #[test]
    fn end_transaction_request_header_required_fields() {
        // Only the required keys are present; the optional id must come back as `None`.
        let json = r#"{"topic":"topic1","producerGroup":"group1","tranStateTableOffset":123,"commitLogOffset":456,"commitOrRollback":1,"fromTransactionCheck":true,"msgId":"msg1"}"#;
        let header: EndTransactionRequestHeader = serde_json::from_str(json).unwrap();
        assert_required_fields(&header, 1);
        assert!(header.transaction_id.is_none());
    }

    #[test]
    fn end_transaction_request_header_rpc_request_header() {
        let header = EndTransactionRequestHeader {
            rpc_request_header: RpcRequestHeader {
                namespace: Some(CheetahString::from("namespace1")),
                namespaced: Some(true),
                broker_name: Some(CheetahString::from("broker1")),
                oneway: Some(false),
            },
            ..sample_header()
        };
        let rpc = &header.rpc_request_header;
        assert_eq!(rpc.namespace.as_ref().unwrap(), "namespace1");
        assert!(rpc.namespaced.unwrap());
        assert_eq!(rpc.broker_name.as_ref().unwrap(), "broker1");
        assert!(!rpc.oneway.unwrap());
    }

    #[test]
    fn end_transaction_request_header_to_map() {
        let map = sample_header().to_map().unwrap();

        for &(name, expected) in REQUIRED_ENTRIES.iter() {
            let actual = map
                .get(&key(name))
                .unwrap_or_else(|| panic!("key {:?} missing from encoded map", name));
            assert_eq!(actual, expected, "unexpected value for key {:?}", name);
        }
        assert_eq!(
            map.get(&key(EndTransactionRequestHeader::TRANSACTION_ID)).unwrap(),
            "tran1"
        );
    }

    #[test]
    fn end_transaction_request_header_to_map_without_transaction_id() {
        let header = EndTransactionRequestHeader {
            transaction_id: None,
            ..sample_header()
        };

        let map = header.to_map().unwrap();
        assert_eq!(map.get(&key(EndTransactionRequestHeader::TOPIC)).unwrap(), "topic1");
        assert!(!map.contains_key(&key(EndTransactionRequestHeader::TRANSACTION_ID)));
    }

    #[test]
    fn end_transaction_request_header_from_map() {
        let mut map = required_map();
        map.insert(
            key(EndTransactionRequestHeader::TRANSACTION_ID),
            CheetahString::from_static_str("tran1"),
        );

        let header = <EndTransactionRequestHeader as FromMap>::from(&map).unwrap();
        assert_required_fields(&header, 1);
        assert_eq!(header.transaction_id.unwrap(), "tran1");
    }

    #[test]
    fn end_transaction_request_header_from_map_without_transaction_id() {
        let map = required_map();

        let header = <EndTransactionRequestHeader as FromMap>::from(&map).unwrap();
        assert_required_fields(&header, 1);
        assert!(header.transaction_id.is_none());
    }

    #[test]
    fn end_transaction_request_header_from_map_missing_required_field() {
        // Drop one required key at a time; every such map must be rejected.
        for &(missing_field, _) in REQUIRED_ENTRIES.iter() {
            let mut map = required_map();
            map.remove(&key(missing_field));

            let result = <EndTransactionRequestHeader as FromMap>::from(&map);
            assert!(
                result.is_err(),
                "Expected failure when missing required field: {:?}",
                missing_field
            );
        }
    }

    #[test]
    fn end_transaction_request_header_from_map_invalid_numeric_field() {
        let mut map = required_map();
        map.insert(
            key(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET),
            CheetahString::from_static_str("invalid"),
        );

        let result = <EndTransactionRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }

    #[test]
    fn end_transaction_request_header_from_map_invalid_bool_field() {
        let mut map = required_map();
        map.insert(
            key(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK),
            CheetahString::from_static_str("invalid"),
        );

        let result = <EndTransactionRequestHeader as FromMap>::from(&map);
        assert!(result.is_err());
    }

    #[test]
    fn end_transaction_request_header_from_map_transaction_state_values() {
        let states = [
            MessageSysFlag::TRANSACTION_NOT_TYPE,
            MessageSysFlag::TRANSACTION_COMMIT_TYPE,
            MessageSysFlag::TRANSACTION_ROLLBACK_TYPE,
        ];

        for &value in states.iter() {
            let mut map = required_map();
            map.insert(
                key(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK),
                CheetahString::from(value.to_string()),
            );

            let result = <EndTransactionRequestHeader as FromMap>::from(&map);
            assert!(
                result.is_ok(),
                "commit_or_rollback={} should parse successfully but got {:?}",
                value,
                result
            );
            let header = result.unwrap();
            assert_eq!(
                header.commit_or_rollback, value,
                "commit_or_rollback should be {}",
                value
            );
        }
    }

    #[test]
    fn end_transaction_request_header_roundtrip() {
        let original = EndTransactionRequestHeader {
            commit_or_rollback: MessageSysFlag::TRANSACTION_COMMIT_TYPE,
            ..sample_header()
        };

        let map = original.to_map().unwrap();
        let restored = <EndTransactionRequestHeader as FromMap>::from(&map).unwrap();

        assert_eq!(original.topic, restored.topic);
        assert_eq!(original.producer_group, restored.producer_group);
        assert_eq!(original.tran_state_table_offset, restored.tran_state_table_offset);
        assert_eq!(original.commit_log_offset, restored.commit_log_offset);
        assert_eq!(original.commit_or_rollback, restored.commit_or_rollback);
        assert_eq!(original.from_transaction_check, restored.from_transaction_check);
        assert_eq!(original.msg_id, restored.msg_id);
        assert_eq!(original.transaction_id, restored.transaction_id);
    }

    #[test]
    fn end_transaction_request_header_via_remoting_command() {
        let original = EndTransactionRequestHeader {
            commit_or_rollback: MessageSysFlag::TRANSACTION_COMMIT_TYPE,
            ..sample_header()
        };

        // Encode the header through a real request command, then decode it back.
        let mut command = RemotingCommand::create_request_command(RequestCode::EndTransaction, original);
        let header_bytes = command.encode_header().expect("Failed to encode header");

        let mut buf = BytesMut::from(header_bytes);
        let decoded_command = RemotingCommand::decode(&mut buf)
            .expect("Failed to decode command")
            .expect("Decoded command is None");

        assert_eq!(decoded_command.code(), RequestCode::EndTransaction as i32);

        let decoded_header = decoded_command
            .decode_command_custom_header::<EndTransactionRequestHeader>()
            .expect("Failed to decode header");

        assert_required_fields(&decoded_header, MessageSysFlag::TRANSACTION_COMMIT_TYPE);
        assert_eq!(decoded_header.transaction_id.unwrap(), "tran1");
    }
}