rocketmq_common/utils/
message_utils.rs1use std::collections::hash_map::DefaultHasher;
18use std::collections::HashSet;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::net::SocketAddr;
22
23use bytes::BufMut;
24
25use crate::common::message::message_ext::MessageExt;
26use crate::common::message::MessageConst;
27use crate::MessageDecoder::NAME_VALUE_SEPARATOR;
28use crate::MessageDecoder::PROPERTY_SEPARATOR;
29use crate::UtilAll::bytes_to_string;
30
/// Maps a sharding key onto one of `index_size` slots.
///
/// The key is fed through the standard library's `DefaultHasher` and the
/// resulting hash is reduced modulo `index_size`, so the returned value is
/// always in `0..index_size`.
///
/// # Panics
///
/// Panics if `index_size` is zero (remainder by zero).
pub fn get_sharding_key_index(sharding_key: &str, index_size: usize) -> usize {
    let mut state = DefaultHasher::new();
    Hash::hash(sharding_key, &mut state);
    let digest = state.finish();
    (digest as usize) % index_size
}
37
38pub fn get_sharding_key_index_by_msg(msg: &MessageExt, index_size: usize) -> usize {
39 let sharding_key = msg
40 .message
41 .properties
42 .get(MessageConst::PROPERTY_SHARDING_KEY)
43 .cloned()
44 .unwrap_or_default();
45
46 get_sharding_key_index(sharding_key.as_str(), index_size)
47}
48
49pub fn get_sharding_key_indexes(msgs: &[MessageExt], index_size: usize) -> HashSet<usize> {
50 let mut index_set = HashSet::with_capacity(index_size);
51 for msg in msgs {
52 let idx = get_sharding_key_index_by_msg(msg, index_size);
53 index_set.insert(idx);
54 }
55 index_set
56}
57
58pub fn delete_property(properties_string: &str, name: &str) -> String {
59 if !properties_string.is_empty() {
60 let mut idx0 = 0;
61 let mut idx1;
62 let mut idx2;
63 idx1 = properties_string.find(name);
64 if idx1.is_some() {
65 let mut string_builder = String::with_capacity(properties_string.len());
67 loop {
68 let mut start_idx = idx0;
69 loop {
70 idx1 = properties_string[start_idx..]
71 .find(name)
72 .map(|i| i + start_idx);
73 if idx1.is_none() {
74 break;
75 }
76 let idx1 = idx1.unwrap();
77 start_idx = idx1 + name.len();
78 if (idx1 == 0
79 || properties_string.chars().nth(idx1 - 1) == Some(PROPERTY_SEPARATOR))
80 && (properties_string.len() > idx1 + name.len())
81 && properties_string.chars().nth(idx1 + name.len())
82 == Some(NAME_VALUE_SEPARATOR)
83 {
84 break;
85 }
86 }
87 if idx1.is_none() {
88 string_builder.push_str(&properties_string[idx0..]);
91 break;
92 }
93 let idx1 = idx1.unwrap();
94 string_builder.push_str(&properties_string[idx0..idx1]);
96 idx2 = properties_string[idx1 + name.len() + 1..]
98 .find(PROPERTY_SEPARATOR)
99 .map(|i| i + idx1 + name.len() + 1);
100 if idx2.is_none() {
102 break;
103 }
104 idx0 = idx2.unwrap() + 1;
105 }
106 return string_builder;
107 }
108 }
109 properties_string.to_string()
110}
111
112pub fn build_message_id(socket_addr: SocketAddr, wrote_offset: i64) -> String {
113 let mut msg_id_vec = match socket_addr {
114 SocketAddr::V4(addr) => {
115 let mut msg_id_vec = Vec::<u8>::with_capacity(16);
116 msg_id_vec.extend_from_slice(&addr.ip().octets());
117 msg_id_vec.put_i32(addr.port() as i32);
118 msg_id_vec
119 }
120 SocketAddr::V6(addr) => {
121 let mut msg_id_vec = Vec::<u8>::with_capacity(28);
122 msg_id_vec.extend_from_slice(&addr.ip().octets());
123 msg_id_vec.put_i32(addr.port() as i32);
124 msg_id_vec
125 }
126 };
127 msg_id_vec.put_i64(wrote_offset);
128 bytes_to_string(msg_id_vec.as_slice())
129}
130
131pub fn socket_address_to_vec(socket_addr: SocketAddr) -> Vec<u8> {
132 match socket_addr {
133 SocketAddr::V4(addr) => {
134 let mut msg_id_vec = Vec::<u8>::with_capacity(8);
135 msg_id_vec.extend_from_slice(&addr.ip().octets());
136 msg_id_vec.put_i32(addr.port() as i32);
137 msg_id_vec
138 }
139 SocketAddr::V6(addr) => {
140 let mut msg_id_vec = Vec::<u8>::with_capacity(20);
141 msg_id_vec.extend_from_slice(&addr.ip().octets());
142 msg_id_vec.put_i32(addr.port() as i32);
143 msg_id_vec
144 }
145 }
146}
147
148pub fn build_batch_message_id(
149 socket_addr: SocketAddr,
150 store_host_length: i32,
151 batch_size: usize,
152 phy_pos: &[i64],
153) -> String {
154 if batch_size == 0 {
155 return String::new();
156 }
157 let msg_id_len = (store_host_length + 8) as usize;
158 let mut msg_id_vec = vec![0u8; msg_id_len];
159 msg_id_vec[..store_host_length as usize].copy_from_slice(&socket_address_to_vec(socket_addr));
160 let mut message_id = String::with_capacity(batch_size * msg_id_len * 2 + batch_size - 1);
161 for (index, value) in phy_pos.iter().enumerate() {
162 msg_id_vec[msg_id_len - 8..msg_id_len].copy_from_slice(&value.to_be_bytes());
163 if index != 0 {
164 message_id.push(',');
165 }
166 message_id.push_str(&bytes_to_string(&msg_id_vec));
167 }
168 message_id
169}
170
/// Parses a message id produced by [`build_message_id`] back into the store
/// address and commit-log offset.
///
/// The id is a hex string of the IP octets (4 bytes for IPv4, 16 for IPv6),
/// the port as a big-endian `i32`, and the offset as a big-endian `i64`;
/// that is 32 hex characters for IPv4 and 56 for IPv6. Both upper- and
/// lower-case hex digits are accepted.
///
/// # Panics
///
/// Panics if the id is not 32 or 56 characters long, contains a non-hex
/// character, or encodes a port outside `0..=65535`.
pub fn parse_message_id(msg_id: impl Into<String>) -> (SocketAddr, i64) {
    // Widths of the fixed-size trailing fields, in bytes.
    const PORT_LEN: usize = 4;
    const OFFSET_LEN: usize = 8;

    let msg_id = msg_id.into();
    let hex = msg_id.as_bytes();
    // The total length tells the address family apart.
    let ip_len = match hex.len() {
        32 => 4,
        56 => 16,
        other => panic!(
            "invalid message id length {}: expected 32 (IPv4) or 56 (IPv6) hex characters",
            other
        ),
    };

    let nibble = |digit: u8| -> u8 {
        match (digit as char).to_digit(16) {
            Some(value) => value as u8,
            None => panic!("invalid hex character {:?} in message id", digit as char),
        }
    };
    let decoded: Vec<u8> = hex
        .chunks_exact(2)
        .map(|pair| (nibble(pair[0]) << 4) | nibble(pair[1]))
        .collect();

    let (ip_bytes, rest) = decoded.split_at(ip_len);
    let (port_bytes, offset_bytes) = rest.split_at(PORT_LEN);

    let mut port_raw = [0u8; PORT_LEN];
    port_raw.copy_from_slice(port_bytes);
    let port_value = i32::from_be_bytes(port_raw);
    assert!(
        port_value >= 0 && port_value <= i32::from(u16::MAX),
        "invalid port {} in message id",
        port_value
    );
    let port = port_value as u16;

    let mut offset_raw = [0u8; OFFSET_LEN];
    offset_raw.copy_from_slice(offset_bytes);
    let offset = i64::from_be_bytes(offset_raw);

    let address = if ip_len == 4 {
        let mut octets = [0u8; 4];
        octets.copy_from_slice(ip_bytes);
        SocketAddr::from((octets, port))
    } else {
        let mut octets = [0u8; 16];
        octets.copy_from_slice(ip_bytes);
        SocketAddr::from((octets, port))
    };
    (address, offset)
}
174
/// Unit tests for the sharding-key, property-string and message-id helpers.
#[cfg(test)]
mod tests {
    use std::net::Ipv4Addr;

