rocketmq_remoting/protocol/static_topic/
logic_queue_mapping_item.rs

1use cheetah_string::CheetahString;
2use serde::Deserialize;
3use serde::Serialize;
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
6pub struct LogicQueueMappingItem {
7    pub gen: i32, // Immutable
8    #[serde(rename = "queueId")]
9    pub queue_id: i32, // Immutable
10    pub bname: Option<CheetahString>, // Immutable
11    #[serde(rename = "logicOffset")]
12    pub logic_offset: i64, /* Start of the logic offset, Important, can be changed by command
13                   * only once */
14    #[serde(rename = "startOffset")]
15    pub start_offset: i64, // Start of the physical offset, Should always be 0, Immutable
16    #[serde(rename = "endOffset")]
17    pub end_offset: i64, // End of the physical offset, Excluded, Default to -1, Mutable
18    #[serde(rename = "timeOfStart")]
19    pub time_of_start: i64, // Mutable, Reserved
20    #[serde(rename = "timeOfEnd")]
21    pub time_of_end: i64, // Mutable, Reserved
22}
23
24impl Default for LogicQueueMappingItem {
25    fn default() -> Self {
26        Self {
27            gen: 0,
28            queue_id: 0,
29            bname: None,
30            logic_offset: 0,
31            start_offset: 0,
32            end_offset: -1,
33            time_of_start: -1,
34            time_of_end: -1,
35        }
36    }
37}
38
39impl LogicQueueMappingItem {
40    pub fn compute_static_queue_offset_strictly(&self, physical_queue_offset: i64) -> i64 {
41        if physical_queue_offset > self.start_offset {
42            return self.logic_offset;
43        }
44        self.logic_offset + (physical_queue_offset - self.start_offset)
45    }
46
47    pub fn compute_static_queue_offset_loosely(&self, physical_queue_offset: i64) -> i64 {
48        // Consider the newly mapped item
49        if self.logic_offset < 0 {
50            return -1;
51        }
52        if physical_queue_offset < self.start_offset {
53            return self.logic_offset;
54        }
55        if self.end_offset >= self.start_offset && self.end_offset < physical_queue_offset {
56            return self.logic_offset + (self.end_offset - self.start_offset);
57        }
58        self.logic_offset + (physical_queue_offset - self.start_offset)
59    }
60
61    pub fn compute_physical_queue_offset(&self, static_queue_offset: i64) -> i64 {
62        self.start_offset + (static_queue_offset - self.logic_offset)
63    }
64
65    pub fn compute_offset_delta(&self) -> i64 {
66        self.logic_offset - self.start_offset
67    }
68
69    pub fn check_if_end_offset_decided(&self) -> bool {
70        self.end_offset > self.start_offset
71    }
72
73    pub fn compute_max_static_queue_offset(&self) -> i64 {
74        if self.end_offset >= self.start_offset {
75            self.logic_offset + self.end_offset - self.start_offset
76        } else {
77            self.logic_offset
78        }
79    }
80}