rocketmq_remoting/protocol/static_topic/
logic_queue_mapping_item.rs1use cheetah_string::CheetahString;
2use serde::Deserialize;
3use serde::Serialize;
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
6pub struct LogicQueueMappingItem {
7 pub gen: i32, #[serde(rename = "queueId")]
9 pub queue_id: i32, pub bname: Option<CheetahString>, #[serde(rename = "logicOffset")]
12 pub logic_offset: i64, #[serde(rename = "startOffset")]
14 pub start_offset: i64, #[serde(rename = "endOffset")]
16 pub end_offset: i64, #[serde(rename = "timeOfStart")]
18 pub time_of_start: i64, #[serde(rename = "timeOfEnd")]
20 pub time_of_end: i64, }
22
23impl Default for LogicQueueMappingItem {
24 fn default() -> Self {
25 Self {
26 gen: 0,
27 queue_id: 0,
28 bname: None,
29 logic_offset: 0,
30 start_offset: 0,
31 end_offset: -1,
32 time_of_start: -1,
33 time_of_end: -1,
34 }
35 }
36}
37
38impl LogicQueueMappingItem {
39 pub fn compute_static_queue_offset_strictly(&self, physical_queue_offset: i64) -> i64 {
40 if physical_queue_offset > self.start_offset {
41 return self.logic_offset;
42 }
43 self.logic_offset + (physical_queue_offset - self.start_offset)
44 }
45
46 pub fn compute_static_queue_offset_loosely(&self, physical_queue_offset: i64) -> i64 {
47 if self.logic_offset < 0 {
49 return -1;
50 }
51 if physical_queue_offset < self.start_offset {
52 return self.logic_offset;
53 }
54 if self.end_offset >= self.start_offset && self.end_offset < physical_queue_offset {
55 return self.logic_offset + (self.end_offset - self.start_offset);
56 }
57 self.logic_offset + (physical_queue_offset - self.start_offset)
58 }
59
60 pub fn compute_physical_queue_offset(&self, static_queue_offset: i64) -> i64 {
61 self.start_offset + (static_queue_offset - self.logic_offset)
62 }
63
64 pub fn compute_offset_delta(&self) -> i64 {
65 self.logic_offset - self.start_offset
66 }
67
68 pub fn check_if_end_offset_decided(&self) -> bool {
69 self.end_offset > self.start_offset
70 }
71
72 pub fn compute_max_static_queue_offset(&self) -> i64 {
73 if self.end_offset >= self.start_offset {
74 self.logic_offset + self.end_offset - self.start_offset
75 } else {
76 self.logic_offset
77 }
78 }
79}
80
81#[cfg(test)]
82mod tests {
83 use super::*;
84
85 #[test]
86 fn logic_queue_mapping_item_default() {
87 let item = LogicQueueMappingItem::default();
88 assert_eq!(item.gen, 0);
89 assert_eq!(item.queue_id, 0);
90 assert!(item.bname.is_none());
91 assert_eq!(item.logic_offset, 0);
92 assert_eq!(item.start_offset, 0);
93 assert_eq!(item.end_offset, -1);
94 assert_eq!(item.time_of_start, -1);
95 assert_eq!(item.time_of_end, -1);
96 }
97
98 #[test]
99 fn logic_queue_mapping_item_serde() {
100 let item = LogicQueueMappingItem {
101 gen: 1,
102 queue_id: 2,
103 bname: Some(CheetahString::from("broker-a")),
104 logic_offset: 100,
105 start_offset: 0,
106 end_offset: 200,
107 time_of_start: 123456,
108 time_of_end: 789012,
109 };
110 let json = serde_json::to_string(&item).unwrap();
111 let expected = r#"{"gen":1,"queueId":2,"bname":"broker-a","logicOffset":100,"startOffset":0,"endOffset":200,"timeOfStart":123456,"timeOfEnd":789012}"#;
112 assert_eq!(json, expected);
113 let deserialized: LogicQueueMappingItem = serde_json::from_str(&json).unwrap();
114 assert_eq!(item, deserialized);
115 }
116
117 #[test]
118 fn compute_static_queue_offset_strictly() {
119 let item = LogicQueueMappingItem {
120 start_offset: 100,
121 logic_offset: 500,
122 ..Default::default()
123 };
124 assert_eq!(item.compute_static_queue_offset_strictly(150), 500);
125 assert_eq!(item.compute_static_queue_offset_strictly(50), 450);
126 }
127
128 #[test]
129 fn compute_static_queue_offset_loosely() {
130 let mut item = LogicQueueMappingItem {
131 logic_offset: -1,
132 ..Default::default()
133 };
134 assert_eq!(item.compute_static_queue_offset_loosely(100), -1);
135
136 item.logic_offset = 500;
137 item.start_offset = 100;
138 assert_eq!(item.compute_static_queue_offset_loosely(50), 500);
139
140 item.end_offset = 200;
141 assert_eq!(item.compute_static_queue_offset_loosely(250), 600);
142
143 assert_eq!(item.compute_static_queue_offset_loosely(150), 550);
144 }
145
146 #[test]
147 fn compute_physical_queue_offset() {
148 let item = LogicQueueMappingItem {
149 start_offset: 100,
150 logic_offset: 500,
151 ..Default::default()
152 };
153 assert_eq!(item.compute_physical_queue_offset(550), 150);
154 }
155
156 #[test]
157 fn compute_offset_delta() {
158 let item = LogicQueueMappingItem {
159 start_offset: 100,
160 logic_offset: 500,
161 ..Default::default()
162 };
163 assert_eq!(item.compute_offset_delta(), 400);
164 }
165
166 #[test]
167 fn check_if_end_offset_decided() {
168 let mut item = LogicQueueMappingItem {
169 start_offset: 100,
170 end_offset: 50,
171 ..Default::default()
172 };
173 assert!(!item.check_if_end_offset_decided());
174 item.end_offset = 150;
175 assert!(item.check_if_end_offset_decided());
176 }
177
178 #[test]
179 fn compute_max_static_queue_offset() {
180 let mut item = LogicQueueMappingItem {
181 logic_offset: 500,
182 start_offset: 100,
183 end_offset: 50,
184 ..Default::default()
185 };
186 assert_eq!(item.compute_max_static_queue_offset(), 500);
187 item.end_offset = 200;
188 assert_eq!(item.compute_max_static_queue_offset(), 600);
189 }
190}