    use super::*;
    use crate::common::message::message_ext::MessageExt;

    #[test]
    fn test_get_sharding_key_index() {
        let sharding_key = "example_key";
        let index_size = 10;
        let result = get_sharding_key_index(sharding_key, index_size);
        assert!(result < index_size);
    }

    #[test]
    fn test_get_sharding_key_index_by_msg() {
        let mut message = MessageExt::default();
        message.message.properties.insert(
            MessageConst::PROPERTY_SHARDING_KEY.into(),
            "example_key".into(),
        );
        let index_size = 10;
        let result = get_sharding_key_index_by_msg(&message, index_size);
        assert!(result < index_size);
    }

    #[test]
    fn test_get_sharding_key_indexes() {
        let mut messages = Vec::new();
        let mut message1 = MessageExt::default();
        message1
            .message
            .properties
            .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key1".into());
        messages.push(message1);
        let mut message2 = MessageExt::default();
        message2
            .message
            .properties
            .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key2".into());
        messages.push(message2);
        let index_size = 10;
        let result = get_sharding_key_indexes(&messages, index_size);
        // Relies on "key1" and "key2" hashing to different slots modulo 10.
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_delete_property() {
        let properties_string = "aa\u{0001}bb\u{0002}cc\u{0001}bb\u{0002}";
        let name = "aa";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "cc\u{0001}bb\u{0002}");
    }

