rocketmq_common/utils/
message_utils.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use std::collections::hash_map::DefaultHasher;
18use std::collections::HashSet;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::net::SocketAddr;
22
23use bytes::BufMut;
24
25use crate::common::message::message_ext::MessageExt;
26use crate::common::message::MessageConst;
27use crate::MessageDecoder::NAME_VALUE_SEPARATOR;
28use crate::MessageDecoder::PROPERTY_SEPARATOR;
29use crate::UtilAll::bytes_to_string;
30
/// Maps `sharding_key` onto one of `index_size` buckets.
///
/// The key is hashed with a freshly created [`DefaultHasher`]. The 64-bit
/// digest is converted to `usize` and reduced modulo `index_size`, so the
/// result always lies in `0..index_size`.
///
/// The standard library does not promise that `DefaultHasher`'s algorithm
/// stays the same across Rust releases, so these indexes should not be
/// persisted.
///
/// # Panics
///
/// Panics if `index_size` is zero.
pub fn get_sharding_key_index(sharding_key: &str, index_size: usize) -> usize {
    let digest = {
        let mut state = DefaultHasher::new();
        sharding_key.hash(&mut state);
        state.finish()
    };
    (digest as usize) % index_size
}
37
38pub fn get_sharding_key_index_by_msg(msg: &MessageExt, index_size: usize) -> usize {
39    let sharding_key = msg
40        .message
41        .properties
42        .get(MessageConst::PROPERTY_SHARDING_KEY)
43        .cloned()
44        .unwrap_or_default();
45
46    get_sharding_key_index(sharding_key.as_str(), index_size)
47}
48
49pub fn get_sharding_key_indexes(msgs: &[MessageExt], index_size: usize) -> HashSet<usize> {
50    let mut index_set = HashSet::with_capacity(index_size);
51    for msg in msgs {
52        let idx = get_sharding_key_index_by_msg(msg, index_size);
53        index_set.insert(idx);
54    }
55    index_set
56}
57
58pub fn delete_property(properties_string: &str, name: &str) -> String {
59    if !properties_string.is_empty() {
60        let mut idx0 = 0;
61        let mut idx1;
62        let mut idx2;
63        idx1 = properties_string.find(name);
64        if idx1.is_some() {
65            // cropping may be required
66            let mut string_builder = String::with_capacity(properties_string.len());
67            loop {
68                let mut start_idx = idx0;
69                loop {
70                    idx1 = properties_string[start_idx..]
71                        .find(name)
72                        .map(|i| i + start_idx);
73                    if idx1.is_none() {
74                        break;
75                    }
76                    let idx1 = idx1.unwrap();
77                    start_idx = idx1 + name.len();
78                    if (idx1 == 0
79                        || properties_string.chars().nth(idx1 - 1) == Some(PROPERTY_SEPARATOR))
80                        && (properties_string.len() > idx1 + name.len())
81                        && properties_string.chars().nth(idx1 + name.len())
82                            == Some(NAME_VALUE_SEPARATOR)
83                    {
84                        break;
85                    }
86                }
87                if idx1.is_none() {
88                    // there are no characters that need to be skipped. Append all remaining
89                    // characters.
90                    string_builder.push_str(&properties_string[idx0..]);
91                    break;
92                }
93                let idx1 = idx1.unwrap();
94                // there are characters that need to be cropped
95                string_builder.push_str(&properties_string[idx0..idx1]);
96                // move idx2 to the end of the cropped character
97                idx2 = properties_string[idx1 + name.len() + 1..]
98                    .find(PROPERTY_SEPARATOR)
99                    .map(|i| i + idx1 + name.len() + 1);
100                // all subsequent characters will be cropped
101                if idx2.is_none() {
102                    break;
103                }
104                idx0 = idx2.unwrap() + 1;
105            }
106            return string_builder;
107        }
108    }
109    properties_string.to_string()
110}
111
112#[allow(unused_assignments)]
113pub fn delete_property_v2(properties_str: &str, name: &str) -> String {
114    if properties_str.is_empty() {
115        return String::new();
116    }
117    if let Some(mut idx1) = properties_str.find(name) {
118        let mut idx0 = 0;
119        let mut result = String::with_capacity(properties_str.len());
120
121        loop {
122            let mut start_idx = idx0;
123            loop {
124                match properties_str[start_idx..].find(name) {
125                    Some(offset) => {
126                        idx1 = start_idx + offset;
127                        start_idx = idx1 + name.len();
128                        let before_ok = idx1 == 0
129                            || properties_str.chars().nth(idx1 - 1) == Some(PROPERTY_SEPARATOR);
130                        let after_ok = properties_str.chars().nth(idx1 + name.len())
131                            == Some(NAME_VALUE_SEPARATOR);
132                        if before_ok && after_ok {
133                            break;
134                        }
135                    }
136                    None => {
137                        idx1 = usize::MAX;
138                        break;
139                    }
140                }
141            }
142
143            if idx1 == usize::MAX {
144                result.push_str(&properties_str[idx0..]);
145                break;
146            }
147
148            result.push_str(&properties_str[idx0..idx1]);
149
150            let idx2_opt = properties_str[idx1 + name.len() + 1..].find(PROPERTY_SEPARATOR);
151
152            match idx2_opt {
153                Some(rel) => idx0 = idx1 + name.len() + 1 + rel + 1, //
154                None => break,
155            }
156        }
157        result
158    } else {
159        properties_str.to_string()
160    }
161}
162
163pub fn build_message_id(socket_addr: SocketAddr, wrote_offset: i64) -> String {
164    let mut msg_id_vec = match socket_addr {
165        SocketAddr::V4(addr) => {
166            let mut msg_id_vec = Vec::<u8>::with_capacity(16);
167            msg_id_vec.extend_from_slice(&addr.ip().octets());
168            msg_id_vec.put_i32(addr.port() as i32);
169            msg_id_vec
170        }
171        SocketAddr::V6(addr) => {
172            let mut msg_id_vec = Vec::<u8>::with_capacity(28);
173            msg_id_vec.extend_from_slice(&addr.ip().octets());
174            msg_id_vec.put_i32(addr.port() as i32);
175            msg_id_vec
176        }
177    };
178    msg_id_vec.put_i64(wrote_offset);
179    bytes_to_string(msg_id_vec.as_slice())
180}
181
/// Encodes a socket address as its IP octets followed by the port as a
/// big-endian `i32`.
///
/// The result is 8 bytes long for IPv4 and 20 bytes long for IPv6.
pub fn socket_address_to_vec(socket_addr: SocketAddr) -> Vec<u8> {
    let port_bytes = i32::from(socket_addr.port()).to_be_bytes();
    let mut encoded = match socket_addr {
        SocketAddr::V4(v4) => {
            let mut buf = Vec::<u8>::with_capacity(8);
            buf.extend_from_slice(&v4.ip().octets());
            buf
        }
        SocketAddr::V6(v6) => {
            let mut buf = Vec::<u8>::with_capacity(20);
            buf.extend_from_slice(&v6.ip().octets());
            buf
        }
    };
    encoded.extend_from_slice(&port_bytes);
    encoded
}
198
199pub fn build_batch_message_id(
200    socket_addr: SocketAddr,
201    store_host_length: i32,
202    batch_size: usize,
203    phy_pos: &[i64],
204) -> String {
205    if batch_size == 0 {
206        return String::new();
207    }
208    let msg_id_len = (store_host_length + 8) as usize;
209    let mut msg_id_vec = vec![0u8; msg_id_len];
210    msg_id_vec[..store_host_length as usize].copy_from_slice(&socket_address_to_vec(socket_addr));
211    let mut message_id = String::with_capacity(batch_size * msg_id_len * 2 + batch_size - 1);
212    for (index, value) in phy_pos.iter().enumerate() {
213        msg_id_vec[msg_id_len - 8..msg_id_len].copy_from_slice(&value.to_be_bytes());
214        if index != 0 {
215            message_id.push(',');
216        }
217        message_id.push_str(&bytes_to_string(&msg_id_vec));
218    }
219    message_id
220}
221
/// Decodes a message id produced by [`build_message_id`] back into the store
/// host address and the write offset.
///
/// The id is the hex encoding (either letter case is accepted) of three
/// fields:
/// - the host's IP octets;
/// - the port as a big-endian `i32`;
/// - the offset as a big-endian `i64`.
///
/// That makes 32 hex characters for an IPv4 host and 56 for an IPv6 host.
///
/// # Panics
///
/// Panics if the id has neither of those lengths, contains a non-hex
/// character, or encodes a port outside `0..=65535`.
pub fn parse_message_id(msg_id: impl Into<String>) -> (SocketAddr, i64) {
    let msg_id: String = msg_id.into();
    // (ip octets + 4 port bytes + 8 offset bytes) * 2 hex chars per byte
    let ip_len = match msg_id.len() {
        32 => 4,
        56 => 16,
        other => panic!(
            "invalid message id {:?}: expected 32 (IPv4) or 56 (IPv6) hex characters, got {}",
            msg_id, other
        ),
    };

    let hex_value = |byte: u8| -> u8 {
        match (byte as char).to_digit(16) {
            Some(digit) => digit as u8,
            None => panic!("invalid message id {:?}: contains a non-hex character", msg_id),
        }
    };
    let raw: Vec<u8> = msg_id
        .as_bytes()
        .chunks_exact(2)
        .map(|pair| (hex_value(pair[0]) << 4) | hex_value(pair[1]))
        .collect();

    let (ip_bytes, rest) = raw.split_at(ip_len);
    let mut port_bytes = [0u8; 4];
    port_bytes.copy_from_slice(&rest[..4]);
    let mut offset_bytes = [0u8; 8];
    offset_bytes.copy_from_slice(&rest[4..]);

    let port = u32::from_be_bytes(port_bytes);
    if port > u32::from(u16::MAX) {
        panic!("invalid message id {:?}: port {} is out of range", msg_id, port);
    }
    let port = port as u16;

    let address = if ip_len == 4 {
        let mut octets = [0u8; 4];
        octets.copy_from_slice(ip_bytes);
        SocketAddr::from((octets, port))
    } else {
        let mut octets = [0u8; 16];
        octets.copy_from_slice(ip_bytes);
        SocketAddr::from((octets, port))
    };
    (address, i64::from_be_bytes(offset_bytes))
}
225
#[cfg(test)]
mod tests {
    use std::net::Ipv4Addr;

