rocketmq_remoting/protocol/static_topic/
topic_queue_mapping_detail.rs

1use std::collections::HashMap;
2
3use rocketmq_rust::ArcMut;
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem;
8use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
9
10#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
11pub struct TopicQueueMappingDetail {
12    #[serde(flatten)]
13    pub topic_queue_mapping_info: TopicQueueMappingInfo,
14
15    #[serde(rename = "hostedQueues")]
16    pub hosted_queues: Option<HashMap<i32 /* global id */, Vec<LogicQueueMappingItem>>>,
17}
18
19impl TopicQueueMappingDetail {
20    pub fn get_mapping_info(
21        mapping_detail: &TopicQueueMappingDetail,
22        global_id: i32,
23    ) -> Option<&Vec<LogicQueueMappingItem>> {
24        mapping_detail.hosted_queues.as_ref()?.get(&global_id)
25    }
26
27    pub fn compute_max_offset_from_mapping(
28        mapping_detail: &TopicQueueMappingDetail,
29        global_id: Option<i32>,
30    ) -> i64 {
31        match Self::get_mapping_info(mapping_detail, global_id.unwrap()) {
32            Some(mapping_items) => {
33                if mapping_items.is_empty() {
34                    return -1;
35                }
36                let item = mapping_items.last().unwrap();
37                item.compute_max_static_queue_offset()
38            }
39            None => -1,
40        }
41    }
42}
43
44//impl static methods(Like java static method)
45impl TopicQueueMappingDetail {
46    pub fn clone_as_mapping_info(
47        mapping_detail: &TopicQueueMappingDetail,
48    ) -> TopicQueueMappingInfo {
49        TopicQueueMappingInfo {
50            topic: mapping_detail.topic_queue_mapping_info.topic.clone(),
51            total_queues: mapping_detail.topic_queue_mapping_info.total_queues,
52            bname: mapping_detail.topic_queue_mapping_info.bname.clone(),
53            epoch: mapping_detail.topic_queue_mapping_info.epoch,
54            ..TopicQueueMappingInfo::default()
55        }
56    }
57    pub fn put_mapping_info(
58        mut mapping_detail: ArcMut<TopicQueueMappingDetail>,
59        global_id: i32,
60        mapping_info: Vec<LogicQueueMappingItem>,
61    ) {
62        if mapping_info.is_empty() {
63            return;
64        }
65        if let Some(q_map) = &mut mapping_detail.hosted_queues {
66            q_map.insert(global_id, mapping_info);
67        }
68    }
69}