    #[test]
    fn delete_property_removes_property_correctly() {
        let properties_string =
            "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key3\u{0001}value3";
        let name = "key2";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}key3\u{0001}value3");
    }

    #[test]
    fn delete_property_handles_empty_string() {
        let properties_string = "";
        let name = "key";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "");
    }

    #[test]
    fn delete_property_handles_non_existent_key() {
        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
        let name = "key3";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}key2\u{0001}value2");
    }

    #[test]
    fn delete_property_handles_key_at_start() {
        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
        let name = "key1";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key2\u{0001}value2");
    }

    #[test]
    fn delete_property_handles_key_at_end() {
        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
        let name = "key2";
        let result = delete_property(properties_string, name);
        // The separator that preceded the removed last entry is kept.
        assert_eq!(result, "key1\u{0001}value1\u{0002}");
    }

    #[test]
    fn delete_property_handles_multiple_occurrences() {
        let properties_string =
            "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key1\u{0001}value3";
        let name = "key1";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key2\u{0001}value2\u{0002}");
    }

    #[test]
    fn test_build_message_id() {
        let socket_addr = "127.0.0.1:12".parse().unwrap();
        let wrote_offset = 1;
        let result = build_message_id(socket_addr, wrote_offset);
        assert_eq!(result, "7F0000010000000C0000000000000001");
    }

    #[test]
    fn socket_address_to_vec_encodes_ipv4_address() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);

        let result = socket_address_to_vec(socket_addr);

        // Four IP octets followed by the port as a big-endian i32.
        assert_eq!(result, vec![127u8, 0, 0, 1, 0, 0, 0x1F, 0x90]);
    }

    #[test]
    fn socket_address_to_vec_encodes_ipv6_address() {
        let socket_addr: SocketAddr = "[::1]:12".parse().unwrap();

        let result = socket_address_to_vec(socket_addr);

        // Sixteen IP octets followed by the port as a big-endian i32.
        let mut expected = vec![0u8; 15];
        expected.push(1);
        expected.extend_from_slice(&[0u8, 0, 0, 12]);
        assert_eq!(result, expected);
    }

    #[test]
    fn build_batch_message_id_creates_correct_id_for_single_position() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
        let store_host_length = 8;
        let batch_size = 1;
        let phy_pos = vec![12345];

        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

        assert_eq!(result, "7F00000100001F900000000000003039");
    }

    #[test]
    fn build_batch_message_id_creates_correct_id_for_multiple_positions() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
        let store_host_length = 8;
        let batch_size = 2;
        let phy_pos = vec![12345, 67890];

        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

        assert_eq!(
            result,
            "7F00000100001F900000000000003039,7F00000100001F900000000000010932"
        );
    }

    #[test]
    fn build_batch_message_id_creates_empty_id_for_no_positions() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
        let store_host_length = 8;
        let batch_size = 0;
        let phy_pos = vec![];

        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

        assert_eq!(result, "");
    }
}