rocketmq_common/utils/
message_utils.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::hash_map::DefaultHasher;
18use std::collections::HashSet;
19use std::hash::Hash;
20use std::hash::Hasher;
21use std::net::SocketAddr;
22
23use bytes::BufMut;
24
25use crate::common::message::message_ext::MessageExt;
26use crate::common::message::MessageConst;
27use crate::MessageDecoder::NAME_VALUE_SEPARATOR;
28use crate::MessageDecoder::PROPERTY_SEPARATOR;
29use crate::UtilAll::bytes_to_string;
30
31pub fn get_sharding_key_index(sharding_key: &str, index_size: usize) -> usize {
32    let mut hasher = DefaultHasher::new();
33    sharding_key.hash(&mut hasher);
34    let hash = hasher.finish() as usize;
35    hash % index_size
36}
37
38pub fn get_sharding_key_index_by_msg(msg: &MessageExt, index_size: usize) -> usize {
39    let sharding_key = msg
40        .message
41        .properties
42        .get(MessageConst::PROPERTY_SHARDING_KEY)
43        .cloned()
44        .unwrap_or_default();
45
46    get_sharding_key_index(sharding_key.as_str(), index_size)
47}
48
49pub fn get_sharding_key_indexes(msgs: &[MessageExt], index_size: usize) -> HashSet<usize> {
50    let mut index_set = HashSet::with_capacity(index_size);
51    for msg in msgs {
52        let idx = get_sharding_key_index_by_msg(msg, index_size);
53        index_set.insert(idx);
54    }
55    index_set
56}
57
58pub fn delete_property(properties_string: &str, name: &str) -> String {
59    if !properties_string.is_empty() {
60        let mut idx0 = 0;
61        let mut idx1;
62        let mut idx2;
63        idx1 = properties_string.find(name);
64        if idx1.is_some() {
65            // cropping may be required
66            let mut string_builder = String::with_capacity(properties_string.len());
67            loop {
68                let mut start_idx = idx0;
69                loop {
70                    idx1 = properties_string[start_idx..]
71                        .find(name)
72                        .map(|i| i + start_idx);
73                    if idx1.is_none() {
74                        break;
75                    }
76                    let idx1 = idx1.unwrap();
77                    start_idx = idx1 + name.len();
78                    if (idx1 == 0
79                        || properties_string.chars().nth(idx1 - 1) == Some(PROPERTY_SEPARATOR))
80                        && (properties_string.len() > idx1 + name.len())
81                        && properties_string.chars().nth(idx1 + name.len())
82                            == Some(NAME_VALUE_SEPARATOR)
83                    {
84                        break;
85                    }
86                }
87                if idx1.is_none() {
88                    // there are no characters that need to be skipped. Append all remaining
89                    // characters.
90                    string_builder.push_str(&properties_string[idx0..]);
91                    break;
92                }
93                let idx1 = idx1.unwrap();
94                // there are characters that need to be cropped
95                string_builder.push_str(&properties_string[idx0..idx1]);
96                // move idx2 to the end of the cropped character
97                idx2 = properties_string[idx1 + name.len() + 1..]
98                    .find(PROPERTY_SEPARATOR)
99                    .map(|i| i + idx1 + name.len() + 1);
100                // all subsequent characters will be cropped
101                if idx2.is_none() {
102                    break;
103                }
104                idx0 = idx2.unwrap() + 1;
105            }
106            return string_builder;
107        }
108    }
109    properties_string.to_string()
110}
111
112pub fn build_message_id(socket_addr: SocketAddr, wrote_offset: i64) -> String {
113    let mut msg_id_vec = match socket_addr {
114        SocketAddr::V4(addr) => {
115            let mut msg_id_vec = Vec::<u8>::with_capacity(16);
116            msg_id_vec.extend_from_slice(&addr.ip().octets());
117            msg_id_vec.put_i32(addr.port() as i32);
118            msg_id_vec
119        }
120        SocketAddr::V6(addr) => {
121            let mut msg_id_vec = Vec::<u8>::with_capacity(28);
122            msg_id_vec.extend_from_slice(&addr.ip().octets());
123            msg_id_vec.put_i32(addr.port() as i32);
124            msg_id_vec
125        }
126    };
127    msg_id_vec.put_i64(wrote_offset);
128    bytes_to_string(msg_id_vec.as_slice())
129}
130
131pub fn socket_address_to_vec(socket_addr: SocketAddr) -> Vec<u8> {
132    match socket_addr {
133        SocketAddr::V4(addr) => {
134            let mut msg_id_vec = Vec::<u8>::with_capacity(8);
135            msg_id_vec.extend_from_slice(&addr.ip().octets());
136            msg_id_vec.put_i32(addr.port() as i32);
137            msg_id_vec
138        }
139        SocketAddr::V6(addr) => {
140            let mut msg_id_vec = Vec::<u8>::with_capacity(20);
141            msg_id_vec.extend_from_slice(&addr.ip().octets());
142            msg_id_vec.put_i32(addr.port() as i32);
143            msg_id_vec
144        }
145    }
146}
147
148pub fn build_batch_message_id(
149    socket_addr: SocketAddr,
150    store_host_length: i32,
151    batch_size: usize,
152    phy_pos: &[i64],
153) -> String {
154    if batch_size == 0 {
155        return String::new();
156    }
157    let msg_id_len = (store_host_length + 8) as usize;
158    let mut msg_id_vec = vec![0u8; msg_id_len];
159    msg_id_vec[..store_host_length as usize].copy_from_slice(&socket_address_to_vec(socket_addr));
160    let mut message_id = String::with_capacity(batch_size * msg_id_len * 2 + batch_size - 1);
161    for (index, value) in phy_pos.iter().enumerate() {
162        msg_id_vec[msg_id_len - 8..msg_id_len].copy_from_slice(&value.to_be_bytes());
163        if index != 0 {
164            message_id.push(',');
165        }
166        message_id.push_str(&bytes_to_string(&msg_id_vec));
167    }
168    message_id
169}
170
171pub fn parse_message_id(_msg_id: impl Into<String>) -> (SocketAddr, i64) {
172    unimplemented!()
173}
174
175#[cfg(test)]
176mod tests {
177    use std::net::Ipv4Addr;
178
179    use bytes::Bytes;
180    use bytes::BytesMut;
181
182    use super::*;
183    use crate::common::message::message_ext::MessageExt;
184
185    #[test]
186    fn test_get_sharding_key_index() {
187        let sharding_key = "example_key";
188        let index_size = 10;
189        let result = get_sharding_key_index(sharding_key, index_size);
190        assert!(result < index_size);
191    }
192
193    #[test]
194    fn test_get_sharding_key_index_by_msg() {
195        let mut message = MessageExt::default();
196        message.message.properties.insert(
197            MessageConst::PROPERTY_SHARDING_KEY.into(),
198            "example_key".into(),
199        );
200        let index_size = 10;
201        let result = get_sharding_key_index_by_msg(&message, index_size);
202        assert!(result < index_size);
203    }
204
205    #[test]
206    fn test_get_sharding_key_indexes() {
207        let mut messages = Vec::new();
208        let mut message1 = MessageExt::default();
209        message1
210            .message
211            .properties
212            .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key1".into());
213        messages.push(message1);
214        let mut message2 = MessageExt::default();
215        message2
216            .message
217            .properties
218            .insert(MessageConst::PROPERTY_SHARDING_KEY.into(), "key2".into());
219        messages.push(message2);
220        let index_size = 10;
221        let result = get_sharding_key_indexes(&messages, index_size);
222        assert_eq!(result.len(), 2);
223    }
224
225    #[test]
226    fn test_delete_property() {
227        let properties_string = "aa\u{0001}bb\u{0002}cc\u{0001}bb\u{0002}";
228        let name = "aa";
229        let result = delete_property(properties_string, name);
230        assert_eq!(result, "cc\u{0001}bb\u{0002}");
231    }
232
233    #[test]
234    fn delete_property_removes_property_correctly() {
235        let properties_string =
236            "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key3\u{0001}value3";
237        let name = "key2";
238        let result = delete_property(properties_string, name);
239        assert_eq!(result, "key1\u{0001}value1\u{0002}key3\u{0001}value3");
240    }
241
242    #[test]
243    fn delete_property_handles_empty_string() {
244        let properties_string = "";
245        let name = "key";
246        let result = delete_property(properties_string, name);
247        assert_eq!(result, "");
248    }
249
250    #[test]
251    fn delete_property_handles_non_existent_key() {
252        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
253        let name = "key3";
254        let result = delete_property(properties_string, name);
255        assert_eq!(result, "key1\u{0001}value1\u{0002}key2\u{0001}value2");
256    }
257
258    #[test]
259    fn delete_property_handles_key_at_start() {
260        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
261        let name = "key1";
262        let result = delete_property(properties_string, name);
263        assert_eq!(result, "key2\u{0001}value2");
264    }
265
266    #[test]
267    fn delete_property_handles_key_at_end() {
268        let properties_string = "key1\u{0001}value1\u{0002}key2\u{0001}value2";
269        let name = "key2";
270        let result = delete_property(properties_string, name);
271        assert_eq!(result, "key1\u{0001}value1\u{0002}");
272    }
273
274    #[test]
275    fn delete_property_handles_multiple_occurrences() {
276        let properties_string =
277            "key1\u{0001}value1\u{0002}key2\u{0001}value2\u{0002}key1\u{0001}value3";
278        let name = "key1";
279        let result = delete_property(properties_string, name);
280        assert_eq!(result, "key2\u{0001}value2\u{0002}");
281    }
282
283    #[test]
284    fn test_build_message_id() {
285        let socket_addr = "127.0.0.1:12".parse().unwrap();
286        let wrote_offset = 1;
287        let result = build_message_id(socket_addr, wrote_offset);
288        assert_eq!(result, "7F0000010000000C0000000000000001");
289    }
290
291    #[test]
292    fn build_batch_message_id_creates_correct_id_for_single_position() {
293        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
294        let store_host_length = 8;
295        let batch_size = 1;
296        let phy_pos = vec![12345];
297
298        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
299
300        assert_eq!(result, "7F00000100001F900000000000003039");
301    }
302
303    #[test]
304    fn build_batch_message_id_creates_correct_id_for_multiple_positions() {
305        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
306        let store_host_length = 8;
307        let batch_size = 2;
308        let phy_pos = vec![12345, 67890];
309
310        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
311
312        assert_eq!(
313            result,
314            "7F00000100001F900000000000003039,7F00000100001F900000000000010932"
315        );
316    }
317
318    #[test]
319    fn build_batch_message_id_creates_empty_id_for_no_positions() {
320        let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080);
321        let store_host_length = 8;
322        let batch_size = 0;
323        let phy_pos = vec![];
324
325        let result = build_batch_message_id(socket_addr, store_host_length, batch_size, &phy_pos);
326
327        assert_eq!(result, "");
328    }
329}