use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct LogicQueueMappingItem {
pub gen: i32, #[serde(rename = "queueId")]
pub queue_id: i32, pub bname: Option<CheetahString>, #[serde(rename = "logicOffset")]
pub logic_offset: i64, #[serde(rename = "startOffset")]
pub start_offset: i64, #[serde(rename = "endOffset")]
pub end_offset: i64, #[serde(rename = "timeOfStart")]
pub time_of_start: i64, #[serde(rename = "timeOfEnd")]
pub time_of_end: i64, }
impl Default for LogicQueueMappingItem {
fn default() -> Self {
Self {
gen: 0,
queue_id: 0,
bname: None,
logic_offset: 0,
start_offset: 0,
end_offset: -1,
time_of_start: -1,
time_of_end: -1,
}
}
}
impl LogicQueueMappingItem {
pub fn compute_static_queue_offset_strictly(&self, physical_queue_offset: i64) -> i64 {
if physical_queue_offset > self.start_offset {
return self.logic_offset;
}
self.logic_offset + (physical_queue_offset - self.start_offset)
}
pub fn compute_static_queue_offset_loosely(&self, physical_queue_offset: i64) -> i64 {
if self.logic_offset < 0 {
return -1;
}
if physical_queue_offset < self.start_offset {
return self.logic_offset;
}
if self.end_offset >= self.start_offset && self.end_offset < physical_queue_offset {
return self.logic_offset + (self.end_offset - self.start_offset);
}
self.logic_offset + (physical_queue_offset - self.start_offset)
}
pub fn compute_physical_queue_offset(&self, static_queue_offset: i64) -> i64 {
self.start_offset + (static_queue_offset - self.logic_offset)
}
pub fn compute_offset_delta(&self) -> i64 {
self.logic_offset - self.start_offset
}
pub fn check_if_end_offset_decided(&self) -> bool {
self.end_offset > self.start_offset
}
pub fn compute_max_static_queue_offset(&self) -> i64 {
if self.end_offset >= self.start_offset {
self.logic_offset + self.end_offset - self.start_offset
} else {
self.logic_offset
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn logic_queue_mapping_item_default() {
let item = LogicQueueMappingItem::default();
assert_eq!(item.gen, 0);
assert_eq!(item.queue_id, 0);
assert!(item.bname.is_none());
assert_eq!(item.logic_offset, 0);
assert_eq!(item.start_offset, 0);
assert_eq!(item.end_offset, -1);
assert_eq!(item.time_of_start, -1);
assert_eq!(item.time_of_end, -1);
}
#[test]
fn logic_queue_mapping_item_serde() {
let item = LogicQueueMappingItem {
gen: 1,
queue_id: 2,
bname: Some(CheetahString::from("broker-a")),
logic_offset: 100,
start_offset: 0,
end_offset: 200,
time_of_start: 123456,
time_of_end: 789012,
};
let json = serde_json::to_string(&item).unwrap();
let expected = r#"{"gen":1,"queueId":2,"bname":"broker-a","logicOffset":100,"startOffset":0,"endOffset":200,"timeOfStart":123456,"timeOfEnd":789012}"#;
assert_eq!(json, expected);
let deserialized: LogicQueueMappingItem = serde_json::from_str(&json).unwrap();
assert_eq!(item, deserialized);
}
#[test]
fn compute_static_queue_offset_strictly() {
let item = LogicQueueMappingItem {
start_offset: 100,
logic_offset: 500,
..Default::default()
};
assert_eq!(item.compute_static_queue_offset_strictly(150), 500);
assert_eq!(item.compute_static_queue_offset_strictly(50), 450);
}
#[test]
fn compute_static_queue_offset_loosely() {
let mut item = LogicQueueMappingItem {
logic_offset: -1,
..Default::default()
};
assert_eq!(item.compute_static_queue_offset_loosely(100), -1);
item.logic_offset = 500;
item.start_offset = 100;
assert_eq!(item.compute_static_queue_offset_loosely(50), 500);
item.end_offset = 200;
assert_eq!(item.compute_static_queue_offset_loosely(250), 600);
assert_eq!(item.compute_static_queue_offset_loosely(150), 550);
}
#[test]
fn compute_physical_queue_offset() {
let item = LogicQueueMappingItem {
start_offset: 100,
logic_offset: 500,
..Default::default()
};
assert_eq!(item.compute_physical_queue_offset(550), 150);
}
#[test]
fn compute_offset_delta() {
let item = LogicQueueMappingItem {
start_offset: 100,
logic_offset: 500,
..Default::default()
};
assert_eq!(item.compute_offset_delta(), 400);
}
#[test]
fn check_if_end_offset_decided() {
let mut item = LogicQueueMappingItem {
start_offset: 100,
end_offset: 50,
..Default::default()
};
assert!(!item.check_if_end_offset_decided());
item.end_offset = 150;
assert!(item.check_if_end_offset_decided());
}
#[test]
fn compute_max_static_queue_offset() {
let mut item = LogicQueueMappingItem {
logic_offset: 500,
start_offset: 100,
end_offset: 50,
..Default::default()
};
assert_eq!(item.compute_max_static_queue_offset(), 500);
item.end_offset = 200;
assert_eq!(item.compute_max_static_queue_offset(), 600);
}
}