rocketmq_common/common/message/
message_client_id_setter.rs1use std::sync::atomic::AtomicI32;
18use std::sync::atomic::Ordering;
19
20use bytes::BufMut;
21use bytes::BytesMut;
22use cheetah_string::CheetahString;
23use chrono::Datelike;
24use chrono::Months;
25use chrono::TimeZone;
26use chrono::Timelike;
27use chrono::Utc;
28use lazy_static::lazy_static;
29use parking_lot::Mutex;
30
31use crate::common::hasher::string_hasher::JavaStringHasher;
32use crate::common::message::message_single::Message;
33use crate::common::message::MessageConst;
34use crate::common::message::MessageTrait;
35use crate::utils::util_all;
36use crate::TimeUtils::get_current_millis;
37use crate::UtilAll::bytes_to_string;
38use crate::UtilAll::write_int;
39use crate::UtilAll::write_short;
40
41lazy_static! {
42 static ref COUNTER: AtomicI32 = AtomicI32::new(0);
43 static ref START_TIME: Mutex<i64> = Mutex::new(0);
44 static ref NEXT_START_TIME: Mutex<i64> = Mutex::new(0);
45 static ref LEN: usize = {
46 let ip = util_all::get_ip().unwrap_or_else(|_| create_fake_ip());
47 ip.len() + 2 + 4 + 4 + 2
48 };
49 static ref FIX_STRING: Vec<char> = {
50 let ip = util_all::get_ip().unwrap_or_else(|_| create_fake_ip());
51 let pid = std::process::id() as i16;
52 let class_loader_hash = JavaStringHasher::hash_str("MessageClientIDSetter");
53 let mut bytes = BytesMut::with_capacity(ip.len() + 2 + 4);
54 bytes.put(ip.as_slice());
55 bytes.put_i16(pid);
56 bytes.put_i32(class_loader_hash);
57 let data = bytes_to_string(bytes.freeze().as_ref())
58 .chars()
59 .collect::<Vec<char>>();
60 data
61 };
62}
63
64pub fn create_fake_ip() -> Vec<u8> {
65 get_current_millis().to_be_bytes()[4..].to_vec()
66}
67
68pub struct MessageClientIDSetter;
69
70impl MessageClientIDSetter {
71 #[inline]
72 pub fn get_uniq_id<T>(message: &T) -> Option<CheetahString>
73 where
74 T: MessageTrait,
75 {
76 message.get_property(&CheetahString::from_static_str(
77 MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
78 ))
79 }
80
81 fn set_start_time(millis: i64) {
82 let dt = Utc.timestamp_millis_opt(millis).unwrap();
83 let cal = Utc
84 .with_ymd_and_hms(dt.year(), dt.month(), 1, 0, 0, 0)
85 .unwrap();
86
87 *START_TIME.lock() = cal.timestamp_millis();
88
89 *NEXT_START_TIME.lock() = cal
90 .checked_add_months(Months::new(1))
91 .unwrap()
92 .timestamp_millis();
93 }
94
95 pub fn create_uniq_id() -> String {
96 let mut sb = vec!['\0'; *LEN * 2];
97 sb[..FIX_STRING.len()].copy_from_slice(&FIX_STRING);
98
99 let current = get_current_millis() as i64;
100 if current >= *NEXT_START_TIME.lock() {
101 Self::set_start_time(current);
102 }
103
104 let mut diff = (current - *START_TIME.lock()) as i32;
105 if diff < 0 && diff > -1_000_000 {
106 diff = 0;
108 }
109
110 let pos = FIX_STRING.len();
111 write_int(&mut sb, pos, diff);
112
113 let counter_val = COUNTER.fetch_add(1, Ordering::SeqCst) as i16;
114 write_short(&mut sb, pos + 8, counter_val);
115 sb.into_iter().collect()
116 }
117
118 pub fn set_uniq_id<T>(message: &mut T)
119 where
120 T: MessageTrait,
121 {
122 if message
123 .get_property(&CheetahString::from_static_str(
124 MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
125 ))
126 .is_none()
127 {
128 let uniq_id = Self::create_uniq_id();
129 message.put_property(
130 CheetahString::from_static_str(
131 MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
132 ),
133 CheetahString::from_string(uniq_id),
134 );
135 }
136 }
137}
138
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use super::*;

    /// A generated id fills the whole `LEN * 2` buffer and starts with the fixed prefix.
    #[test]
    fn unique_id_is_generated_correctly() {
        let unique_id = MessageClientIDSetter::create_uniq_id();
        assert!(!unique_id.is_empty());
        assert_eq!(unique_id.chars().count(), *LEN * 2);
        assert!(unique_id
            .chars()
            .take(FIX_STRING.len())
            .eq(FIX_STRING.iter().copied()));
    }

    /// Two consecutive ids differ in their counter suffix.
    #[test]
    fn unique_id_counter_increments_properly() {
        let first_id = MessageClientIDSetter::create_uniq_id();
        let second_id = MessageClientIDSetter::create_uniq_id();
        // The counter field starts right after the prefix and the 8-char time field.
        let counter_offset = FIX_STRING.len() + 8;
        let first_counter = &first_id[counter_offset..];
        let second_counter = &second_id[counter_offset..];
        assert_ne!(first_counter, second_counter);
    }

    #[test]
    fn get_uniq_id_returns_none_when_not_set() {
        let message = Message::default();
        assert_eq!(MessageClientIDSetter::get_uniq_id(&message), None);
    }

    /// The fallback address always has the length of an IPv4 address.
    #[test]
    fn create_fake_ip_generates_valid_ip() {
        let fake_ip = create_fake_ip();
        assert_eq!(fake_ip.len(), 4);
    }

    #[test]
    fn start_time_is_set_correctly_for_new_month() {
        let millis = Utc::now().timestamp_millis();
        MessageClientIDSetter::set_start_time(millis);
        let locked_start_time = *START_TIME.lock();
        assert!(locked_start_time <= millis);
    }

    #[test]
    fn next_start_time_is_set_correctly_for_new_month() {
        let millis = Utc::now().timestamp_millis();
        MessageClientIDSetter::set_start_time(millis);
        let locked_next_start_time = *NEXT_START_TIME.lock();
        assert!(locked_next_start_time > millis);
    }
}