1use cheetah_string::CheetahString;
19use rocketmq_common::common::message::message_enum::MessageType;
20use rocketmq_common::common::message::MessageConst;
21
22use crate::base::access_channel::AccessChannel;
23use crate::producer::local_transaction_state::LocalTransactionState;
24use crate::trace::trace_bean::TraceBean;
25use crate::trace::trace_constants::TraceConstants;
26use crate::trace::trace_context::TraceContext;
27use crate::trace::trace_transfer_bean::TraceTransferBean;
28use crate::trace::trace_type::TraceType;
29
/// Stateless namespace for converting [`TraceContext`] values to and from the delimited
/// text representation carried by a [`TraceTransferBean`].
///
/// The type has no fields; all functionality is exposed through associated functions.
/// The derives make the marker type printable, copyable and default-constructible, which
/// costs nothing for a unit struct.
#[derive(Debug, Clone, Copy, Default)]
pub struct TraceDataEncoder;
36
37impl TraceDataEncoder {
38 pub fn decoder_from_trace_data_string(trace_data: &str) -> Vec<TraceContext> {
46 if trace_data.is_empty() {
48 return Vec::new();
49 }
50
51 let estimated_size = trace_data.matches(TraceConstants::FIELD_SPLITOR).count();
53 let mut res_list = Vec::with_capacity(estimated_size.max(1));
54
55 for context_str in trace_data.split(TraceConstants::FIELD_SPLITOR) {
57 if context_str.is_empty() {
58 continue;
59 }
60
61 let line: Vec<&str> = context_str.split(TraceConstants::CONTENT_SPLITOR).collect();
62 if line.is_empty() {
63 continue;
64 }
65
66 match line[0] {
68 "Pub" => {
69 if let Some(ctx) = Self::decode_pub_context(&line) {
70 res_list.push(ctx);
71 }
72 }
73 "SubBefore" => {
74 if let Some(ctx) = Self::decode_sub_before_context(&line) {
75 res_list.push(ctx);
76 }
77 }
78 "SubAfter" => {
79 if let Some(ctx) = Self::decode_sub_after_context(&line) {
80 res_list.push(ctx);
81 }
82 }
83 "EndTransaction" => {
84 if let Some(ctx) = Self::decode_end_transaction_context(&line) {
85 res_list.push(ctx);
86 }
87 }
88 "Recall" => {
89 if let Some(ctx) = Self::decode_recall_context(&line) {
90 res_list.push(ctx);
91 }
92 }
93 _ => {} }
95 }
96
97 res_list
98 }
99
100 pub fn encoder_from_context_bean(ctx: &TraceContext) -> Option<TraceTransferBean> {
108 let trace_beans = ctx.trace_beans.as_ref()?;
109 if trace_beans.is_empty() {
110 return None;
111 }
112
113 let mut transfer_bean = TraceTransferBean::new();
114 let mut sb = String::with_capacity(256);
116
117 match ctx.trace_type? {
118 TraceType::Pub => {
119 Self::encode_pub_context(ctx, &trace_beans[0], &mut sb);
120 }
121 TraceType::SubBefore => {
122 Self::encode_sub_before_context(ctx, trace_beans, &mut sb);
123 }
124 TraceType::SubAfter => {
125 Self::encode_sub_after_context(ctx, trace_beans, &mut sb);
126 }
127 TraceType::EndTransaction => {
128 Self::encode_end_transaction_context(ctx, &trace_beans[0], &mut sb);
129 }
130 TraceType::Recall => {
131 Self::encode_recall_context(ctx, &trace_beans[0], &mut sb);
132 }
133 }
134
135 transfer_bean.set_trans_data(CheetahString::from_string(sb));
136
137 for bean in trace_beans {
139 transfer_bean.add_key(bean.msg_id.clone());
140
141 if !bean.keys.is_empty() {
142 let keys: Vec<&str> = bean.keys.split(MessageConst::KEY_SEPARATOR).collect();
143 for key in keys {
144 if !key.is_empty() {
145 transfer_bean.add_key(CheetahString::from_slice(key));
146 }
147 }
148 }
149 }
150
151 Some(transfer_bean)
152 }
153
154 #[inline]
157 fn decode_pub_context(line: &[&str]) -> Option<TraceContext> {
158 if line.len() < 12 {
159 return None;
160 }
161
162 let time_stamp = line[1].parse().ok()?;
163 let region_id = CheetahString::from_slice(line[2]);
164 let group_name = CheetahString::from_slice(line[3]);
165 let cost_time = line[10].parse().ok()?;
166 let body_length = line[9].parse().ok()?;
167
168 let msg_type = if let Ok(msg_type_ordinal) = line[11].parse::<usize>() {
170 Self::message_type_from_ordinal(msg_type_ordinal)
171 } else {
172 None
173 };
174
175 let (is_success, offset_msg_id, client_host) = if line.len() == 13 {
177 (
178 line[12].parse().unwrap_or(true),
179 CheetahString::default(),
180 CheetahString::default(),
181 )
182 } else if line.len() == 14 {
183 (
184 line[13].parse().unwrap_or(true),
185 CheetahString::from_slice(line[12]),
186 CheetahString::default(),
187 )
188 } else if line.len() >= 15 {
189 (
190 line[13].parse().unwrap_or(true),
191 CheetahString::from_slice(line[12]),
192 CheetahString::from_slice(line[14]),
193 )
194 } else {
195 (true, CheetahString::default(), CheetahString::default())
196 };
197
198 let bean = TraceBean {
199 topic: CheetahString::from_slice(line[4]),
200 msg_id: CheetahString::from_slice(line[5]),
201 tags: CheetahString::from_slice(line[6]),
202 keys: CheetahString::from_slice(line[7]),
203 store_host: CheetahString::from_slice(line[8]),
204 body_length,
205 msg_type,
206 offset_msg_id,
207 client_host,
208 ..Default::default()
209 };
210
211 let ctx = TraceContext {
212 trace_type: Some(TraceType::Pub),
213 time_stamp,
214 region_id,
215 group_name,
216 cost_time,
217 is_success,
218 trace_beans: Some(vec![bean]),
219 ..Default::default()
220 };
221
222 Some(ctx)
223 }
224
225 #[inline]
226 fn decode_sub_before_context(line: &[&str]) -> Option<TraceContext> {
227 if line.len() < 8 {
228 return None;
229 }
230
231 let bean = TraceBean {
232 msg_id: CheetahString::from_slice(line[5]),
233 retry_times: line[6].parse().ok()?,
234 keys: CheetahString::from_slice(line[7]),
235 ..Default::default()
236 };
237
238 let ctx = TraceContext {
239 trace_type: Some(TraceType::SubBefore),
240 time_stamp: line[1].parse().ok()?,
241 region_id: CheetahString::from_slice(line[2]),
242 group_name: CheetahString::from_slice(line[3]),
243 request_id: CheetahString::from_slice(line[4]),
244 trace_beans: Some(vec![bean]),
245 ..Default::default()
246 };
247
248 Some(ctx)
249 }
250
251 #[inline]
252 fn decode_sub_after_context(line: &[&str]) -> Option<TraceContext> {
253 if line.len() < 6 {
254 return None;
255 }
256
257 let context_code = if line.len() >= 7 {
258 line[6].parse().unwrap_or(0)
259 } else {
260 0
261 };
262
263 let (time_stamp, group_name) = if line.len() >= 9 {
264 (line[7].parse().unwrap_or(0), CheetahString::from_slice(line[8]))
265 } else {
266 (0, CheetahString::default())
267 };
268
269 let bean = TraceBean {
270 msg_id: CheetahString::from_slice(line[2]),
271 keys: CheetahString::from_slice(line[5]),
272 ..Default::default()
273 };
274
275 let ctx = TraceContext {
276 trace_type: Some(TraceType::SubAfter),
277 request_id: CheetahString::from_slice(line[1]),
278 cost_time: line[3].parse().ok()?,
279 is_success: line[4].parse().unwrap_or(false),
280 context_code,
281 time_stamp,
282 group_name,
283 trace_beans: Some(vec![bean]),
284 ..Default::default()
285 };
286
287 Some(ctx)
288 }
289
290 #[inline]
291 fn decode_end_transaction_context(line: &[&str]) -> Option<TraceContext> {
292 if line.len() < 13 {
293 return None;
294 }
295
296 let msg_type = if let Ok(msg_type_ordinal) = line[9].parse::<usize>() {
297 Self::message_type_from_ordinal(msg_type_ordinal)
298 } else {
299 None
300 };
301
302 let bean = TraceBean {
303 topic: CheetahString::from_slice(line[4]),
304 msg_id: CheetahString::from_slice(line[5]),
305 tags: CheetahString::from_slice(line[6]),
306 keys: CheetahString::from_slice(line[7]),
307 store_host: CheetahString::from_slice(line[8]),
308 msg_type,
309 transaction_id: Some(CheetahString::from_slice(line[10])),
310 transaction_state: Self::parse_transaction_state(line[11]),
311 from_transaction_check: line[12].parse().unwrap_or(false),
312 ..Default::default()
313 };
314
315 let ctx = TraceContext {
316 trace_type: Some(TraceType::EndTransaction),
317 time_stamp: line[1].parse().ok()?,
318 region_id: CheetahString::from_slice(line[2]),
319 group_name: CheetahString::from_slice(line[3]),
320 trace_beans: Some(vec![bean]),
321 ..Default::default()
322 };
323
324 Some(ctx)
325 }
326
327 #[inline]
328 fn decode_recall_context(line: &[&str]) -> Option<TraceContext> {
329 if line.len() < 7 {
330 return None;
331 }
332
333 let bean = TraceBean {
334 topic: CheetahString::from_slice(line[4]),
335 msg_id: CheetahString::from_slice(line[5]),
336 ..Default::default()
337 };
338
339 let ctx = TraceContext {
340 trace_type: Some(TraceType::Recall),
341 time_stamp: line[1].parse().ok()?,
342 region_id: CheetahString::from_slice(line[2]),
343 group_name: CheetahString::from_slice(line[3]),
344 is_success: line[6].parse().unwrap_or(false),
345 trace_beans: Some(vec![bean]),
346 ..Default::default()
347 };
348
349 Some(ctx)
350 }
351
352 #[inline]
355 fn encode_pub_context(ctx: &TraceContext, bean: &TraceBean, sb: &mut String) {
356 sb.push_str("Pub");
357 sb.push(TraceConstants::CONTENT_SPLITOR);
358 sb.push_str(&ctx.time_stamp.to_string());
359 sb.push(TraceConstants::CONTENT_SPLITOR);
360 sb.push_str(&ctx.region_id);
361 sb.push(TraceConstants::CONTENT_SPLITOR);
362 sb.push_str(&ctx.group_name);
363 sb.push(TraceConstants::CONTENT_SPLITOR);
364 sb.push_str(&bean.topic);
365 sb.push(TraceConstants::CONTENT_SPLITOR);
366 sb.push_str(&bean.msg_id);
367 sb.push(TraceConstants::CONTENT_SPLITOR);
368 sb.push_str(&bean.tags);
369 sb.push(TraceConstants::CONTENT_SPLITOR);
370 sb.push_str(&bean.keys);
371 sb.push(TraceConstants::CONTENT_SPLITOR);
372 sb.push_str(&bean.store_host);
373 sb.push(TraceConstants::CONTENT_SPLITOR);
374 sb.push_str(&bean.body_length.to_string());
375 sb.push(TraceConstants::CONTENT_SPLITOR);
376 sb.push_str(&ctx.cost_time.to_string());
377 sb.push(TraceConstants::CONTENT_SPLITOR);
378
379 if let Some(msg_type) = &bean.msg_type {
380 sb.push_str(&Self::message_type_to_ordinal(*msg_type).to_string());
381 } else {
382 sb.push('0');
383 }
384 sb.push(TraceConstants::CONTENT_SPLITOR);
385 sb.push_str(&bean.offset_msg_id);
386 sb.push(TraceConstants::CONTENT_SPLITOR);
387 sb.push_str(if ctx.is_success { "true" } else { "false" });
388 sb.push(TraceConstants::CONTENT_SPLITOR);
389 sb.push_str(&bean.client_host);
390 sb.push(TraceConstants::FIELD_SPLITOR);
391 }
392
393 #[inline]
394 fn encode_sub_before_context(ctx: &TraceContext, beans: &[TraceBean], sb: &mut String) {
395 for bean in beans {
396 sb.push_str("SubBefore");
397 sb.push(TraceConstants::CONTENT_SPLITOR);
398 sb.push_str(&ctx.time_stamp.to_string());
399 sb.push(TraceConstants::CONTENT_SPLITOR);
400 sb.push_str(&ctx.region_id);
401 sb.push(TraceConstants::CONTENT_SPLITOR);
402 sb.push_str(&ctx.group_name);
403 sb.push(TraceConstants::CONTENT_SPLITOR);
404 sb.push_str(&ctx.request_id);
405 sb.push(TraceConstants::CONTENT_SPLITOR);
406 sb.push_str(&bean.msg_id);
407 sb.push(TraceConstants::CONTENT_SPLITOR);
408 sb.push_str(&bean.retry_times.to_string());
409 sb.push(TraceConstants::CONTENT_SPLITOR);
410 sb.push_str(&bean.keys);
411 sb.push(TraceConstants::FIELD_SPLITOR);
412 }
413 }
414
415 #[inline]
416 fn encode_sub_after_context(ctx: &TraceContext, beans: &[TraceBean], sb: &mut String) {
417 for bean in beans {
418 sb.push_str("SubAfter");
419 sb.push(TraceConstants::CONTENT_SPLITOR);
420 sb.push_str(&ctx.request_id);
421 sb.push(TraceConstants::CONTENT_SPLITOR);
422 sb.push_str(&bean.msg_id);
423 sb.push(TraceConstants::CONTENT_SPLITOR);
424 sb.push_str(&ctx.cost_time.to_string());
425 sb.push(TraceConstants::CONTENT_SPLITOR);
426 sb.push_str(if ctx.is_success { "true" } else { "false" });
427 sb.push(TraceConstants::CONTENT_SPLITOR);
428 sb.push_str(&bean.keys);
429 sb.push(TraceConstants::CONTENT_SPLITOR);
430 sb.push_str(&ctx.context_code.to_string());
431 sb.push(TraceConstants::CONTENT_SPLITOR);
432
433 if !matches!(ctx.access_channel, Some(AccessChannel::Cloud)) {
435 sb.push_str(&ctx.time_stamp.to_string());
436 sb.push(TraceConstants::CONTENT_SPLITOR);
437 sb.push_str(&ctx.group_name);
438 }
439 sb.push(TraceConstants::FIELD_SPLITOR);
440 }
441 }
442
443 #[inline]
444 fn encode_end_transaction_context(ctx: &TraceContext, bean: &TraceBean, sb: &mut String) {
445 sb.push_str("EndTransaction");
446 sb.push(TraceConstants::CONTENT_SPLITOR);
447 sb.push_str(&ctx.time_stamp.to_string());
448 sb.push(TraceConstants::CONTENT_SPLITOR);
449 sb.push_str(&ctx.region_id);
450 sb.push(TraceConstants::CONTENT_SPLITOR);
451 sb.push_str(&ctx.group_name);
452 sb.push(TraceConstants::CONTENT_SPLITOR);
453 sb.push_str(&bean.topic);
454 sb.push(TraceConstants::CONTENT_SPLITOR);
455 sb.push_str(&bean.msg_id);
456 sb.push(TraceConstants::CONTENT_SPLITOR);
457 sb.push_str(&bean.tags);
458 sb.push(TraceConstants::CONTENT_SPLITOR);
459 sb.push_str(&bean.keys);
460 sb.push(TraceConstants::CONTENT_SPLITOR);
461 sb.push_str(&bean.store_host);
462 sb.push(TraceConstants::CONTENT_SPLITOR);
463
464 if let Some(msg_type) = &bean.msg_type {
465 sb.push_str(&Self::message_type_to_ordinal(*msg_type).to_string());
466 } else {
467 sb.push('0');
468 }
469 sb.push(TraceConstants::CONTENT_SPLITOR);
470
471 if let Some(ref transaction_id) = bean.transaction_id {
472 sb.push_str(transaction_id);
473 }
474 sb.push(TraceConstants::CONTENT_SPLITOR);
475
476 if let Some(ref state) = bean.transaction_state {
477 sb.push_str(&state.to_string());
478 }
479 sb.push(TraceConstants::CONTENT_SPLITOR);
480 sb.push_str(if bean.from_transaction_check { "true" } else { "false" });
481 sb.push(TraceConstants::FIELD_SPLITOR);
482 }
483
484 #[inline]
485 fn encode_recall_context(ctx: &TraceContext, bean: &TraceBean, sb: &mut String) {
486 sb.push_str("Recall");
487 sb.push(TraceConstants::CONTENT_SPLITOR);
488 sb.push_str(&ctx.time_stamp.to_string());
489 sb.push(TraceConstants::CONTENT_SPLITOR);
490 sb.push_str(&ctx.region_id);
491 sb.push(TraceConstants::CONTENT_SPLITOR);
492 sb.push_str(&ctx.group_name);
493 sb.push(TraceConstants::CONTENT_SPLITOR);
494 sb.push_str(&bean.topic);
495 sb.push(TraceConstants::CONTENT_SPLITOR);
496 sb.push_str(&bean.msg_id);
497 sb.push(TraceConstants::CONTENT_SPLITOR);
498 sb.push_str(if ctx.is_success { "true" } else { "false" });
499 sb.push(TraceConstants::FIELD_SPLITOR);
500 }
501
502 #[inline]
505 fn message_type_from_ordinal(ordinal: usize) -> Option<MessageType> {
506 match ordinal {
507 0 => Some(MessageType::NormalMsg),
508 1 => Some(MessageType::TransMsgHalf),
509 2 => Some(MessageType::TransMsgCommit),
510 3 => Some(MessageType::DelayMsg),
511 4 => Some(MessageType::OrderMsg),
512 _ => None,
513 }
514 }
515
516 #[inline]
517 fn message_type_to_ordinal(msg_type: MessageType) -> usize {
518 match msg_type {
519 MessageType::NormalMsg => 0,
520 MessageType::TransMsgHalf => 1,
521 MessageType::TransMsgCommit => 2,
522 MessageType::DelayMsg => 3,
523 MessageType::OrderMsg => 4,
524 }
525 }
526
527 #[inline]
528 fn parse_transaction_state(state_str: &str) -> Option<LocalTransactionState> {
529 match state_str {
530 "COMMIT_MESSAGE" => Some(LocalTransactionState::CommitMessage),
531 "ROLLBACK_MESSAGE" => Some(LocalTransactionState::RollbackMessage),
532 "UNKNOW" | "UNKNOWN" => Some(LocalTransactionState::Unknown),
533 _ => None,
534 }
535 }
536}
537
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;
    use rocketmq_common::common::message::message_enum::MessageType;

    use crate::base::access_channel::AccessChannel;
    use crate::producer::local_transaction_state::LocalTransactionState;
    use crate::trace::trace_bean::TraceBean;
    use crate::trace::trace_context::TraceContext;
    use crate::trace::trace_type::TraceType;

    use super::*;

    /// Shorthand for building the string type used throughout the trace structures.
    fn cs(text: &'static str) -> CheetahString {
        CheetahString::from(text)
    }

    /// Encodes `ctx`, decodes the produced payload again and returns the single context
    /// that must come out of it.
    fn round_trip(ctx: &TraceContext) -> TraceContext {
        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(ctx).unwrap();
        let mut decoded_contexts =
            TraceDataEncoder::decoder_from_trace_data_string(&transfer_bean.trans_data);

        assert_eq!(decoded_contexts.len(), 1);
        decoded_contexts.remove(0)
    }

    /// Returns the first bean of a decoded context, asserting that a bean list exists.
    fn first_bean(ctx: &TraceContext) -> &TraceBean {
        assert!(ctx.trace_beans.is_some());
        &ctx.trace_beans.as_ref().unwrap()[0]
    }

    #[test]
    fn test_decode_empty_trace_data() {
        let decoded = TraceDataEncoder::decoder_from_trace_data_string("");
        assert!(decoded.is_empty());
    }

    #[test]
    fn test_encode_and_decode_pub_context() {
        let original = TraceContext {
            trace_type: Some(TraceType::Pub),
            time_stamp: 1234567890,
            region_id: cs("us-east-1"),
            group_name: cs("test-group"),
            cost_time: 100,
            is_success: true,
            trace_beans: Some(vec![TraceBean {
                topic: cs("test-topic"),
                msg_id: cs("msg-001"),
                tags: cs("tagA"),
                keys: cs("key1 key2"),
                store_host: cs("127.0.0.1:10911"),
                body_length: 1024,
                msg_type: Some(MessageType::NormalMsg),
                offset_msg_id: cs("offset-001"),
                client_host: cs("192.168.1.1:12345"),
                ..Default::default()
            }]),
            ..Default::default()
        };

        let decoded = round_trip(&original);

        assert_eq!(decoded.trace_type, Some(TraceType::Pub));
        assert_eq!(decoded.time_stamp, 1234567890);
        assert_eq!(decoded.region_id, cs("us-east-1"));
        assert_eq!(decoded.group_name, cs("test-group"));
        assert_eq!(decoded.cost_time, 100);
        assert!(decoded.is_success);

        let decoded_bean = first_bean(&decoded);
        assert_eq!(decoded_bean.topic, cs("test-topic"));
        assert_eq!(decoded_bean.msg_id, cs("msg-001"));
        assert_eq!(decoded_bean.tags, cs("tagA"));
        assert_eq!(decoded_bean.keys, cs("key1 key2"));
        assert_eq!(decoded_bean.store_host, cs("127.0.0.1:10911"));
        assert_eq!(decoded_bean.body_length, 1024);
        assert_eq!(decoded_bean.msg_type, Some(MessageType::NormalMsg));
        assert_eq!(decoded_bean.offset_msg_id, cs("offset-001"));
        assert_eq!(decoded_bean.client_host, cs("192.168.1.1:12345"));
    }

    #[test]
    fn test_encode_and_decode_sub_before_context() {
        let original = TraceContext {
            trace_type: Some(TraceType::SubBefore),
            time_stamp: 9876543210,
            region_id: cs("eu-west-1"),
            group_name: cs("consumer-group"),
            request_id: cs("req-123"),
            trace_beans: Some(vec![TraceBean {
                msg_id: cs("msg-002"),
                retry_times: 2,
                keys: cs("key3"),
                ..Default::default()
            }]),
            ..Default::default()
        };

        let decoded = round_trip(&original);

        assert_eq!(decoded.trace_type, Some(TraceType::SubBefore));
        assert_eq!(decoded.time_stamp, 9876543210);
        assert_eq!(decoded.region_id, cs("eu-west-1"));
        assert_eq!(decoded.group_name, cs("consumer-group"));
        assert_eq!(decoded.request_id, cs("req-123"));

        let decoded_bean = first_bean(&decoded);
        assert_eq!(decoded_bean.msg_id, cs("msg-002"));
        assert_eq!(decoded_bean.retry_times, 2);
        assert_eq!(decoded_bean.keys, cs("key3"));
    }

    #[test]
    fn test_encode_and_decode_sub_after_context() {
        let original = TraceContext {
            trace_type: Some(TraceType::SubAfter),
            request_id: cs("req-456"),
            cost_time: 200,
            is_success: false,
            context_code: 1,
            time_stamp: 1122334455,
            group_name: cs("consumer-group-2"),
            trace_beans: Some(vec![TraceBean {
                msg_id: cs("msg-003"),
                keys: cs("key4 key5"),
                ..Default::default()
            }]),
            ..Default::default()
        };

        let decoded = round_trip(&original);

        assert_eq!(decoded.trace_type, Some(TraceType::SubAfter));
        assert_eq!(decoded.request_id, cs("req-456"));
        assert_eq!(decoded.cost_time, 200);
        assert!(!decoded.is_success);
        assert_eq!(decoded.context_code, 1);
        assert_eq!(decoded.time_stamp, 1122334455);
        assert_eq!(decoded.group_name, cs("consumer-group-2"));

        let decoded_bean = first_bean(&decoded);
        assert_eq!(decoded_bean.msg_id, cs("msg-003"));
        assert_eq!(decoded_bean.keys, cs("key4 key5"));
    }

    #[test]
    fn test_encode_and_decode_sub_after_cloud_channel() {
        let original = TraceContext {
            trace_type: Some(TraceType::SubAfter),
            request_id: cs("req-789"),
            cost_time: 150,
            is_success: true,
            context_code: 0,
            access_channel: Some(AccessChannel::Cloud),
            time_stamp: 1122334455,
            group_name: cs("consumer-group-cloud"),
            trace_beans: Some(vec![TraceBean {
                msg_id: cs("msg-004"),
                keys: cs("key6"),
                ..Default::default()
            }]),
            ..Default::default()
        };

        let decoded = round_trip(&original);

        assert_eq!(decoded.trace_type, Some(TraceType::SubAfter));
        assert_eq!(decoded.request_id, cs("req-789"));
        assert_eq!(decoded.cost_time, 150);
        assert!(decoded.is_success);
        assert_eq!(decoded.context_code, 0);
        // The cloud channel omits timestamp and group name, so both decode to defaults.
        assert_eq!(decoded.time_stamp, 0);
        assert_eq!(decoded.group_name, CheetahString::default());
    }

    #[test]
    fn test_encode_and_decode_end_transaction_context() {
        let original = TraceContext {
            trace_type: Some(TraceType::EndTransaction),
            time_stamp: 1234567890123,
            region_id: cs("ap-southeast-1"),
            group_name: cs("tx-group"),
            trace_beans: Some(vec![TraceBean {
                topic: cs("tx-topic"),
                msg_id: cs("msg-tx-001"),
                tags: cs("tx-tag"),
                keys: cs("tx-key"),
                store_host: cs("127.0.0.1:10911"),
                msg_type: Some(MessageType::TransMsgCommit),
                transaction_id: Some(cs("tx-id-001")),
                transaction_state: Some(LocalTransactionState::CommitMessage),
                from_transaction_check: true,
                ..Default::default()
            }]),
            ..Default::default()
        };

        let decoded = round_trip(&original);

        assert_eq!(decoded.trace_type, Some(TraceType::EndTransaction));
        assert_eq!(decoded.time_stamp, 1234567890123);
        assert_eq!(decoded.region_id, cs("ap-southeast-1"));
        assert_eq!(decoded.group_name, cs("tx-group"));

        let decoded_bean = first_bean(&decoded);
        assert_eq!(decoded_bean.topic, cs("tx-topic"));
        assert_eq!(decoded_bean.msg_id, cs("msg-tx-001"));
        assert_eq!(decoded_bean.tags, cs("tx-tag"));
        assert_eq!(decoded_bean.keys, cs("tx-key"));
        assert_eq!(decoded_bean.store_host, cs("127.0.0.1:10911"));
        assert_eq!(decoded_bean.msg_type, Some(MessageType::TransMsgCommit));
        assert_eq!(decoded_bean.transaction_id, Some(cs("tx-id-001")));
        assert_eq!(
            decoded_bean.transaction_state,
            Some(LocalTransactionState::CommitMessage)
        );
        assert!(decoded_bean.from_transaction_check);
    }

    #[test]
    fn test_encode_and_decode_recall_context() {
        let original = TraceContext {
            trace_type: Some(TraceType::Recall),
            time_stamp: 9876543210987,
            region_id: cs("sa-east-1"),
            group_name: cs("recall-group"),
            is_success: true,
            trace_beans: Some(vec![TraceBean {
                topic: cs("recall-topic"),
                msg_id: cs("msg-recall-001"),
                ..Default::default()
            }]),
            ..Default::default()
        };

        let decoded = round_trip(&original);

        assert_eq!(decoded.trace_type, Some(TraceType::Recall));
        assert_eq!(decoded.time_stamp, 9876543210987);
        assert_eq!(decoded.region_id, cs("sa-east-1"));
        assert_eq!(decoded.group_name, cs("recall-group"));
        assert!(decoded.is_success);

        let decoded_bean = first_bean(&decoded);
        assert_eq!(decoded_bean.topic, cs("recall-topic"));
        assert_eq!(decoded_bean.msg_id, cs("msg-recall-001"));
    }

    #[test]
    fn test_encode_invalid_context() {
        // Missing bean list, empty bean list and missing trace type must all be rejected.
        let invalid_contexts = [
            TraceContext {
                trace_type: Some(TraceType::Pub),
                trace_beans: None,
                ..Default::default()
            },
            TraceContext {
                trace_type: Some(TraceType::Pub),
                trace_beans: Some(vec![]),
                ..Default::default()
            },
            TraceContext {
                trace_type: None,
                trace_beans: Some(vec![TraceBean::default()]),
                ..Default::default()
            },
        ];

        for ctx in &invalid_contexts {
            assert!(TraceDataEncoder::encoder_from_context_bean(ctx).is_none());
        }
    }

    #[test]
    fn test_transfer_bean_keys() {
        let ctx = TraceContext {
            trace_type: Some(TraceType::Pub),
            trace_beans: Some(vec![TraceBean {
                msg_id: cs("msg-key-test"),
                keys: cs("key1 key2 key3"),
                ..Default::default()
            }]),
            ..Default::default()
        };

        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();

        // The message id and every individual user key must be indexed.
        for expected in ["msg-key-test", "key1", "key2", "key3"] {
            assert!(transfer_bean.trans_key.contains(&CheetahString::from(expected)));
        }
    }

    #[test]
    fn test_message_type_ordinal_conversion() {
        let mapping = [
            (MessageType::NormalMsg, 0),
            (MessageType::TransMsgHalf, 1),
            (MessageType::TransMsgCommit, 2),
            (MessageType::DelayMsg, 3),
            (MessageType::OrderMsg, 4),
        ];

        for (msg_type, ordinal) in mapping {
            assert_eq!(TraceDataEncoder::message_type_to_ordinal(msg_type), ordinal);
        }
        for (msg_type, ordinal) in mapping {
            assert_eq!(
                TraceDataEncoder::message_type_from_ordinal(ordinal),
                Some(msg_type)
            );
        }

        assert_eq!(TraceDataEncoder::message_type_from_ordinal(99), None);
    }

    #[test]
    fn test_parse_transaction_state() {
        let cases = [
            ("COMMIT_MESSAGE", Some(LocalTransactionState::CommitMessage)),
            ("ROLLBACK_MESSAGE", Some(LocalTransactionState::RollbackMessage)),
            ("UNKNOW", Some(LocalTransactionState::Unknown)),
            ("UNKNOWN", Some(LocalTransactionState::Unknown)),
            ("INVALID", None),
        ];

        for (text, expected) in cases {
            assert_eq!(TraceDataEncoder::parse_transaction_state(text), expected);
        }
    }
}