rocketmq_remoting/protocol/admin/
topic_stats_table.rs1use std::collections::HashMap;
16
17use rocketmq_common::common::message::message_queue::MessageQueue;
18use serde::Deserialize;
19use serde::Serialize;
20use serde_json_any_key::*;
21
22use crate::protocol::admin::topic_offset::TopicOffset;
23
24#[derive(Debug, Clone, Serialize, Deserialize, Default)]
25#[serde(rename_all = "camelCase")]
26pub struct TopicStatsTable {
27 #[serde(with = "any_key_map")]
28 offset_table: HashMap<MessageQueue, TopicOffset>,
29}
30
31impl TopicStatsTable {
32 pub fn new() -> Self {
33 Self {
34 offset_table: HashMap::new(),
35 }
36 }
37
38 pub fn get_offset_table(&self) -> &HashMap<MessageQueue, TopicOffset> {
39 &self.offset_table
40 }
41
42 pub fn get_offset_table_mut(&mut self) -> &mut HashMap<MessageQueue, TopicOffset> {
43 &mut self.offset_table
44 }
45
46 pub fn into_offset_table(self) -> HashMap<MessageQueue, TopicOffset> {
47 self.offset_table
48 }
49
50 pub fn set_offset_table(&mut self, offset_table: HashMap<MessageQueue, TopicOffset>) {
51 self.offset_table = offset_table;
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use super::*;
58
59 fn create_custom_mq(topic: &str, queue_id: i32) -> MessageQueue {
60 let json = format!(
61 r#"{{"topic": "{}", "brokerName": "default", "queueId": {}}}"#,
62 topic, queue_id
63 );
64 serde_json::from_str(&json).unwrap()
65 }
66
67 #[test]
68 fn test_topic_stats_table_initialization() {
69 let table = TopicStatsTable::new();
70 assert!(table.get_offset_table().is_empty());
71 }
72
73 #[test]
74 fn test_message_queue_default_new() {
75 let mq = MessageQueue::new();
76 let json = serde_json::to_value(&mq).unwrap();
77 assert_eq!(json["queueId"], 0);
78 assert_eq!(json["topic"], "");
79 }
80
81 #[test]
82 fn test_set_and_get_offsets() {
83 let mut table = TopicStatsTable::new();
84 let mut map = HashMap::new();
85
86 let mq = create_custom_mq("test_topic", 5);
87 let mut offset = TopicOffset::new();
88 offset.set_min_offset(100);
89 offset.set_max_offset(200);
90
91 map.insert(mq.clone(), offset);
92 table.set_offset_table(map);
93 let result_table = table.get_offset_table();
94 assert_eq!(result_table.len(), 1);
95
96 let retrieved_offset = result_table.get(&mq).expect("MQ should exist in map");
97 assert_eq!(retrieved_offset.get_min_offset(), 100);
98 assert_eq!(retrieved_offset.get_max_offset(), 200);
99 }
100
101 #[test]
102 fn test_serialization_cycle_with_any_key() {
103 let mut table = TopicStatsTable::new();
104 let mut map = HashMap::new();
105
106 let mq = create_custom_mq("order_topic", 1);
107 let mut offset = TopicOffset::new();
108 offset.set_last_update_timestamp(11111111);
109
110 map.insert(mq, offset);
111 table.set_offset_table(map);
112
113 let serialized = serde_json::to_string(&table).expect("Serialization failed");
114
115 assert!(serialized.contains("offsetTable"));
116
117 let deserialized: TopicStatsTable = serde_json::from_str(&serialized).expect("Deserialization failed");
118 assert_eq!(deserialized.get_offset_table().len(), 1);
119
120 let offset_val = deserialized.get_offset_table().values().next().unwrap().clone();
121 assert_eq!(offset_val.get_last_update_timestamp(), 11111111);
122 }
123
124 #[test]
125 fn test_topic_offset_display_format() {
126 let mut offset = TopicOffset::new();
127 offset.set_min_offset(5);
128 offset.set_max_offset(15);
129
130 let output = format!("{}", offset);
131 let expected = "TopicOffset{min_offset=5, max_offset=15, last_update_timestamp=0}";
132 assert_eq!(output, expected);
133 }
134}