rocketmq_common/utils/
message_utils.rs1use std::collections::hash_map::DefaultHasher;
18use std::collections::HashSet;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::net::SocketAddr;
22
23use bytes::BufMut;
24
25use crate::common::message::message_ext::MessageExt;
26use crate::common::message::MessageConst;
27use crate::MessageDecoder::NAME_VALUE_SEPARATOR;
28use crate::MessageDecoder::PROPERTY_SEPARATOR;
29use crate::UtilAll::bytes_to_string;
30
31pub fn get_sharding_key_index(sharding_key: &str, index_size: usize) -> usize {
32 let mut hasher = DefaultHasher::new();
33 sharding_key.hash(&mut hasher);
34 let hash = hasher.finish() as usize;
35 hash % index_size
36}
37
38pub fn get_sharding_key_index_by_msg(msg: &MessageExt, index_size: usize) -> usize {
39 let sharding_key = msg
40 .message
41 .properties
42 .get(MessageConst::PROPERTY_SHARDING_KEY)
43 .cloned()
44 .unwrap_or_default();
45
46 get_sharding_key_index(sharding_key.as_str(), index_size)
47}
48
49pub fn get_sharding_key_indexes(msgs: &[MessageExt], index_size: usize) -> HashSet<usize> {
50 let mut index_set = HashSet::with_capacity(index_size);
51 for msg in msgs {
52 let idx = get_sharding_key_index_by_msg(msg, index_size);
53 index_set.insert(idx);
54 }
55 index_set
56}
57
58pub fn delete_property(properties_string: &str, name: &str) -> String {
59 if !properties_string.is_empty() {
60 let mut idx0 = 0;
61 let mut idx1;
62 let mut idx2;
63 idx1 = properties_string.find(name);
64 if idx1.is_some() {
65 let mut string_builder = String::with_capacity(properties_string.len());
67 loop {
68 let mut start_idx = idx0;
69 loop {
70 idx1 = properties_string[start_idx..]
71 .find(name)
72 .map(|i| i + start_idx);
73 if idx1.is_none() {
74 break;
75 }
76 let idx1 = idx1.unwrap();
77 start_idx = idx1 + name.len();
78 if (idx1 == 0
79 || properties_string.chars().nth(idx1 - 1) == Some(PROPERTY_SEPARATOR))
80 && (properties_string.len() > idx1 + name.len())
81 && properties_string.chars().nth(idx1 + name.len())
82 == Some(NAME_VALUE_SEPARATOR)
83 {
84 break;
85 }
86 }
87 if idx1.is_none() {
88 string_builder.push_str(&properties_string[idx0..]);
91 break;
92 }
93 let idx1 = idx1.unwrap();
94 string_builder.push_str(&properties_string[idx0..idx1]);
96 idx2 = properties_string[idx1 + name.len() + 1..]
98 .find(PROPERTY_SEPARATOR)
99 .map(|i| i + idx1 + name.len() + 1);
100 if idx2.is_none() {
102 break;
103 }
104 idx0 = idx2.unwrap() + 1;
105 }
106 return string_builder;
107 }
108 }
109 properties_string.to_string()
110}
111
112#[allow(unused_assignments)]
113pub fn delete_property_v2(properties_str: &str, name: &str) -> String {
114 if properties_str.is_empty() {
115 return String::new();
116 }
117 if let Some(mut idx1) = properties_str.find(name) {
118 let mut idx0 = 0;
119 let mut result = String::with_capacity(properties_str.len());
120
121 loop {
122 let mut start_idx = idx0;
123 loop {
124 match properties_str[start_idx..].find(name) {
125 Some(offset) => {
126 idx1 = start_idx + offset;
127 start_idx = idx1 + name.len();
128 let before_ok = idx1 == 0
129 || properties_str.chars().nth(idx1 - 1) == Some(PROPERTY_SEPARATOR);
130 let after_ok = properties_str.chars().nth(idx1 + name.len())
131 == Some(NAME_VALUE_SEPARATOR);
132 if before_ok && after_ok {
133 break;
134 }
135 }
136 None => {
137 idx1 = usize::MAX;
138 break;
139 }
140 }
141 }
142
143 if idx1 == usize::MAX {
144 result.push_str(&properties_str[idx0..]);
145 break;
146 }
147
148 result.push_str(&properties_str[idx0..idx1]);
149
150 let idx2_opt = properties_str[idx1 + name.len() + 1..].find(PROPERTY_SEPARATOR);
151
152 match idx2_opt {
153 Some(rel) => idx0 = idx1 + name.len() + 1 + rel + 1, None => break,
155 }
156 }
157 result
158 } else {
159 properties_str.to_string()
160 }
161}
162
163pub fn build_message_id(socket_addr: SocketAddr, wrote_offset: i64) -> String {
164 let mut msg_id_vec = match socket_addr {
165 SocketAddr::V4(addr) => {
166 let mut msg_id_vec = Vec::<u8>::with_capacity(16);
167 msg_id_vec.extend_from_slice(&addr.ip().octets());
168 msg_id_vec.put_i32(addr.port() as i32);
169 msg_id_vec
170 }
171 SocketAddr::V6(addr) => {
172 let mut msg_id_vec = Vec::<u8>::with_capacity(28);
173 msg_id_vec.extend_from_slice(&addr.ip().octets());
174 msg_id_vec.put_i32(addr.port() as i32);
175 msg_id_vec
176 }
177 };
178 msg_id_vec.put_i64(wrote_offset);
179 bytes_to_string(msg_id_vec.as_slice())
180}
181
182pub fn socket_address_to_vec(socket_addr: SocketAddr) -> Vec<u8> {
183 match socket_addr {
184 SocketAddr::V4(addr) => {
185 let mut msg_id_vec = Vec::<u8>::with_capacity(8);
186 msg_id_vec.extend_from_slice(&addr.ip().octets());
187 msg_id_vec.put_i32(addr.port() as i32);
188 msg_id_vec
189 }
190 SocketAddr::V6(addr) => {
191 let mut msg_id_vec = Vec::<u8>::with_capacity(20);
192 msg_id_vec.extend_from_slice(&addr.ip().octets());
193 msg_id_vec.put_i32(addr.port() as i32);
194 msg_id_vec
195 }
196 }
197}
198
199pub fn build_batch_message_id(
200 socket_addr: SocketAddr,
201 store_host_length: i32,
202 batch_size: usize,
203 phy_pos: &[i64],
204) -> String {
205 if batch_size == 0 {
206 return String::new();
207 }
208 let msg_id_len = (store_host_length + 8) as usize;
209 let mut msg_id_vec = vec![0u8; msg_id_len];
210 msg_id_vec[..store_host_length as usize].copy_from_slice(&socket_address_to_vec(socket_addr));
211 let mut message_id = String::with_capacity(batch_size * msg_id_len * 2 + batch_size - 1);
212 for (index, value) in phy_pos.iter().enumerate() {
213 msg_id_vec[msg_id_len - 8..msg_id_len].copy_from_slice(&value.to_be_bytes());
214 if index != 0 {
215 message_id.push(',');
216 }
217 message_id.push_str(&bytes_to_string(&msg_id_vec));
218 }
219 message_id
220}
221
222pub fn parse_message_id(_msg_id: impl Into<String>) -> (SocketAddr, i64) {
223 unimplemented!()
224}
225
226#[cfg(test)]
227mod tests {
228 use std::net::Ipv4Addr;
229
230 use bytes::Bytes;
231 use bytes::BytesMut;
232
233 use super::*;
234 use crate::common::message::message_ext::MessageExt;
235
236 #[test]
237 fn test_get_sharding_key_index() {
238 let sharding_key = "example_key";
239 let index_size = 10;
240 let result = get_sharding_key_index(sharding_key, index_size);
241 assert!(result < index_size);
242 }
243
244 #[test]
245 fn test_get_sharding_key_index_by_msg() {
246 let mut message = MessageExt::default();
247 message.message.properties.insert(
248 MessageConst::PROPERTY_SHARDING_KEY.into(),
249 "example_key".into(),
250 );
251 let index_size = 10;
252 let result = get_sharding_key_index_by_msg(&message, index_size);
253 assert!(result < index_size);
254 }
255
256 #[test]
257 fn test_get_sharding_key_indexes() {
258 let mut messages = Vec::new();
259 let mut message1 = MessageExt::default();
260 message1
261 .message
262 .properties
263 .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key1".into());
264 messages.push(message1);
265 let mut message2 = MessageExt::default();
266 message2
267 .message
268 .properties
269 .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key2".into());
270 messages.push(message2);
271 let index_size = 10;
272 let result = get_sharding_key_indexes(&messages, index_size);
273 assert_eq!(result.len(), 2);
274 }
275
276 #[test]
277 fn test_delete_property() {
278 let properties_string = "aa\u{0001}bb\u{0002}cc\u{0001}bb\u{0002}";
279 let name = "aa";
280 let result = delete_property(properties_string, name);
281 assert_eq!(result, "cc\u{0001}bb\u{0002}");
282
283 let result = delete_property_v2(properties_string, name);
284 assert_eq!(result, "cc\u{0001}bb\u{0002}");
285 }
286
287 #[test]
288 fn delete_property_removes_property_correctly() {
289 let properties_string =
290 "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key3\u{0001}value3";
291 let name = "key2";
292 let result = delete_property(properties_string, name);
293 assert_eq!(result, "key1\u{0001}value1\u{0002}key3\u{0001}value3");
294
295 let result = delete_property_v2(properties_string, name);
296 assert_eq!(result, "key1\u{0001}value1\u{0002}key3\u{0001}value3");
297 }
298
299 #[test]
300 fn delete_property_handles_empty_string() {
301 let properties_string = "";
302 let name = "key";
303 let result = delete_property(properties_string, name);
304 assert_eq!(result, "");
305 }
306
307 #[test]
308 fn delete_property_handles_non_existent_key() {
309 let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
310 let name = "key3";
311 let result = delete_property(properties_string, name);
312 assert_eq!(result, "key1\u{0001}value1\u{0002}key2\u{0001}value2");
313 }
314
315 #[test]
316 fn delete_property_handles_key_at_start() {
317 let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
318 let name = "key1";
319 let result = delete_property(properties_string, name);
320 assert_eq!(result, "key2\u{0001}value2");
321 }
322
323 #[test]
324 fn delete_property_handles_key_at_end() {
325 let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
326 let name = "key2";
327 let result = delete_property(properties_string, name);
328 assert_eq!(result, "key1\u{0001}value1\u{0002}");
329 }
330
331 #[test]
332 fn delete_property_handles_multiple_occurrences() {
333 let properties_string =
334 "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key1\u{0001}value3";
335 let name = "key1";
336 let result = delete_property(properties_string, name);
337 assert_eq!(result, "key2\u{0001}value2\u{0002}");
338 }
339
340 #[test]
341 fn test_build_message_id() {
342 let socket_addr = "127.0.0.1:12".parse().unwrap();
343 let wrote_offset = 1;
344 let result = build_message_id(socket_addr, wrote_offset);
345 assert_eq!(result, "7F0000010000000C0000000000000001");
346 }
347
348 #[test]
349 fn build_batch_message_id_creates_correct_id_for_single_position() {
350 let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
351 let store_host_length = 8;
352 let batch_size = 1;
353 let phy_pos = vec![12345];
354
355 let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
356
357 assert_eq!(result, "7F00000100001F900000000000003039");
358 }
359
360 #[test]
361 fn build_batch_message_id_creates_correct_id_for_multiple_positions() {
362 let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
363 let store_host_length = 8;
364 let batch_size = 2;
365 let phy_pos = vec![12345, 67890];
366
367 let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
368
369 assert_eq!(
370 result,
371 "7F00000100001F900000000000003039,7F00000100001F900000000000010932"
372 );
373 }
374
375 #[test]
376 fn build_batch_message_id_creates_empty_id_for_no_positions() {
377 let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
378 let store_host_length = 8;
379 let batch_size = 0;
380 let phy_pos = vec![];
381
382 let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
383
384 assert_eq!(result, "");
385 }
386}