rocketmq_remoting/protocol/body/
batch_ack.rs1use bitvec::prelude::BitVec;
18use bitvec::prelude::Lsb0;
19use cheetah_string::CheetahString;
20use serde::Deserialize;
21use serde::Deserializer;
22use serde::Serialize;
23use serde::Serializer;
24
25#[derive(Serialize, Deserialize)]
26pub struct BatchAck {
27 #[serde(rename = "c", alias = "consumerGroup")]
28 pub consumer_group: CheetahString,
29
30 #[serde(rename = "t", alias = "topic")]
31 pub topic: CheetahString,
32
33 #[serde(rename = "r", alias = "retry")]
34 pub retry: CheetahString, #[serde(rename = "so", alias = "startOffset")]
37 pub start_offset: i64,
38
39 #[serde(rename = "q", alias = "queueId")]
40 pub queue_id: i32,
41
42 #[serde(rename = "rq", alias = "reviveQueueId")]
43 pub revive_queue_id: i32,
44
45 #[serde(rename = "pt", alias = "popTime")]
46 pub pop_time: i64,
47
48 #[serde(rename = "it", alias = "invisibleTime")]
49 pub invisible_time: i64,
50
51 #[serde(rename = "b", alias = "bitSet")]
52 pub bit_set: SerializableBitVec,
53}
54
55pub struct SerializableBitVec(pub BitVec<u64, Lsb0>);
56
57impl Serialize for SerializableBitVec {
58 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
59 where
60 S: Serializer,
61 {
62 let slice = bytemuck::cast_slice(self.0.as_raw_slice());
63 serializer.serialize_bytes(slice)
64 }
65}
66
67impl<'de> Deserialize<'de> for SerializableBitVec {
68 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
69 where
70 D: Deserializer<'de>,
71 {
72 let bytes: Vec<u8> = Vec::deserialize(deserializer)?;
73 let inner: &[u64] = bytemuck::cast_slice(bytes.as_slice());
74 Ok(SerializableBitVec(BitVec::<u64, Lsb0>::from_slice(inner)))
75 }
76}
77
78#[cfg(test)]
79mod tests {
80 use bitvec::prelude::*;
81 use cheetah_string::CheetahString;
82 use serde_json;
83
84 use super::*;
85
86 #[test]
87 fn batch_ack_serialization() {
88 let bit_set = BitVec::from_vec(vec![0u64; 8]);
89 let batch_ack = BatchAck {
90 consumer_group: CheetahString::from("group1"),
91 topic: CheetahString::from("topic1"),
92 retry: CheetahString::from("1"),
93 start_offset: 100,
94 queue_id: 1,
95 revive_queue_id: 2,
96 pop_time: 123456789,
97 invisible_time: 987654321,
98 bit_set: SerializableBitVec(bit_set.clone()),
99 };
100 let serialized = serde_json::to_string(&batch_ack).unwrap();
101 let deserialized: BatchAck = serde_json::from_str(&serialized).unwrap();
102 assert_eq!(deserialized.consumer_group, CheetahString::from("group1"));
103 assert_eq!(deserialized.topic, CheetahString::from("topic1"));
104 assert_eq!(deserialized.retry, CheetahString::from("1"));
105 assert_eq!(deserialized.start_offset, 100);
106 assert_eq!(deserialized.queue_id, 1);
107 assert_eq!(deserialized.revive_queue_id, 2);
108 assert_eq!(deserialized.pop_time, 123456789);
109 assert_eq!(deserialized.invisible_time, 987654321);
110 assert_eq!(deserialized.bit_set.0, bit_set);
111 }
112
113 #[test]
114 fn batch_ack_default_values() {
115 let bit_set = BitVec::from_element(8);
116 let batch_ack = BatchAck {
117 consumer_group: CheetahString::new(),
118 topic: CheetahString::new(),
119 retry: CheetahString::new(),
120 start_offset: 0,
121 queue_id: 0,
122 revive_queue_id: 0,
123 pop_time: 0,
124 invisible_time: 0,
125 bit_set: SerializableBitVec(bit_set.clone()),
126 };
127 assert_eq!(batch_ack.consumer_group, CheetahString::new());
128 assert_eq!(batch_ack.topic, CheetahString::new());
129 assert_eq!(batch_ack.retry, CheetahString::new());
130 assert_eq!(batch_ack.start_offset, 0);
131 assert_eq!(batch_ack.queue_id, 0);
132 assert_eq!(batch_ack.revive_queue_id, 0);
133 assert_eq!(batch_ack.pop_time, 0);
134 assert_eq!(batch_ack.invisible_time, 0);
135 assert_eq!(batch_ack.bit_set.0, bit_set);
136 }
137
138 #[test]
139 fn batch_ack_edge_case_empty_strings() {
140 let bit_set = BitVec::new();
141 let batch_ack = BatchAck {
142 consumer_group: CheetahString::from(""),
143 topic: CheetahString::from(""),
144 retry: CheetahString::from(""),
145 start_offset: -1,
146 queue_id: -1,
147 revive_queue_id: -1,
148 pop_time: -1,
149 invisible_time: -1,
150 bit_set: SerializableBitVec(bit_set.clone()),
151 };
152 assert_eq!(batch_ack.consumer_group, CheetahString::from(""));
153 assert_eq!(batch_ack.topic, CheetahString::from(""));
154 assert_eq!(batch_ack.retry, CheetahString::from(""));
155 assert_eq!(batch_ack.start_offset, -1);
156 assert_eq!(batch_ack.queue_id, -1);
157 assert_eq!(batch_ack.revive_queue_id, -1);
158 assert_eq!(batch_ack.pop_time, -1);
159 assert_eq!(batch_ack.invisible_time, -1);
160 assert_eq!(batch_ack.bit_set.0, bit_set);
161 }
162}