    use super::*;
    use crate::common::message::message_ext::MessageExt;

    #[test]
    fn test_get_sharding_key_index() {
        let sharding_key = "example_key";
        let index_size = 10;
        let result = get_sharding_key_index(sharding_key, index_size);
        assert!(result < index_size);
    }

    #[test]
    fn test_get_sharding_key_index_by_msg() {
        let mut message = MessageExt::default();
        message.message.properties.insert(
            MessageConst::PROPERTY_SHARDING_KEY.into(),
            "example_key".into(),
        );
        let index_size = 10;
        let result = get_sharding_key_index_by_msg(&message, index_size);
        assert!(result < index_size);
    }

    #[test]
    fn test_get_sharding_key_indexes() {
        let mut messages = Vec::new();
        let mut message1 = MessageExt::default();
        message1
            .message
            .properties
            .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key1".into());
        messages.push(message1);
        let mut message2 = MessageExt::default();
        message2
            .message
            .properties
            .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key2".into());
        messages.push(message2);
        let index_size = 10;
        let result = get_sharding_key_indexes(&messages, index_size);
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_delete_property() {
        let properties_string = "aa\u{0001}bb\u{0002}cc\u{0001}bb\u{0002}";
        let name = "aa";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "cc\u{0001}bb\u{0002}");

        let result = delete_property_v2(properties_string, name);
        assert_eq!(result, "cc\u{0001}bb\u{0002}");
    }

