rocketmq_remoting/protocol/static_topic/
topic_queue_mapping_detail.rs1use std::collections::HashMap;
16
17use rocketmq_rust::ArcMut;
18use serde::Deserialize;
19use serde::Serialize;
20
21use crate::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem;
22use crate::protocol::static_topic::topic_queue_mapping_info::TopicQueueMappingInfo;
23
24#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
25pub struct TopicQueueMappingDetail {
26 #[serde(flatten)]
27 pub topic_queue_mapping_info: TopicQueueMappingInfo,
28
29 #[serde(rename = "hostedQueues")]
30 pub hosted_queues: Option<HashMap<i32 , Vec<LogicQueueMappingItem>>>,
31}
32
33impl TopicQueueMappingDetail {
34 pub fn get_mapping_info(
35 mapping_detail: &TopicQueueMappingDetail,
36 global_id: i32,
37 ) -> Option<&Vec<LogicQueueMappingItem>> {
38 mapping_detail.hosted_queues.as_ref()?.get(&global_id)
39 }
40
41 pub fn compute_max_offset_from_mapping(mapping_detail: &TopicQueueMappingDetail, global_id: Option<i32>) -> i64 {
42 match Self::get_mapping_info(mapping_detail, global_id.unwrap()) {
43 Some(mapping_items) => {
44 if mapping_items.is_empty() {
45 return -1;
46 }
47 let item = mapping_items.last().unwrap();
48 item.compute_max_static_queue_offset()
49 }
50 None => -1,
51 }
52 }
53}
54
55impl TopicQueueMappingDetail {
57 pub fn clone_as_mapping_info(mapping_detail: &TopicQueueMappingDetail) -> TopicQueueMappingInfo {
58 TopicQueueMappingInfo {
59 topic: mapping_detail.topic_queue_mapping_info.topic.clone(),
60 total_queues: mapping_detail.topic_queue_mapping_info.total_queues,
61 bname: mapping_detail.topic_queue_mapping_info.bname.clone(),
62 epoch: mapping_detail.topic_queue_mapping_info.epoch,
63 ..TopicQueueMappingInfo::default()
64 }
65 }
66 pub fn put_mapping_info(
67 mut mapping_detail: ArcMut<TopicQueueMappingDetail>,
68 global_id: i32,
69 mapping_info: Vec<LogicQueueMappingItem>,
70 ) {
71 if mapping_info.is_empty() {
72 return;
73 }
74 if let Some(q_map) = &mut mapping_detail.hosted_queues {
75 q_map.insert(global_id, mapping_info);
76 }
77 }
78}
79
80#[cfg(test)]
81mod tests {
82 use super::*;
83
84 #[test]
85 fn topic_queue_mapping_detail_default() {
86 let detail = TopicQueueMappingDetail::default();
87 assert_eq!(detail.topic_queue_mapping_info, TopicQueueMappingInfo::default());
88 assert!(detail.hosted_queues.is_none());
89 }
90
91 #[test]
92 fn topic_queue_mapping_detail_serde() {
93 let mut detail = TopicQueueMappingDetail::default();
94 detail.topic_queue_mapping_info.topic = Some("test".into());
95 let mut hosted_queues = HashMap::new();
96 hosted_queues.insert(1, vec![LogicQueueMappingItem::default()]);
97 detail.hosted_queues = Some(hosted_queues);
98
99 let json = serde_json::to_string(&detail).unwrap();
100 let expected = r#"{"topic":"test","scope":"__global__","totalQueues":0,"bname":null,"epoch":0,"dirty":false,"currIdMap":null,"hostedQueues":{"1":[{"gen":0,"queueId":0,"bname":null,"logicOffset":0,"startOffset":0,"endOffset":-1,"timeOfStart":-1,"timeOfEnd":-1}]}}"#;
101 assert_eq!(json, expected);
102
103 let deserialized: TopicQueueMappingDetail = serde_json::from_str(&json).unwrap();
104 assert_eq!(detail, deserialized);
105 }
106
107 #[test]
108 fn get_mapping_info() {
109 let mut detail = TopicQueueMappingDetail::default();
110 let items = vec![LogicQueueMappingItem::default()];
111 let mut hosted_queues = HashMap::new();
112 hosted_queues.insert(1, items.clone());
113 detail.hosted_queues = Some(hosted_queues);
114
115 assert_eq!(TopicQueueMappingDetail::get_mapping_info(&detail, 1).unwrap(), &items);
116 assert!(TopicQueueMappingDetail::get_mapping_info(&detail, 2).is_none());
117 }
118
119 #[test]
120 fn compute_max_offset_from_mapping() {
121 let mut detail = TopicQueueMappingDetail::default();
122 let mut items = vec![LogicQueueMappingItem::default()];
123 items[0].logic_offset = 100;
124 items[0].start_offset = 50;
125 items[0].end_offset = 150;
126 let mut hosted_queues = HashMap::new();
129 hosted_queues.insert(1, items);
130 detail.hosted_queues = Some(hosted_queues);
131
132 assert_eq!(
133 TopicQueueMappingDetail::compute_max_offset_from_mapping(&detail, Some(1)),
134 200
135 );
136
137 detail.hosted_queues.as_mut().unwrap().get_mut(&1).unwrap()[0].end_offset = 20;
139 assert_eq!(
141 TopicQueueMappingDetail::compute_max_offset_from_mapping(&detail, Some(1)),
142 100
143 );
144
145 assert_eq!(
147 TopicQueueMappingDetail::compute_max_offset_from_mapping(&detail, Some(2)),
148 -1
149 );
150 }
151
152 #[test]
153 fn clone_as_mapping_info() {
154 let detail = TopicQueueMappingDetail {
155 topic_queue_mapping_info: TopicQueueMappingInfo {
156 topic: Some("test_topic".into()),
157 total_queues: 8,
158 bname: Some("broker_a".into()),
159 epoch: 12345,
160 ..TopicQueueMappingInfo::default()
161 },
162 ..Default::default()
163 };
164
165 let info = TopicQueueMappingDetail::clone_as_mapping_info(&detail);
166 assert_eq!(info.topic, detail.topic_queue_mapping_info.topic);
167 assert_eq!(info.total_queues, detail.topic_queue_mapping_info.total_queues);
168 assert_eq!(info.bname, detail.topic_queue_mapping_info.bname);
169 assert_eq!(info.epoch, detail.topic_queue_mapping_info.epoch);
170 }
171
172 #[test]
173 fn put_mapping_info() {
174 let detail = ArcMut::new(TopicQueueMappingDetail::default());
175 detail.mut_from_ref().hosted_queues = Some(HashMap::new());
176
177 let items = vec![LogicQueueMappingItem::default()];
178 TopicQueueMappingDetail::put_mapping_info(detail.clone(), 1, items.clone());
179
180 assert_eq!(detail.hosted_queues.as_ref().unwrap().get(&1).unwrap(), &items);
181
182 TopicQueueMappingDetail::put_mapping_info(detail.clone(), 2, vec![]);
183 assert!(detail.hosted_queues.as_ref().unwrap().get(&2).is_none());
184 }
185}