Skip to main content

rocketmq_remoting/protocol/static_topic/
logic_queue_mapping_item.rs

1use cheetah_string::CheetahString;
2use serde::Deserialize;
3use serde::Serialize;
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
6pub struct LogicQueueMappingItem {
7    pub gen: i32, // Immutable
8    #[serde(rename = "queueId")]
9    pub queue_id: i32, // Immutable
10    pub bname: Option<CheetahString>, // Immutable
11    #[serde(rename = "logicOffset")]
12    pub logic_offset: i64, // Start of the logic offset, Important, can be changed by command only once
13    #[serde(rename = "startOffset")]
14    pub start_offset: i64, // Start of the physical offset, Should always be 0, Immutable
15    #[serde(rename = "endOffset")]
16    pub end_offset: i64, // End of the physical offset, Excluded, Default to -1, Mutable
17    #[serde(rename = "timeOfStart")]
18    pub time_of_start: i64, // Mutable, Reserved
19    #[serde(rename = "timeOfEnd")]
20    pub time_of_end: i64, // Mutable, Reserved
21}
22
23impl Default for LogicQueueMappingItem {
24    fn default() -> Self {
25        Self {
26            gen: 0,
27            queue_id: 0,
28            bname: None,
29            logic_offset: 0,
30            start_offset: 0,
31            end_offset: -1,
32            time_of_start: -1,
33            time_of_end: -1,
34        }
35    }
36}
37
38impl LogicQueueMappingItem {
39    pub fn compute_static_queue_offset_strictly(&self, physical_queue_offset: i64) -> i64 {
40        if physical_queue_offset > self.start_offset {
41            return self.logic_offset;
42        }
43        self.logic_offset + (physical_queue_offset - self.start_offset)
44    }
45
46    pub fn compute_static_queue_offset_loosely(&self, physical_queue_offset: i64) -> i64 {
47        // Consider the newly mapped item
48        if self.logic_offset < 0 {
49            return -1;
50        }
51        if physical_queue_offset < self.start_offset {
52            return self.logic_offset;
53        }
54        if self.end_offset >= self.start_offset && self.end_offset < physical_queue_offset {
55            return self.logic_offset + (self.end_offset - self.start_offset);
56        }
57        self.logic_offset + (physical_queue_offset - self.start_offset)
58    }
59
60    pub fn compute_physical_queue_offset(&self, static_queue_offset: i64) -> i64 {
61        self.start_offset + (static_queue_offset - self.logic_offset)
62    }
63
64    pub fn compute_offset_delta(&self) -> i64 {
65        self.logic_offset - self.start_offset
66    }
67
68    pub fn check_if_end_offset_decided(&self) -> bool {
69        self.end_offset > self.start_offset
70    }
71
72    pub fn compute_max_static_queue_offset(&self) -> i64 {
73        if self.end_offset >= self.start_offset {
74            self.logic_offset + self.end_offset - self.start_offset
75        } else {
76            self.logic_offset
77        }
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use super::*;
84
85    #[test]
86    fn logic_queue_mapping_item_default() {
87        let item = LogicQueueMappingItem::default();
88        assert_eq!(item.gen, 0);
89        assert_eq!(item.queue_id, 0);
90        assert!(item.bname.is_none());
91        assert_eq!(item.logic_offset, 0);
92        assert_eq!(item.start_offset, 0);
93        assert_eq!(item.end_offset, -1);
94        assert_eq!(item.time_of_start, -1);
95        assert_eq!(item.time_of_end, -1);
96    }
97
98    #[test]
99    fn logic_queue_mapping_item_serde() {
100        let item = LogicQueueMappingItem {
101            gen: 1,
102            queue_id: 2,
103            bname: Some(CheetahString::from("broker-a")),
104            logic_offset: 100,
105            start_offset: 0,
106            end_offset: 200,
107            time_of_start: 123456,
108            time_of_end: 789012,
109        };
110        let json = serde_json::to_string(&item).unwrap();
111        let expected = r#"{"gen":1,"queueId":2,"bname":"broker-a","logicOffset":100,"startOffset":0,"endOffset":200,"timeOfStart":123456,"timeOfEnd":789012}"#;
112        assert_eq!(json, expected);
113        let deserialized: LogicQueueMappingItem = serde_json::from_str(&json).unwrap();
114        assert_eq!(item, deserialized);
115    }
116
117    #[test]
118    fn compute_static_queue_offset_strictly() {
119        let item = LogicQueueMappingItem {
120            start_offset: 100,
121            logic_offset: 500,
122            ..Default::default()
123        };
124        assert_eq!(item.compute_static_queue_offset_strictly(150), 500);
125        assert_eq!(item.compute_static_queue_offset_strictly(50), 450);
126    }
127
128    #[test]
129    fn compute_static_queue_offset_loosely() {
130        let mut item = LogicQueueMappingItem {
131            logic_offset: -1,
132            ..Default::default()
133        };
134        assert_eq!(item.compute_static_queue_offset_loosely(100), -1);
135
136        item.logic_offset = 500;
137        item.start_offset = 100;
138        assert_eq!(item.compute_static_queue_offset_loosely(50), 500);
139
140        item.end_offset = 200;
141        assert_eq!(item.compute_static_queue_offset_loosely(250), 600);
142
143        assert_eq!(item.compute_static_queue_offset_loosely(150), 550);
144    }
145
146    #[test]
147    fn compute_physical_queue_offset() {
148        let item = LogicQueueMappingItem {
149            start_offset: 100,
150            logic_offset: 500,
151            ..Default::default()
152        };
153        assert_eq!(item.compute_physical_queue_offset(550), 150);
154    }
155
156    #[test]
157    fn compute_offset_delta() {
158        let item = LogicQueueMappingItem {
159            start_offset: 100,
160            logic_offset: 500,
161            ..Default::default()
162        };
163        assert_eq!(item.compute_offset_delta(), 400);
164    }
165
166    #[test]
167    fn check_if_end_offset_decided() {
168        let mut item = LogicQueueMappingItem {
169            start_offset: 100,
170            end_offset: 50,
171            ..Default::default()
172        };
173        assert!(!item.check_if_end_offset_decided());
174        item.end_offset = 150;
175        assert!(item.check_if_end_offset_decided());
176    }
177
178    #[test]
179    fn compute_max_static_queue_offset() {
180        let mut item = LogicQueueMappingItem {
181            logic_offset: 500,
182            start_offset: 100,
183            end_offset: 50,
184            ..Default::default()
185        };
186        assert_eq!(item.compute_max_static_queue_offset(), 500);
187        item.end_offset = 200;
188        assert_eq!(item.compute_max_static_queue_offset(), 600);
189    }
190}