rocketmq_remoting/protocol/body/
batch_ack.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use bitvec::prelude::BitVec;
18use bitvec::prelude::Lsb0;
19use cheetah_string::CheetahString;
20use serde::Deserialize;
21use serde::Deserializer;
22use serde::Serialize;
23use serde::Serializer;
24
25#[derive(Serialize, Deserialize)]
26pub struct BatchAck {
27    #[serde(rename = "c", alias = "consumerGroup")]
28    pub consumer_group: CheetahString,
29
30    #[serde(rename = "t", alias = "topic")]
31    pub topic: CheetahString,
32
33    #[serde(rename = "r", alias = "retry")]
34    pub retry: CheetahString, // "1" if it's retry topic
35
36    #[serde(rename = "so", alias = "startOffset")]
37    pub start_offset: i64,
38
39    #[serde(rename = "q", alias = "queueId")]
40    pub queue_id: i32,
41
42    #[serde(rename = "rq", alias = "reviveQueueId")]
43    pub revive_queue_id: i32,
44
45    #[serde(rename = "pt", alias = "popTime")]
46    pub pop_time: i64,
47
48    #[serde(rename = "it", alias = "invisibleTime")]
49    pub invisible_time: i64,
50
51    #[serde(rename = "b", alias = "bitSet")]
52    pub bit_set: SerializableBitVec,
53}
54
55pub struct SerializableBitVec(pub BitVec<u64, Lsb0>);
56
57impl Serialize for SerializableBitVec {
58    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
59    where
60        S: Serializer,
61    {
62        let slice = bytemuck::cast_slice(self.0.as_raw_slice());
63        serializer.serialize_bytes(slice)
64    }
65}
66
67impl<'de> Deserialize<'de> for SerializableBitVec {
68    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
69    where
70        D: Deserializer<'de>,
71    {
72        let bytes: Vec<u8> = Vec::deserialize(deserializer)?;
73        let inner: &[u64] = bytemuck::cast_slice(bytes.as_slice());
74        Ok(SerializableBitVec(BitVec::<u64, Lsb0>::from_slice(inner)))
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use bitvec::prelude::*;
81    use cheetah_string::CheetahString;
82    use serde_json;
83
84    use super::*;
85
86    #[test]
87    fn batch_ack_serialization() {
88        let bit_set = BitVec::from_vec(vec![0u64; 8]);
89        let batch_ack = BatchAck {
90            consumer_group: CheetahString::from("group1"),
91            topic: CheetahString::from("topic1"),
92            retry: CheetahString::from("1"),
93            start_offset: 100,
94            queue_id: 1,
95            revive_queue_id: 2,
96            pop_time: 123456789,
97            invisible_time: 987654321,
98            bit_set: SerializableBitVec(bit_set.clone()),
99        };
100        let serialized = serde_json::to_string(&batch_ack).unwrap();
101        let deserialized: BatchAck = serde_json::from_str(&serialized).unwrap();
102        assert_eq!(deserialized.consumer_group, CheetahString::from("group1"));
103        assert_eq!(deserialized.topic, CheetahString::from("topic1"));
104        assert_eq!(deserialized.retry, CheetahString::from("1"));
105        assert_eq!(deserialized.start_offset, 100);
106        assert_eq!(deserialized.queue_id, 1);
107        assert_eq!(deserialized.revive_queue_id, 2);
108        assert_eq!(deserialized.pop_time, 123456789);
109        assert_eq!(deserialized.invisible_time, 987654321);
110        assert_eq!(deserialized.bit_set.0, bit_set);
111    }
112
113    #[test]
114    fn batch_ack_default_values() {
115        let bit_set = BitVec::from_element(8);
116        let batch_ack = BatchAck {
117            consumer_group: CheetahString::new(),
118            topic: CheetahString::new(),
119            retry: CheetahString::new(),
120            start_offset: 0,
121            queue_id: 0,
122            revive_queue_id: 0,
123            pop_time: 0,
124            invisible_time: 0,
125            bit_set: SerializableBitVec(bit_set.clone()),
126        };
127        assert_eq!(batch_ack.consumer_group, CheetahString::new());
128        assert_eq!(batch_ack.topic, CheetahString::new());
129        assert_eq!(batch_ack.retry, CheetahString::new());
130        assert_eq!(batch_ack.start_offset, 0);
131        assert_eq!(batch_ack.queue_id, 0);
132        assert_eq!(batch_ack.revive_queue_id, 0);
133        assert_eq!(batch_ack.pop_time, 0);
134        assert_eq!(batch_ack.invisible_time, 0);
135        assert_eq!(batch_ack.bit_set.0, bit_set);
136    }
137
138    #[test]
139    fn batch_ack_edge_case_empty_strings() {
140        let bit_set = BitVec::new();
141        let batch_ack = BatchAck {
142            consumer_group: CheetahString::from(""),
143            topic: CheetahString::from(""),
144            retry: CheetahString::from(""),
145            start_offset: -1,
146            queue_id: -1,
147            revive_queue_id: -1,
148            pop_time: -1,
149            invisible_time: -1,
150            bit_set: SerializableBitVec(bit_set.clone()),
151        };
152        assert_eq!(batch_ack.consumer_group, CheetahString::from(""));
153        assert_eq!(batch_ack.topic, CheetahString::from(""));
154        assert_eq!(batch_ack.retry, CheetahString::from(""));
155        assert_eq!(batch_ack.start_offset, -1);
156        assert_eq!(batch_ack.queue_id, -1);
157        assert_eq!(batch_ack.revive_queue_id, -1);
158        assert_eq!(batch_ack.pop_time, -1);
159        assert_eq!(batch_ack.invisible_time, -1);
160        assert_eq!(batch_ack.bit_set.0, bit_set);
161    }
162}