Skip to main content

rocketmq_remoting/protocol/admin/
topic_stats_table.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use rocketmq_common::common::message::message_queue::MessageQueue;
18use serde::Deserialize;
19use serde::Serialize;
20use serde_json_any_key::*;
21
22use crate::protocol::admin::topic_offset::TopicOffset;
23
24#[derive(Debug, Clone, Serialize, Deserialize, Default)]
25#[serde(rename_all = "camelCase")]
26pub struct TopicStatsTable {
27    #[serde(with = "any_key_map")]
28    offset_table: HashMap<MessageQueue, TopicOffset>,
29}
30
31impl TopicStatsTable {
32    pub fn new() -> Self {
33        Self {
34            offset_table: HashMap::new(),
35        }
36    }
37
38    pub fn get_offset_table(&self) -> &HashMap<MessageQueue, TopicOffset> {
39        &self.offset_table
40    }
41
42    pub fn get_offset_table_mut(&mut self) -> &mut HashMap<MessageQueue, TopicOffset> {
43        &mut self.offset_table
44    }
45
46    pub fn into_offset_table(self) -> HashMap<MessageQueue, TopicOffset> {
47        self.offset_table
48    }
49
50    pub fn set_offset_table(&mut self, offset_table: HashMap<MessageQueue, TopicOffset>) {
51        self.offset_table = offset_table;
52    }
53}
54
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `MessageQueue` by deserializing a JSON literal with the given
    /// topic and queue id; the broker name is always "default".
    fn create_custom_mq(topic: &str, queue_id: i32) -> MessageQueue {
        let payload = format!(
            r#"{{"topic": "{}", "brokerName": "default", "queueId": {}}}"#,
            topic, queue_id
        );
        serde_json::from_str::<MessageQueue>(&payload).unwrap()
    }

    /// A freshly constructed table holds no entries.
    #[test]
    fn test_topic_stats_table_initialization() {
        assert!(TopicStatsTable::new().get_offset_table().is_empty());
    }

    /// `MessageQueue::new()` serializes with queue id 0 and an empty topic.
    #[test]
    fn test_message_queue_default_new() {
        let as_json = serde_json::to_value(MessageQueue::new()).unwrap();
        assert_eq!(as_json["queueId"], 0);
        assert_eq!(as_json["topic"], "");
    }

    /// A map installed through the setter is readable through the getter.
    #[test]
    fn test_set_and_get_offsets() {
        let mq = create_custom_mq("test_topic", 5);

        let mut offset = TopicOffset::new();
        offset.set_min_offset(100);
        offset.set_max_offset(200);

        let mut table = TopicStatsTable::new();
        // The key is cloned because `mq` is needed again for the lookup below.
        table.set_offset_table(HashMap::from([(mq.clone(), offset)]));

        let stored = table.get_offset_table();
        assert_eq!(stored.len(), 1);

        let found = stored.get(&mq).expect("MQ should exist in map");
        assert_eq!(found.get_min_offset(), 100);
        assert_eq!(found.get_max_offset(), 200);
    }

    /// Serializing then deserializing keeps the single entry and its timestamp.
    #[test]
    fn test_serialization_cycle_with_any_key() {
        let mut offset = TopicOffset::new();
        offset.set_last_update_timestamp(11111111);

        let mut table = TopicStatsTable::new();
        table.set_offset_table(HashMap::from([(create_custom_mq("order_topic", 1), offset)]));

        let serialized = serde_json::to_string(&table).expect("Serialization failed");
        assert!(serialized.contains("offsetTable"));

        let deserialized: TopicStatsTable =
            serde_json::from_str(&serialized).expect("Deserialization failed");
        let restored = deserialized.get_offset_table();
        assert_eq!(restored.len(), 1);

        // Reading through the borrowed value is enough; no copy is needed.
        let restored_offset = restored.values().next().unwrap();
        assert_eq!(restored_offset.get_last_update_timestamp(), 11111111);
    }

    /// `TopicOffset`'s `Display` output has the expected fixed layout.
    #[test]
    fn test_topic_offset_display_format() {
        let mut offset = TopicOffset::new();
        offset.set_min_offset(5);
        offset.set_max_offset(15);

        let expected = "TopicOffset{min_offset=5, max_offset=15, last_update_timestamp=0}";
        assert_eq!(offset.to_string(), expected);
    }
}