Skip to main content

rocketmq_remoting/protocol/static_topic/
topic_queue_mapping_detail.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use rocketmq_rust::ArcMut;
18use serde::Deserialize;
19use serde::Serialize;
20
21use crate::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem;
22use crate::protocol::static_topic::topic_queue_mapping_info::TopicQueueMappingInfo;
23
24#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
25pub struct TopicQueueMappingDetail {
26    #[serde(flatten)]
27    pub topic_queue_mapping_info: TopicQueueMappingInfo,
28
29    #[serde(rename = "hostedQueues")]
30    pub hosted_queues: Option<HashMap<i32 /* global id */, Vec<LogicQueueMappingItem>>>,
31}
32
33impl TopicQueueMappingDetail {
34    pub fn get_mapping_info(
35        mapping_detail: &TopicQueueMappingDetail,
36        global_id: i32,
37    ) -> Option<&Vec<LogicQueueMappingItem>> {
38        mapping_detail.hosted_queues.as_ref()?.get(&global_id)
39    }
40
41    pub fn compute_max_offset_from_mapping(mapping_detail: &TopicQueueMappingDetail, global_id: Option<i32>) -> i64 {
42        match Self::get_mapping_info(mapping_detail, global_id.unwrap()) {
43            Some(mapping_items) => {
44                if mapping_items.is_empty() {
45                    return -1;
46                }
47                let item = mapping_items.last().unwrap();
48                item.compute_max_static_queue_offset()
49            }
50            None => -1,
51        }
52    }
53}
54
55//impl static methods(Like java static method)
56impl TopicQueueMappingDetail {
57    pub fn clone_as_mapping_info(mapping_detail: &TopicQueueMappingDetail) -> TopicQueueMappingInfo {
58        TopicQueueMappingInfo {
59            topic: mapping_detail.topic_queue_mapping_info.topic.clone(),
60            total_queues: mapping_detail.topic_queue_mapping_info.total_queues,
61            bname: mapping_detail.topic_queue_mapping_info.bname.clone(),
62            epoch: mapping_detail.topic_queue_mapping_info.epoch,
63            ..TopicQueueMappingInfo::default()
64        }
65    }
66    pub fn put_mapping_info(
67        mut mapping_detail: ArcMut<TopicQueueMappingDetail>,
68        global_id: i32,
69        mapping_info: Vec<LogicQueueMappingItem>,
70    ) {
71        if mapping_info.is_empty() {
72            return;
73        }
74        if let Some(q_map) = &mut mapping_detail.hosted_queues {
75            q_map.insert(global_id, mapping_info);
76        }
77    }
78}
79
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a detail whose `hosted_queues` holds exactly one entry.
    fn detail_hosting(global_id: i32, items: Vec<LogicQueueMappingItem>) -> TopicQueueMappingDetail {
        TopicQueueMappingDetail {
            hosted_queues: Some(HashMap::from([(global_id, items)])),
            ..Default::default()
        }
    }

    /// Shorthand for `compute_max_offset_from_mapping` with a present id.
    fn max_offset_of(detail: &TopicQueueMappingDetail, global_id: i32) -> i64 {
        TopicQueueMappingDetail::compute_max_offset_from_mapping(detail, Some(global_id))
    }

    #[test]
    fn topic_queue_mapping_detail_default() {
        let TopicQueueMappingDetail {
            topic_queue_mapping_info,
            hosted_queues,
        } = TopicQueueMappingDetail::default();

        assert_eq!(topic_queue_mapping_info, TopicQueueMappingInfo::default());
        assert!(hosted_queues.is_none());
    }

    #[test]
    fn topic_queue_mapping_detail_serde() {
        let original = TopicQueueMappingDetail {
            topic_queue_mapping_info: TopicQueueMappingInfo {
                topic: Some("test".into()),
                ..TopicQueueMappingInfo::default()
            },
            hosted_queues: Some(HashMap::from([(1, vec![LogicQueueMappingItem::default()])])),
        };

        // Header fields are flattened next to `hostedQueues` in the output.
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(
            encoded,
            r#"{"topic":"test","scope":"__global__","totalQueues":0,"bname":null,"epoch":0,"dirty":false,"currIdMap":null,"hostedQueues":{"1":[{"gen":0,"queueId":0,"bname":null,"logicOffset":0,"startOffset":0,"endOffset":-1,"timeOfStart":-1,"timeOfEnd":-1}]}}"#
        );

        // Round trip must reproduce the original value.
        let decoded: TopicQueueMappingDetail = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn get_mapping_info() {
        let items = vec![LogicQueueMappingItem::default()];
        let detail = detail_hosting(1, items.clone());

        assert_eq!(TopicQueueMappingDetail::get_mapping_info(&detail, 1), Some(&items));
        assert_eq!(TopicQueueMappingDetail::get_mapping_info(&detail, 2), None);
    }

    #[test]
    fn compute_max_offset_from_mapping() {
        let mut items = vec![LogicQueueMappingItem::default()];
        let head = &mut items[0];
        head.logic_offset = 100;
        head.start_offset = 50;
        head.end_offset = 150;
        let mut detail = detail_hosting(1, items);

        // logic_offset + end_offset - start_offset = 100 + 150 - 50 = 200
        assert_eq!(max_offset_of(&detail, 1), 200);

        // An end_offset below start_offset is invalid; the expected max is
        // then the plain logic_offset.
        let hosted = detail.hosted_queues.as_mut().unwrap();
        hosted.get_mut(&1).unwrap()[0].end_offset = 20;
        assert_eq!(max_offset_of(&detail, 1), 100);

        // A global id without any mapping yields the -1 sentinel.
        assert_eq!(max_offset_of(&detail, 2), -1);
    }

    #[test]
    fn clone_as_mapping_info() {
        let header = TopicQueueMappingInfo {
            topic: Some("test_topic".into()),
            total_queues: 8,
            bname: Some("broker_a".into()),
            epoch: 12345,
            ..TopicQueueMappingInfo::default()
        };
        let detail = TopicQueueMappingDetail {
            topic_queue_mapping_info: header,
            ..Default::default()
        };

        let cloned = TopicQueueMappingDetail::clone_as_mapping_info(&detail);
        let expected = &detail.topic_queue_mapping_info;
        assert_eq!(cloned.topic, expected.topic);
        assert_eq!(cloned.total_queues, expected.total_queues);
        assert_eq!(cloned.bname, expected.bname);
        assert_eq!(cloned.epoch, expected.epoch);
    }

    #[test]
    fn put_mapping_info() {
        let shared = ArcMut::new(TopicQueueMappingDetail {
            hosted_queues: Some(HashMap::new()),
            ..Default::default()
        });
        let items = vec![LogicQueueMappingItem::default()];

        // A non-empty list is stored under its global id.
        TopicQueueMappingDetail::put_mapping_info(shared.clone(), 1, items.clone());
        assert_eq!(shared.hosted_queues.as_ref().unwrap().get(&1), Some(&items));

        // An empty list is ignored and creates no entry.
        TopicQueueMappingDetail::put_mapping_info(shared.clone(), 2, vec![]);
        assert!(!shared.hosted_queues.as_ref().unwrap().contains_key(&2));
    }
}