Skip to main content

rocketmq_client_rust/trace/
trace_data_encoder.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use cheetah_string::CheetahString;
19use rocketmq_common::common::message::message_enum::MessageType;
20use rocketmq_common::common::message::MessageConst;
21
22use crate::base::access_channel::AccessChannel;
23use crate::producer::local_transaction_state::LocalTransactionState;
24use crate::trace::trace_bean::TraceBean;
25use crate::trace::trace_constants::TraceConstants;
26use crate::trace::trace_context::TraceContext;
27use crate::trace::trace_transfer_bean::TraceTransferBean;
28use crate::trace::trace_type::TraceType;
29
/// Encodes and decodes trace data for RocketMQ message tracing.
///
/// This is a stateless unit type: all functionality is exposed through associated
/// functions. `encoder_from_context_bean` serializes a `TraceContext` into the wire
/// format string carried by a `TraceTransferBean`, and `decoder_from_trace_data_string`
/// parses such a string back into `TraceContext` values.
///
/// Records are terminated by `TraceConstants::FIELD_SPLITOR` and the fields inside a
/// record are separated by `TraceConstants::CONTENT_SPLITOR`. Encoding writes into a
/// `String` pre-allocated with 256 bytes; decoding splits the input into borrowed `&str`
/// slices and converts individual fields with `CheetahString::from_slice`.
pub struct TraceDataEncoder;
36
37impl TraceDataEncoder {
38    /// Decodes trace data from a string into structured trace contexts.
39    ///
40    /// Parses a trace data string containing one or more encoded trace records separated
41    /// by field separators. The result vector is pre-allocated based on separator count
42    /// to minimize allocations. Invalid or malformed records are silently skipped.
43    ///
44    /// Returns an empty vector if the input string is empty.
45    pub fn decoder_from_trace_data_string(trace_data: &str) -> Vec<TraceContext> {
46        // Early return for empty input
47        if trace_data.is_empty() {
48            return Vec::new();
49        }
50
51        // Pre-allocate result vector based on field separator count
52        let estimated_size = trace_data.matches(TraceConstants::FIELD_SPLITOR).count();
53        let mut res_list = Vec::with_capacity(estimated_size.max(1));
54
55        // Split by field separator
56        for context_str in trace_data.split(TraceConstants::FIELD_SPLITOR) {
57            if context_str.is_empty() {
58                continue;
59            }
60
61            let line: Vec<&str> = context_str.split(TraceConstants::CONTENT_SPLITOR).collect();
62            if line.is_empty() {
63                continue;
64            }
65
66            // Match trace type and parse accordingly
67            match line[0] {
68                "Pub" => {
69                    if let Some(ctx) = Self::decode_pub_context(&line) {
70                        res_list.push(ctx);
71                    }
72                }
73                "SubBefore" => {
74                    if let Some(ctx) = Self::decode_sub_before_context(&line) {
75                        res_list.push(ctx);
76                    }
77                }
78                "SubAfter" => {
79                    if let Some(ctx) = Self::decode_sub_after_context(&line) {
80                        res_list.push(ctx);
81                    }
82                }
83                "EndTransaction" => {
84                    if let Some(ctx) = Self::decode_end_transaction_context(&line) {
85                        res_list.push(ctx);
86                    }
87                }
88                "Recall" => {
89                    if let Some(ctx) = Self::decode_recall_context(&line) {
90                        res_list.push(ctx);
91                    }
92                }
93                _ => {} // Unknown trace type, skip
94            }
95        }
96
97        res_list
98    }
99
100    /// Encodes a trace context into a transfer bean for transmission.
101    ///
102    /// Serializes the trace context into a wire format string and extracts message keys
103    /// for indexing. The string builder is pre-allocated with 256 bytes capacity to reduce
104    /// allocations during encoding.
105    ///
106    /// Returns `None` if the context has no trace beans or an invalid trace type.
107    pub fn encoder_from_context_bean(ctx: &TraceContext) -> Option<TraceTransferBean> {
108        let trace_beans = ctx.trace_beans.as_ref()?;
109        if trace_beans.is_empty() {
110            return None;
111        }
112
113        let mut transfer_bean = TraceTransferBean::new();
114        // Pre-allocate string builder with estimated capacity
115        let mut sb = String::with_capacity(256);
116
117        match ctx.trace_type? {
118            TraceType::Pub => {
119                Self::encode_pub_context(ctx, &trace_beans[0], &mut sb);
120            }
121            TraceType::SubBefore => {
122                Self::encode_sub_before_context(ctx, trace_beans, &mut sb);
123            }
124            TraceType::SubAfter => {
125                Self::encode_sub_after_context(ctx, trace_beans, &mut sb);
126            }
127            TraceType::EndTransaction => {
128                Self::encode_end_transaction_context(ctx, &trace_beans[0], &mut sb);
129            }
130            TraceType::Recall => {
131                Self::encode_recall_context(ctx, &trace_beans[0], &mut sb);
132            }
133        }
134
135        transfer_bean.set_trans_data(CheetahString::from_string(sb));
136
137        // Extract keys from trace beans
138        for bean in trace_beans {
139            transfer_bean.add_key(bean.msg_id.clone());
140
141            if !bean.keys.is_empty() {
142                let keys: Vec<&str> = bean.keys.split(MessageConst::KEY_SEPARATOR).collect();
143                for key in keys {
144                    if !key.is_empty() {
145                        transfer_bean.add_key(CheetahString::from_slice(key));
146                    }
147                }
148            }
149        }
150
151        Some(transfer_bean)
152    }
153
154    // ==================== Decoder Helper Methods ====================
155
156    #[inline]
157    fn decode_pub_context(line: &[&str]) -> Option<TraceContext> {
158        if line.len() < 12 {
159            return None;
160        }
161
162        let time_stamp = line[1].parse().ok()?;
163        let region_id = CheetahString::from_slice(line[2]);
164        let group_name = CheetahString::from_slice(line[3]);
165        let cost_time = line[10].parse().ok()?;
166        let body_length = line[9].parse().ok()?;
167
168        // Parse message type
169        let msg_type = if let Ok(msg_type_ordinal) = line[11].parse::<usize>() {
170            Self::message_type_from_ordinal(msg_type_ordinal)
171        } else {
172            None
173        };
174
175        // Handle different line lengths for backward compatibility
176        let (is_success, offset_msg_id, client_host) = if line.len() == 13 {
177            (
178                line[12].parse().unwrap_or(true),
179                CheetahString::default(),
180                CheetahString::default(),
181            )
182        } else if line.len() == 14 {
183            (
184                line[13].parse().unwrap_or(true),
185                CheetahString::from_slice(line[12]),
186                CheetahString::default(),
187            )
188        } else if line.len() >= 15 {
189            (
190                line[13].parse().unwrap_or(true),
191                CheetahString::from_slice(line[12]),
192                CheetahString::from_slice(line[14]),
193            )
194        } else {
195            (true, CheetahString::default(), CheetahString::default())
196        };
197
198        let bean = TraceBean {
199            topic: CheetahString::from_slice(line[4]),
200            msg_id: CheetahString::from_slice(line[5]),
201            tags: CheetahString::from_slice(line[6]),
202            keys: CheetahString::from_slice(line[7]),
203            store_host: CheetahString::from_slice(line[8]),
204            body_length,
205            msg_type,
206            offset_msg_id,
207            client_host,
208            ..Default::default()
209        };
210
211        let ctx = TraceContext {
212            trace_type: Some(TraceType::Pub),
213            time_stamp,
214            region_id,
215            group_name,
216            cost_time,
217            is_success,
218            trace_beans: Some(vec![bean]),
219            ..Default::default()
220        };
221
222        Some(ctx)
223    }
224
225    #[inline]
226    fn decode_sub_before_context(line: &[&str]) -> Option<TraceContext> {
227        if line.len() < 8 {
228            return None;
229        }
230
231        let bean = TraceBean {
232            msg_id: CheetahString::from_slice(line[5]),
233            retry_times: line[6].parse().ok()?,
234            keys: CheetahString::from_slice(line[7]),
235            ..Default::default()
236        };
237
238        let ctx = TraceContext {
239            trace_type: Some(TraceType::SubBefore),
240            time_stamp: line[1].parse().ok()?,
241            region_id: CheetahString::from_slice(line[2]),
242            group_name: CheetahString::from_slice(line[3]),
243            request_id: CheetahString::from_slice(line[4]),
244            trace_beans: Some(vec![bean]),
245            ..Default::default()
246        };
247
248        Some(ctx)
249    }
250
251    #[inline]
252    fn decode_sub_after_context(line: &[&str]) -> Option<TraceContext> {
253        if line.len() < 6 {
254            return None;
255        }
256
257        let context_code = if line.len() >= 7 {
258            line[6].parse().unwrap_or(0)
259        } else {
260            0
261        };
262
263        let (time_stamp, group_name) = if line.len() >= 9 {
264            (line[7].parse().unwrap_or(0), CheetahString::from_slice(line[8]))
265        } else {
266            (0, CheetahString::default())
267        };
268
269        let bean = TraceBean {
270            msg_id: CheetahString::from_slice(line[2]),
271            keys: CheetahString::from_slice(line[5]),
272            ..Default::default()
273        };
274
275        let ctx = TraceContext {
276            trace_type: Some(TraceType::SubAfter),
277            request_id: CheetahString::from_slice(line[1]),
278            cost_time: line[3].parse().ok()?,
279            is_success: line[4].parse().unwrap_or(false),
280            context_code,
281            time_stamp,
282            group_name,
283            trace_beans: Some(vec![bean]),
284            ..Default::default()
285        };
286
287        Some(ctx)
288    }
289
290    #[inline]
291    fn decode_end_transaction_context(line: &[&str]) -> Option<TraceContext> {
292        if line.len() < 13 {
293            return None;
294        }
295
296        let msg_type = if let Ok(msg_type_ordinal) = line[9].parse::<usize>() {
297            Self::message_type_from_ordinal(msg_type_ordinal)
298        } else {
299            None
300        };
301
302        let bean = TraceBean {
303            topic: CheetahString::from_slice(line[4]),
304            msg_id: CheetahString::from_slice(line[5]),
305            tags: CheetahString::from_slice(line[6]),
306            keys: CheetahString::from_slice(line[7]),
307            store_host: CheetahString::from_slice(line[8]),
308            msg_type,
309            transaction_id: Some(CheetahString::from_slice(line[10])),
310            transaction_state: Self::parse_transaction_state(line[11]),
311            from_transaction_check: line[12].parse().unwrap_or(false),
312            ..Default::default()
313        };
314
315        let ctx = TraceContext {
316            trace_type: Some(TraceType::EndTransaction),
317            time_stamp: line[1].parse().ok()?,
318            region_id: CheetahString::from_slice(line[2]),
319            group_name: CheetahString::from_slice(line[3]),
320            trace_beans: Some(vec![bean]),
321            ..Default::default()
322        };
323
324        Some(ctx)
325    }
326
327    #[inline]
328    fn decode_recall_context(line: &[&str]) -> Option<TraceContext> {
329        if line.len() < 7 {
330            return None;
331        }
332
333        let bean = TraceBean {
334            topic: CheetahString::from_slice(line[4]),
335            msg_id: CheetahString::from_slice(line[5]),
336            ..Default::default()
337        };
338
339        let ctx = TraceContext {
340            trace_type: Some(TraceType::Recall),
341            time_stamp: line[1].parse().ok()?,
342            region_id: CheetahString::from_slice(line[2]),
343            group_name: CheetahString::from_slice(line[3]),
344            is_success: line[6].parse().unwrap_or(false),
345            trace_beans: Some(vec![bean]),
346            ..Default::default()
347        };
348
349        Some(ctx)
350    }
351
352    // ==================== Encoder Helper Methods ====================
353
354    #[inline]
355    fn encode_pub_context(ctx: &TraceContext, bean: &TraceBean, sb: &mut String) {
356        sb.push_str("Pub");
357        sb.push(TraceConstants::CONTENT_SPLITOR);
358        sb.push_str(&ctx.time_stamp.to_string());
359        sb.push(TraceConstants::CONTENT_SPLITOR);
360        sb.push_str(&ctx.region_id);
361        sb.push(TraceConstants::CONTENT_SPLITOR);
362        sb.push_str(&ctx.group_name);
363        sb.push(TraceConstants::CONTENT_SPLITOR);
364        sb.push_str(&bean.topic);
365        sb.push(TraceConstants::CONTENT_SPLITOR);
366        sb.push_str(&bean.msg_id);
367        sb.push(TraceConstants::CONTENT_SPLITOR);
368        sb.push_str(&bean.tags);
369        sb.push(TraceConstants::CONTENT_SPLITOR);
370        sb.push_str(&bean.keys);
371        sb.push(TraceConstants::CONTENT_SPLITOR);
372        sb.push_str(&bean.store_host);
373        sb.push(TraceConstants::CONTENT_SPLITOR);
374        sb.push_str(&bean.body_length.to_string());
375        sb.push(TraceConstants::CONTENT_SPLITOR);
376        sb.push_str(&ctx.cost_time.to_string());
377        sb.push(TraceConstants::CONTENT_SPLITOR);
378
379        if let Some(msg_type) = &bean.msg_type {
380            sb.push_str(&Self::message_type_to_ordinal(*msg_type).to_string());
381        } else {
382            sb.push('0');
383        }
384        sb.push(TraceConstants::CONTENT_SPLITOR);
385        sb.push_str(&bean.offset_msg_id);
386        sb.push(TraceConstants::CONTENT_SPLITOR);
387        sb.push_str(if ctx.is_success { "true" } else { "false" });
388        sb.push(TraceConstants::CONTENT_SPLITOR);
389        sb.push_str(&bean.client_host);
390        sb.push(TraceConstants::FIELD_SPLITOR);
391    }
392
393    #[inline]
394    fn encode_sub_before_context(ctx: &TraceContext, beans: &[TraceBean], sb: &mut String) {
395        for bean in beans {
396            sb.push_str("SubBefore");
397            sb.push(TraceConstants::CONTENT_SPLITOR);
398            sb.push_str(&ctx.time_stamp.to_string());
399            sb.push(TraceConstants::CONTENT_SPLITOR);
400            sb.push_str(&ctx.region_id);
401            sb.push(TraceConstants::CONTENT_SPLITOR);
402            sb.push_str(&ctx.group_name);
403            sb.push(TraceConstants::CONTENT_SPLITOR);
404            sb.push_str(&ctx.request_id);
405            sb.push(TraceConstants::CONTENT_SPLITOR);
406            sb.push_str(&bean.msg_id);
407            sb.push(TraceConstants::CONTENT_SPLITOR);
408            sb.push_str(&bean.retry_times.to_string());
409            sb.push(TraceConstants::CONTENT_SPLITOR);
410            sb.push_str(&bean.keys);
411            sb.push(TraceConstants::FIELD_SPLITOR);
412        }
413    }
414
415    #[inline]
416    fn encode_sub_after_context(ctx: &TraceContext, beans: &[TraceBean], sb: &mut String) {
417        for bean in beans {
418            sb.push_str("SubAfter");
419            sb.push(TraceConstants::CONTENT_SPLITOR);
420            sb.push_str(&ctx.request_id);
421            sb.push(TraceConstants::CONTENT_SPLITOR);
422            sb.push_str(&bean.msg_id);
423            sb.push(TraceConstants::CONTENT_SPLITOR);
424            sb.push_str(&ctx.cost_time.to_string());
425            sb.push(TraceConstants::CONTENT_SPLITOR);
426            sb.push_str(if ctx.is_success { "true" } else { "false" });
427            sb.push(TraceConstants::CONTENT_SPLITOR);
428            sb.push_str(&bean.keys);
429            sb.push(TraceConstants::CONTENT_SPLITOR);
430            sb.push_str(&ctx.context_code.to_string());
431            sb.push(TraceConstants::CONTENT_SPLITOR);
432
433            // Only add timestamp and group name if not CLOUD access channel
434            if !matches!(ctx.access_channel, Some(AccessChannel::Cloud)) {
435                sb.push_str(&ctx.time_stamp.to_string());
436                sb.push(TraceConstants::CONTENT_SPLITOR);
437                sb.push_str(&ctx.group_name);
438            }
439            sb.push(TraceConstants::FIELD_SPLITOR);
440        }
441    }
442
443    #[inline]
444    fn encode_end_transaction_context(ctx: &TraceContext, bean: &TraceBean, sb: &mut String) {
445        sb.push_str("EndTransaction");
446        sb.push(TraceConstants::CONTENT_SPLITOR);
447        sb.push_str(&ctx.time_stamp.to_string());
448        sb.push(TraceConstants::CONTENT_SPLITOR);
449        sb.push_str(&ctx.region_id);
450        sb.push(TraceConstants::CONTENT_SPLITOR);
451        sb.push_str(&ctx.group_name);
452        sb.push(TraceConstants::CONTENT_SPLITOR);
453        sb.push_str(&bean.topic);
454        sb.push(TraceConstants::CONTENT_SPLITOR);
455        sb.push_str(&bean.msg_id);
456        sb.push(TraceConstants::CONTENT_SPLITOR);
457        sb.push_str(&bean.tags);
458        sb.push(TraceConstants::CONTENT_SPLITOR);
459        sb.push_str(&bean.keys);
460        sb.push(TraceConstants::CONTENT_SPLITOR);
461        sb.push_str(&bean.store_host);
462        sb.push(TraceConstants::CONTENT_SPLITOR);
463
464        if let Some(msg_type) = &bean.msg_type {
465            sb.push_str(&Self::message_type_to_ordinal(*msg_type).to_string());
466        } else {
467            sb.push('0');
468        }
469        sb.push(TraceConstants::CONTENT_SPLITOR);
470
471        if let Some(ref transaction_id) = bean.transaction_id {
472            sb.push_str(transaction_id);
473        }
474        sb.push(TraceConstants::CONTENT_SPLITOR);
475
476        if let Some(ref state) = bean.transaction_state {
477            sb.push_str(&state.to_string());
478        }
479        sb.push(TraceConstants::CONTENT_SPLITOR);
480        sb.push_str(if bean.from_transaction_check { "true" } else { "false" });
481        sb.push(TraceConstants::FIELD_SPLITOR);
482    }
483
484    #[inline]
485    fn encode_recall_context(ctx: &TraceContext, bean: &TraceBean, sb: &mut String) {
486        sb.push_str("Recall");
487        sb.push(TraceConstants::CONTENT_SPLITOR);
488        sb.push_str(&ctx.time_stamp.to_string());
489        sb.push(TraceConstants::CONTENT_SPLITOR);
490        sb.push_str(&ctx.region_id);
491        sb.push(TraceConstants::CONTENT_SPLITOR);
492        sb.push_str(&ctx.group_name);
493        sb.push(TraceConstants::CONTENT_SPLITOR);
494        sb.push_str(&bean.topic);
495        sb.push(TraceConstants::CONTENT_SPLITOR);
496        sb.push_str(&bean.msg_id);
497        sb.push(TraceConstants::CONTENT_SPLITOR);
498        sb.push_str(if ctx.is_success { "true" } else { "false" });
499        sb.push(TraceConstants::FIELD_SPLITOR);
500    }
501
502    // ==================== Utility Methods ====================
503
504    #[inline]
505    fn message_type_from_ordinal(ordinal: usize) -> Option<MessageType> {
506        match ordinal {
507            0 => Some(MessageType::NormalMsg),
508            1 => Some(MessageType::TransMsgHalf),
509            2 => Some(MessageType::TransMsgCommit),
510            3 => Some(MessageType::DelayMsg),
511            4 => Some(MessageType::OrderMsg),
512            _ => None,
513        }
514    }
515
516    #[inline]
517    fn message_type_to_ordinal(msg_type: MessageType) -> usize {
518        match msg_type {
519            MessageType::NormalMsg => 0,
520            MessageType::TransMsgHalf => 1,
521            MessageType::TransMsgCommit => 2,
522            MessageType::DelayMsg => 3,
523            MessageType::OrderMsg => 4,
524        }
525    }
526
527    #[inline]
528    fn parse_transaction_state(state_str: &str) -> Option<LocalTransactionState> {
529        match state_str {
530            "COMMIT_MESSAGE" => Some(LocalTransactionState::CommitMessage),
531            "ROLLBACK_MESSAGE" => Some(LocalTransactionState::RollbackMessage),
532            "UNKNOW" | "UNKNOWN" => Some(LocalTransactionState::Unknown),
533            _ => None,
534        }
535    }
536}
537
538#[cfg(test)]
539mod tests {
540    use cheetah_string::CheetahString;
541    use rocketmq_common::common::message::message_enum::MessageType;
542
543    use crate::base::access_channel::AccessChannel;
544    use crate::producer::local_transaction_state::LocalTransactionState;
545    use crate::trace::trace_bean::TraceBean;
546    use crate::trace::trace_context::TraceContext;
547    use crate::trace::trace_type::TraceType;
548
549    use super::*;
550
551    #[test]
552    fn test_decode_empty_trace_data() {
553        let result = TraceDataEncoder::decoder_from_trace_data_string("");
554        assert!(result.is_empty());
555    }
556
557    #[test]
558    fn test_encode_and_decode_pub_context() {
559        let bean = TraceBean {
560            topic: CheetahString::from("test-topic"),
561            msg_id: CheetahString::from("msg-001"),
562            tags: CheetahString::from("tagA"),
563            keys: CheetahString::from("key1 key2"),
564            store_host: CheetahString::from("127.0.0.1:10911"),
565            body_length: 1024,
566            msg_type: Some(MessageType::NormalMsg),
567            offset_msg_id: CheetahString::from("offset-001"),
568            client_host: CheetahString::from("192.168.1.1:12345"),
569            ..Default::default()
570        };
571
572        let ctx = TraceContext {
573            trace_type: Some(TraceType::Pub),
574            time_stamp: 1234567890,
575            region_id: CheetahString::from("us-east-1"),
576            group_name: CheetahString::from("test-group"),
577            cost_time: 100,
578            is_success: true,
579            trace_beans: Some(vec![bean]),
580            ..Default::default()
581        };
582
583        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();
584        let decoded_contexts = TraceDataEncoder::decoder_from_trace_data_string(&transfer_bean.trans_data);
585
586        assert_eq!(decoded_contexts.len(), 1);
587        let decoded = &decoded_contexts[0];
588        assert_eq!(decoded.trace_type, Some(TraceType::Pub));
589        assert_eq!(decoded.time_stamp, 1234567890);
590        assert_eq!(decoded.region_id, CheetahString::from("us-east-1"));
591        assert_eq!(decoded.group_name, CheetahString::from("test-group"));
592        assert_eq!(decoded.cost_time, 100);
593        assert!(decoded.is_success);
594        assert!(decoded.trace_beans.is_some());
595        let decoded_bean = &decoded.trace_beans.as_ref().unwrap()[0];
596        assert_eq!(decoded_bean.topic, CheetahString::from("test-topic"));
597        assert_eq!(decoded_bean.msg_id, CheetahString::from("msg-001"));
598        assert_eq!(decoded_bean.tags, CheetahString::from("tagA"));
599        assert_eq!(decoded_bean.keys, CheetahString::from("key1 key2"));
600        assert_eq!(decoded_bean.store_host, CheetahString::from("127.0.0.1:10911"));
601        assert_eq!(decoded_bean.body_length, 1024);
602        assert_eq!(decoded_bean.msg_type, Some(MessageType::NormalMsg));
603        assert_eq!(decoded_bean.offset_msg_id, CheetahString::from("offset-001"));
604        assert_eq!(decoded_bean.client_host, CheetahString::from("192.168.1.1:12345"));
605    }
606
607    #[test]
608    fn test_encode_and_decode_sub_before_context() {
609        let bean = TraceBean {
610            msg_id: CheetahString::from("msg-002"),
611            retry_times: 2,
612            keys: CheetahString::from("key3"),
613            ..Default::default()
614        };
615
616        let ctx = TraceContext {
617            trace_type: Some(TraceType::SubBefore),
618            time_stamp: 9876543210,
619            region_id: CheetahString::from("eu-west-1"),
620            group_name: CheetahString::from("consumer-group"),
621            request_id: CheetahString::from("req-123"),
622            trace_beans: Some(vec![bean]),
623            ..Default::default()
624        };
625
626        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();
627        let decoded_contexts = TraceDataEncoder::decoder_from_trace_data_string(&transfer_bean.trans_data);
628
629        assert_eq!(decoded_contexts.len(), 1);
630        let decoded = &decoded_contexts[0];
631        assert_eq!(decoded.trace_type, Some(TraceType::SubBefore));
632        assert_eq!(decoded.time_stamp, 9876543210);
633        assert_eq!(decoded.region_id, CheetahString::from("eu-west-1"));
634        assert_eq!(decoded.group_name, CheetahString::from("consumer-group"));
635        assert_eq!(decoded.request_id, CheetahString::from("req-123"));
636        assert!(decoded.trace_beans.is_some());
637        let decoded_bean = &decoded.trace_beans.as_ref().unwrap()[0];
638        assert_eq!(decoded_bean.msg_id, CheetahString::from("msg-002"));
639        assert_eq!(decoded_bean.retry_times, 2);
640        assert_eq!(decoded_bean.keys, CheetahString::from("key3"));
641    }
642
643    #[test]
644    fn test_encode_and_decode_sub_after_context() {
645        let bean = TraceBean {
646            msg_id: CheetahString::from("msg-003"),
647            keys: CheetahString::from("key4 key5"),
648            ..Default::default()
649        };
650
651        let ctx = TraceContext {
652            trace_type: Some(TraceType::SubAfter),
653            request_id: CheetahString::from("req-456"),
654            cost_time: 200,
655            is_success: false,
656            context_code: 1,
657            time_stamp: 1122334455,
658            group_name: CheetahString::from("consumer-group-2"),
659            trace_beans: Some(vec![bean]),
660            ..Default::default()
661        };
662
663        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();
664        let decoded_contexts = TraceDataEncoder::decoder_from_trace_data_string(&transfer_bean.trans_data);
665
666        assert_eq!(decoded_contexts.len(), 1);
667        let decoded = &decoded_contexts[0];
668        assert_eq!(decoded.trace_type, Some(TraceType::SubAfter));
669        assert_eq!(decoded.request_id, CheetahString::from("req-456"));
670        assert_eq!(decoded.cost_time, 200);
671        assert!(!decoded.is_success);
672        assert_eq!(decoded.context_code, 1);
673        assert_eq!(decoded.time_stamp, 1122334455);
674        assert_eq!(decoded.group_name, CheetahString::from("consumer-group-2"));
675        assert!(decoded.trace_beans.is_some());
676        let decoded_bean = &decoded.trace_beans.as_ref().unwrap()[0];
677        assert_eq!(decoded_bean.msg_id, CheetahString::from("msg-003"));
678        assert_eq!(decoded_bean.keys, CheetahString::from("key4 key5"));
679    }
680
681    #[test]
682    fn test_encode_and_decode_sub_after_cloud_channel() {
683        let bean = TraceBean {
684            msg_id: CheetahString::from("msg-004"),
685            keys: CheetahString::from("key6"),
686            ..Default::default()
687        };
688
689        let ctx = TraceContext {
690            trace_type: Some(TraceType::SubAfter),
691            request_id: CheetahString::from("req-789"),
692            cost_time: 150,
693            is_success: true,
694            context_code: 0,
695            access_channel: Some(AccessChannel::Cloud),
696            time_stamp: 1122334455,
697            group_name: CheetahString::from("consumer-group-cloud"),
698            trace_beans: Some(vec![bean]),
699            ..Default::default()
700        };
701
702        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();
703        let decoded_contexts = TraceDataEncoder::decoder_from_trace_data_string(&transfer_bean.trans_data);
704
705        assert_eq!(decoded_contexts.len(), 1);
706        let decoded = &decoded_contexts[0];
707        assert_eq!(decoded.trace_type, Some(TraceType::SubAfter));
708        assert_eq!(decoded.request_id, CheetahString::from("req-789"));
709        assert_eq!(decoded.cost_time, 150);
710        assert!(decoded.is_success);
711        assert_eq!(decoded.context_code, 0);
712        assert_eq!(decoded.time_stamp, 0);
713        assert_eq!(decoded.group_name, CheetahString::default());
714    }
715
716    #[test]
717    fn test_encode_and_decode_end_transaction_context() {
718        let bean = TraceBean {
719            topic: CheetahString::from("tx-topic"),
720            msg_id: CheetahString::from("msg-tx-001"),
721            tags: CheetahString::from("tx-tag"),
722            keys: CheetahString::from("tx-key"),
723            store_host: CheetahString::from("127.0.0.1:10911"),
724            msg_type: Some(MessageType::TransMsgCommit),
725            transaction_id: Some(CheetahString::from("tx-id-001")),
726            transaction_state: Some(LocalTransactionState::CommitMessage),
727            from_transaction_check: true,
728            ..Default::default()
729        };
730
731        let ctx = TraceContext {
732            trace_type: Some(TraceType::EndTransaction),
733            time_stamp: 1234567890123,
734            region_id: CheetahString::from("ap-southeast-1"),
735            group_name: CheetahString::from("tx-group"),
736            trace_beans: Some(vec![bean]),
737            ..Default::default()
738        };
739
740        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();
741        let decoded_contexts = TraceDataEncoder::decoder_from_trace_data_string(&transfer_bean.trans_data);
742
743        assert_eq!(decoded_contexts.len(), 1);
744        let decoded = &decoded_contexts[0];
745        assert_eq!(decoded.trace_type, Some(TraceType::EndTransaction));
746        assert_eq!(decoded.time_stamp, 1234567890123);
747        assert_eq!(decoded.region_id, CheetahString::from("ap-southeast-1"));
748        assert_eq!(decoded.group_name, CheetahString::from("tx-group"));
749        assert!(decoded.trace_beans.is_some());
750        let decoded_bean = &decoded.trace_beans.as_ref().unwrap()[0];
751        assert_eq!(decoded_bean.topic, CheetahString::from("tx-topic"));
752        assert_eq!(decoded_bean.msg_id, CheetahString::from("msg-tx-001"));
753        assert_eq!(decoded_bean.tags, CheetahString::from("tx-tag"));
754        assert_eq!(decoded_bean.keys, CheetahString::from("tx-key"));
755        assert_eq!(decoded_bean.store_host, CheetahString::from("127.0.0.1:10911"));
756        assert_eq!(decoded_bean.msg_type, Some(MessageType::TransMsgCommit));
757        assert_eq!(decoded_bean.transaction_id, Some(CheetahString::from("tx-id-001")));
758        assert_eq!(
759            decoded_bean.transaction_state,
760            Some(LocalTransactionState::CommitMessage)
761        );
762        assert!(decoded_bean.from_transaction_check);
763    }
764
765    #[test]
766    fn test_encode_and_decode_recall_context() {
767        let bean = TraceBean {
768            topic: CheetahString::from("recall-topic"),
769            msg_id: CheetahString::from("msg-recall-001"),
770            ..Default::default()
771        };
772
773        let ctx = TraceContext {
774            trace_type: Some(TraceType::Recall),
775            time_stamp: 9876543210987,
776            region_id: CheetahString::from("sa-east-1"),
777            group_name: CheetahString::from("recall-group"),
778            is_success: true,
779            trace_beans: Some(vec![bean]),
780            ..Default::default()
781        };
782
783        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();
784        let decoded_contexts = TraceDataEncoder::decoder_from_trace_data_string(&transfer_bean.trans_data);
785
786        assert_eq!(decoded_contexts.len(), 1);
787        let decoded = &decoded_contexts[0];
788        assert_eq!(decoded.trace_type, Some(TraceType::Recall));
789        assert_eq!(decoded.time_stamp, 9876543210987);
790        assert_eq!(decoded.region_id, CheetahString::from("sa-east-1"));
791        assert_eq!(decoded.group_name, CheetahString::from("recall-group"));
792        assert!(decoded.is_success);
793        assert!(decoded.trace_beans.is_some());
794        let decoded_bean = &decoded.trace_beans.as_ref().unwrap()[0];
795        assert_eq!(decoded_bean.topic, CheetahString::from("recall-topic"));
796        assert_eq!(decoded_bean.msg_id, CheetahString::from("msg-recall-001"));
797    }
798
799    #[test]
800    fn test_encode_invalid_context() {
801        let ctx_no_beans = TraceContext {
802            trace_type: Some(TraceType::Pub),
803            trace_beans: None,
804            ..Default::default()
805        };
806        assert!(TraceDataEncoder::encoder_from_context_bean(&ctx_no_beans).is_none());
807
808        let ctx_empty_beans = TraceContext {
809            trace_type: Some(TraceType::Pub),
810            trace_beans: Some(vec![]),
811            ..Default::default()
812        };
813        assert!(TraceDataEncoder::encoder_from_context_bean(&ctx_empty_beans).is_none());
814        let ctx_no_trace_type = TraceContext {
815            trace_type: None,
816            trace_beans: Some(vec![TraceBean::default()]),
817            ..Default::default()
818        };
819        assert!(TraceDataEncoder::encoder_from_context_bean(&ctx_no_trace_type).is_none());
820    }
821
822    #[test]
823    fn test_transfer_bean_keys() {
824        let bean = TraceBean {
825            msg_id: CheetahString::from("msg-key-test"),
826            keys: CheetahString::from("key1 key2 key3"),
827            ..Default::default()
828        };
829
830        let ctx = TraceContext {
831            trace_type: Some(TraceType::Pub),
832            trace_beans: Some(vec![bean]),
833            ..Default::default()
834        };
835
836        let transfer_bean = TraceDataEncoder::encoder_from_context_bean(&ctx).unwrap();
837        assert!(transfer_bean.trans_key.contains(&CheetahString::from("msg-key-test")));
838        assert!(transfer_bean.trans_key.contains(&CheetahString::from("key1")));
839        assert!(transfer_bean.trans_key.contains(&CheetahString::from("key2")));
840        assert!(transfer_bean.trans_key.contains(&CheetahString::from("key3")));
841    }
842
843    #[test]
844    fn test_message_type_ordinal_conversion() {
845        assert_eq!(TraceDataEncoder::message_type_to_ordinal(MessageType::NormalMsg), 0);
846        assert_eq!(TraceDataEncoder::message_type_to_ordinal(MessageType::TransMsgHalf), 1);
847        assert_eq!(
848            TraceDataEncoder::message_type_to_ordinal(MessageType::TransMsgCommit),
849            2
850        );
851        assert_eq!(TraceDataEncoder::message_type_to_ordinal(MessageType::DelayMsg), 3);
852        assert_eq!(TraceDataEncoder::message_type_to_ordinal(MessageType::OrderMsg), 4);
853
854        assert_eq!(
855            TraceDataEncoder::message_type_from_ordinal(0),
856            Some(MessageType::NormalMsg)
857        );
858        assert_eq!(
859            TraceDataEncoder::message_type_from_ordinal(1),
860            Some(MessageType::TransMsgHalf)
861        );
862        assert_eq!(
863            TraceDataEncoder::message_type_from_ordinal(2),
864            Some(MessageType::TransMsgCommit)
865        );
866        assert_eq!(
867            TraceDataEncoder::message_type_from_ordinal(3),
868            Some(MessageType::DelayMsg)
869        );
870        assert_eq!(
871            TraceDataEncoder::message_type_from_ordinal(4),
872            Some(MessageType::OrderMsg)
873        );
874        assert_eq!(TraceDataEncoder::message_type_from_ordinal(99), None);
875    }
876
877    #[test]
878    fn test_parse_transaction_state() {
879        assert_eq!(
880            TraceDataEncoder::parse_transaction_state("COMMIT_MESSAGE"),
881            Some(LocalTransactionState::CommitMessage)
882        );
883        assert_eq!(
884            TraceDataEncoder::parse_transaction_state("ROLLBACK_MESSAGE"),
885            Some(LocalTransactionState::RollbackMessage)
886        );
887        assert_eq!(
888            TraceDataEncoder::parse_transaction_state("UNKNOW"),
889            Some(LocalTransactionState::Unknown)
890        );
891        assert_eq!(
892            TraceDataEncoder::parse_transaction_state("UNKNOWN"),
893            Some(LocalTransactionState::Unknown)
894        );
895        assert_eq!(TraceDataEncoder::parse_transaction_state("INVALID"), None);
896    }
897}