    #[test]
    fn delete_property_removes_property_correctly() {
        let properties_string =
            "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key3\u{0001}value3";
        let name = "key2";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}key3\u{0001}value3");

        let result = delete_property_v2(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}key3\u{0001}value3");
    }

    #[test]
    fn delete_property_handles_empty_string() {
        let properties_string = "";
        let name = "key";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "");

        let result = delete_property_v2(properties_string, name);
        assert_eq!(result, "");
    }

    #[test]
    fn delete_property_handles_non_existent_key() {
        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
        let name = "key3";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}key2\u{0001}value2");

        let result = delete_property_v2(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}key2\u{0001}value2");
    }

    #[test]
    fn delete_property_handles_key_at_start() {
        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
        let name = "key1";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key2\u{0001}value2");

        let result = delete_property_v2(properties_string, name);
        assert_eq!(result, "key2\u{0001}value2");
    }

    #[test]
    fn delete_property_handles_key_at_end() {
        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
        let name = "key2";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}");

        let result = delete_property_v2(properties_string, name);
        assert_eq!(result, "key1\u{0001}value1\u{0002}");
    }

    #[test]
    fn delete_property_handles_multiple_occurrences() {
        let properties_string =
            "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key1\u{0001}value3";
        let name = "key1";
        let result = delete_property(properties_string, name);
        assert_eq!(result, "key2\u{0001}value2\u{0002}");

        let result = delete_property_v2(properties_string, name);
        assert_eq!(result, "key2\u{0001}value2\u{0002}");
    }

    #[test]
    fn test_build_message_id() {
        let socket_addr = "127.0.0.1:12".parse().unwrap();
        let wrote_offset = 1;
        let result = build_message_id(socket_addr, wrote_offset);
        assert_eq!(result, "7F0000010000000C0000000000000001");
    }

    #[test]
    fn build_message_id_encodes_ipv6_host() {
        let socket_addr = "[::1]:12".parse().unwrap();
        let result = build_message_id(socket_addr, 1);
        // 16 ip bytes + 4 port bytes + 8 offset bytes, two hex chars per byte
        assert_eq!(
            result,
            concat!(
                "00000000", "00000000", "00000000", "00000001", "0000000C", "00000000",
                "00000001"
            )
        );
    }

    #[test]
    fn socket_address_to_vec_encodes_ip_and_port() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
        assert_eq!(
            socket_address_to_vec(socket_addr),
            vec![127, 0, 0, 1, 0, 0, 0x1F, 0x90]
        );
    }

    #[test]
    fn build_batch_message_id_creates_correct_id_for_single_position() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
        let store_host_length = 8;
        let batch_size = 1;
        let phy_pos = vec![12345];

        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

        assert_eq!(result, "7F00000100001F900000000000003039");
    }

    #[test]
    fn build_batch_message_id_creates_correct_id_for_multiple_positions() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
        let store_host_length = 8;
        let batch_size = 2;
        let phy_pos = vec![12345, 67890];

        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

        assert_eq!(
            result,
            "7F00000100001F900000000000003039,7F00000100001F900000000000010932"
        );
    }

    #[test]
    fn build_batch_message_id_creates_empty_id_for_no_positions() {
        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
        let store_host_length = 8;
        let batch_size = 0;
        let phy_pos = vec![];

        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);

        assert_eq!(result, "");
    }
}