rocketmq_common/common/message/
message_client_id_setter.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::sync::atomic::AtomicI32;
18use std::sync::atomic::Ordering;
19
20use bytes::BufMut;
21use bytes::BytesMut;
22use cheetah_string::CheetahString;
23use chrono::Datelike;
24use chrono::Months;
25use chrono::TimeZone;
26use chrono::Timelike;
27use chrono::Utc;
28use lazy_static::lazy_static;
29use parking_lot::Mutex;
30
31use crate::common::hasher::string_hasher::JavaStringHasher;
32use crate::common::message::message_single::Message;
33use crate::common::message::MessageConst;
34use crate::common::message::MessageTrait;
35use crate::utils::util_all;
36use crate::TimeUtils::get_current_millis;
37use crate::UtilAll::bytes_to_string;
38use crate::UtilAll::write_int;
39use crate::UtilAll::write_short;
40
41lazy_static! {
42    static ref COUNTER: AtomicI32 = AtomicI32::new(0);
43    static ref START_TIME: Mutex<i64> = Mutex::new(0);
44    static ref NEXT_START_TIME: Mutex<i64> = Mutex::new(0);
45    static ref LEN: usize = {
46        let ip = util_all::get_ip().unwrap_or_else(|_| create_fake_ip());
47        ip.len() + 2 + 4 + 4 + 2
48    };
49    static ref FIX_STRING: Vec<char> = {
50        let ip = util_all::get_ip().unwrap_or_else(|_| create_fake_ip());
51        let pid = std::process::id() as i16;
52        let class_loader_hash = JavaStringHasher::hash_str("MessageClientIDSetter");
53        let mut bytes = BytesMut::with_capacity(ip.len() + 2 + 4);
54        bytes.put(ip.as_slice());
55        bytes.put_i16(pid);
56        bytes.put_i32(class_loader_hash);
57        let data = bytes_to_string(bytes.freeze().as_ref())
58            .chars()
59            .collect::<Vec<char>>();
60        data
61    };
62}
63
64pub fn create_fake_ip() -> Vec<u8> {
65    get_current_millis().to_be_bytes()[4..].to_vec()
66}
67
68pub struct MessageClientIDSetter;
69
70impl MessageClientIDSetter {
71    #[inline]
72    pub fn get_uniq_id<T>(message: &T) -> Option<CheetahString>
73    where
74        T: MessageTrait,
75    {
76        message.get_property(&CheetahString::from_static_str(
77            MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
78        ))
79    }
80
81    fn set_start_time(millis: i64) {
82        let dt = Utc.timestamp_millis_opt(millis).unwrap();
83        let cal = Utc
84            .with_ymd_and_hms(dt.year(), dt.month(), 1, 0, 0, 0)
85            .unwrap();
86
87        *START_TIME.lock() = cal.timestamp_millis();
88
89        *NEXT_START_TIME.lock() = cal
90            .checked_add_months(Months::new(1))
91            .unwrap()
92            .timestamp_millis();
93    }
94
95    pub fn create_uniq_id() -> String {
96        let mut sb = vec!['\0'; *LEN * 2];
97        sb[..FIX_STRING.len()].copy_from_slice(&FIX_STRING);
98
99        let current = get_current_millis() as i64;
100        if current >= *NEXT_START_TIME.lock() {
101            Self::set_start_time(current);
102        }
103
104        let mut diff = (current - *START_TIME.lock()) as i32;
105        if diff < 0 && diff > -1_000_000 {
106            // May cause by NTP
107            diff = 0;
108        }
109
110        let pos = FIX_STRING.len();
111        write_int(&mut sb, pos, diff);
112
113        let counter_val = COUNTER.fetch_add(1, Ordering::SeqCst) as i16;
114        write_short(&mut sb, pos + 8, counter_val);
115        sb.into_iter().collect()
116    }
117
118    pub fn set_uniq_id<T>(message: &mut T)
119    where
120        T: MessageTrait,
121    {
122        if message
123            .get_property(&CheetahString::from_static_str(
124                MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
125            ))
126            .is_none()
127        {
128            let uniq_id = Self::create_uniq_id();
129            message.put_property(
130                CheetahString::from_static_str(
131                    MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
132                ),
133                CheetahString::from_string(uniq_id),
134            );
135        }
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use std::net::IpAddr;
142    use std::net::Ipv4Addr;
143
144    use chrono::Utc;
145
146    use super::*;
147
148    #[test]
149    fn unique_id_is_generated_correctly() {
150        let message = Message::default();
151        let unique_id = MessageClientIDSetter::create_uniq_id();
152        assert!(!unique_id.is_empty());
153    }
154
155    #[test]
156    fn unique_id_counter_increments_properly() {
157        let first_id = MessageClientIDSetter::create_uniq_id();
158        let second_id = MessageClientIDSetter::create_uniq_id();
159        let first_counter = &first_id[FIX_STRING.len() + 8..];
160        let second_counter = &second_id[FIX_STRING.len() + 8..];
161        assert_ne!(first_counter, second_counter);
162    }
163
164    #[test]
165    fn get_uniq_id_returns_none_when_not_set() {
166        let message = Message::default();
167        assert_eq!(MessageClientIDSetter::get_uniq_id(&message), None);
168    }
169
170    #[test]
171    fn create_fake_ip_generates_valid_ip() {
172        let fake_ip = create_fake_ip();
173        assert_eq!(fake_ip.len(), 4);
174        assert!(fake_ip.iter().all(|&byte| true));
175    }
176
177    #[test]
178    fn start_time_is_set_correctly_for_new_month() {
179        let millis = Utc::now().timestamp_millis();
180        MessageClientIDSetter::set_start_time(millis);
181        let locked_start_time = *START_TIME.lock();
182        assert!(locked_start_time <= millis);
183    }
184
185    #[test]
186    fn next_start_time_is_set_correctly_for_new_month() {
187        let millis = Utc::now().timestamp_millis();
188        MessageClientIDSetter::set_start_time(millis);
189        let locked_next_start_time = *NEXT_START_TIME.lock();
190        assert!(locked_next_start_time > millis);
191    }
192}