1use std::collections::HashMap;
16use std::fmt;
17use std::fmt::Display;
18use std::fmt::Formatter;
19use std::sync::atomic::AtomicI64;
20use std::sync::atomic::Ordering;
21
22use cheetah_string::CheetahString;
23use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
24use rocketmq_common::utils::time_utils;
25use serde::ser::SerializeStruct;
26use serde::Deserialize;
27use serde::Serialize;
28use serde::Serializer;
29
30use crate::rocketmq_serializable::RocketMQSerializable;
31
32pub mod admin;
33pub mod bodies;
34pub mod body;
35pub mod broker_sync_info;
36pub mod command_custom_header;
37pub mod filter;
38pub mod forbidden_type;
39pub mod header;
40pub mod headers;
41pub mod heartbeat;
42pub mod namespace_util;
43pub mod namesrv;
44pub mod remoting_command;
45pub mod request_source;
46pub mod request_type;
47pub mod rocketmq_serializable;
48pub mod route;
49pub mod static_topic;
50pub mod subscription;
51pub mod topic;
52
53pub use self::command_custom_header::CommandCustomHeader;
55pub use self::command_custom_header::FromMap;
56pub use self::remoting_command::RemotingCommand;
57
/// Direction of a remoting command: a request, or the response to one.
///
/// The numeric codes are `0` for [`RemotingCommandType::REQUEST`] and `1` for
/// [`RemotingCommandType::RESPONSE`]; they are spelled out as discriminants so the
/// declaration and [`RemotingCommandType::get_code`] cannot drift apart unnoticed.
///
/// The type is a field-less `u8`-sized enum, so it derives `Clone` and `Copy` like the
/// other code enums in this file.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemotingCommandType {
    REQUEST = 0,
    RESPONSE = 1,
}

impl RemotingCommandType {
    /// Returns the command type for a numeric code.
    ///
    /// `0` maps to `REQUEST`, `1` maps to `RESPONSE`; every other code yields `None`.
    pub fn value_of(code: u8) -> Option<Self> {
        match code {
            0 => Some(RemotingCommandType::REQUEST),
            1 => Some(RemotingCommandType::RESPONSE),
            _ => None,
        }
    }

    /// Returns the numeric code of this command type (`0` or `1`).
    pub fn get_code(&self) -> u8 {
        match self {
            RemotingCommandType::REQUEST => 0,
            RemotingCommandType::RESPONSE => 1,
        }
    }

    /// Looks a command type up by name, ignoring ASCII case.
    ///
    /// Returns `None` when `name` is neither `"REQUEST"` nor `"RESPONSE"`.
    pub fn get_type_from_name(name: &str) -> Option<Self> {
        // Compare in place: no upper-cased copy of `name` has to be allocated.
        if name.eq_ignore_ascii_case("REQUEST") {
            Some(RemotingCommandType::REQUEST)
        } else if name.eq_ignore_ascii_case("RESPONSE") {
            Some(RemotingCommandType::RESPONSE)
        } else {
            None
        }
    }
}
89
90#[repr(u8)]
91#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default, Hash, Copy)]
92pub enum LanguageCode {
93 JAVA,
94 CPP,
95 DOTNET,
96 PYTHON,
97 DELPHI,
98 ERLANG,
99 RUBY,
100 OTHER,
101 HTTP,
102 GO,
103 PHP,
104 OMS,
105 #[default]
106 RUST,
107}
108
109impl fmt::Display for LanguageCode {
110 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111 match *self {
112 LanguageCode::JAVA => write!(f, "JAVA"),
113 LanguageCode::CPP => write!(f, "CPP"),
114 LanguageCode::DOTNET => write!(f, "DOTNET"),
115 LanguageCode::PYTHON => write!(f, "PYTHON"),
116 LanguageCode::DELPHI => write!(f, "DELPHI"),
117 LanguageCode::ERLANG => write!(f, "ERLANG"),
118 LanguageCode::RUBY => write!(f, "RUBY"),
119 LanguageCode::OTHER => write!(f, "OTHER"),
120 LanguageCode::HTTP => write!(f, "HTTP"),
121 LanguageCode::GO => write!(f, "GO"),
122 LanguageCode::PHP => write!(f, "PHP"),
123 LanguageCode::OMS => write!(f, "OMS"),
124 LanguageCode::RUST => write!(f, "RUST"),
125 }
126 }
127}
128
129impl From<LanguageCode> for u8 {
130 fn from(code: LanguageCode) -> Self {
131 code.get_code()
132 }
133}
134
135impl From<LanguageCode> for i32 {
136 fn from(code: LanguageCode) -> Self {
137 code.get_code() as i32
138 }
139}
140
141impl From<u32> for LanguageCode {
142 fn from(code: u32) -> Self {
143 if let Ok(c) = u8::try_from(code) {
144 LanguageCode::value_of(c).unwrap_or(LanguageCode::OTHER)
145 } else {
146 LanguageCode::OTHER
147 }
148 }
149}
150
151impl From<i32> for LanguageCode {
152 fn from(code: i32) -> Self {
153 if let Ok(c) = u8::try_from(code) {
154 LanguageCode::value_of(c).unwrap_or(LanguageCode::OTHER)
155 } else {
156 LanguageCode::OTHER
157 }
158 }
159}
160
161impl From<u8> for LanguageCode {
162 fn from(code: u8) -> Self {
163 LanguageCode::value_of(code).unwrap_or(LanguageCode::OTHER)
164 }
165}
166
167impl LanguageCode {
168 pub fn value_of(code: u8) -> Option<Self> {
169 match code {
170 0 => Some(LanguageCode::JAVA),
171 1 => Some(LanguageCode::CPP),
172 2 => Some(LanguageCode::DOTNET),
173 3 => Some(LanguageCode::PYTHON),
174 4 => Some(LanguageCode::DELPHI),
175 5 => Some(LanguageCode::ERLANG),
176 6 => Some(LanguageCode::RUBY),
177 7 => Some(LanguageCode::OTHER),
178 8 => Some(LanguageCode::HTTP),
179 9 => Some(LanguageCode::GO),
180 10 => Some(LanguageCode::PHP),
181 11 => Some(LanguageCode::OMS),
182 12 => Some(LanguageCode::RUST),
183 _ => Some(LanguageCode::OTHER),
184 }
185 }
186
187 pub fn get_code(&self) -> u8 {
188 match self {
189 LanguageCode::JAVA => 0,
190 LanguageCode::CPP => 1,
191 LanguageCode::DOTNET => 2,
192 LanguageCode::PYTHON => 3,
193 LanguageCode::DELPHI => 4,
194 LanguageCode::ERLANG => 5,
195 LanguageCode::RUBY => 6,
196 LanguageCode::OTHER => 7,
197 LanguageCode::HTTP => 8,
198 LanguageCode::GO => 9,
199 LanguageCode::PHP => 10,
200 LanguageCode::OMS => 11,
201 LanguageCode::RUST => 12,
202 }
203 }
204
205 pub fn get_code_from_name(name: &str) -> Option<Self> {
206 match name.to_ascii_uppercase().as_str() {
207 "JAVA" => Some(LanguageCode::JAVA),
208 "CPP" => Some(LanguageCode::CPP),
209 "DOTNET" => Some(LanguageCode::DOTNET),
210 "PYTHON" => Some(LanguageCode::PYTHON),
211 "DELPHI" => Some(LanguageCode::DELPHI),
212 "ERLANG" => Some(LanguageCode::ERLANG),
213 "RUBY" => Some(LanguageCode::RUBY),
214 "OTHER" => Some(LanguageCode::OTHER),
215 "HTTP" => Some(LanguageCode::HTTP),
216 "GO" => Some(LanguageCode::GO),
217 "PHP" => Some(LanguageCode::PHP),
218 "OMS" => Some(LanguageCode::OMS),
219 "RUST" => Some(LanguageCode::RUST),
220 _ => None,
221 }
222 }
223}
224
225#[repr(u8)]
226#[derive(Debug, PartialEq, Eq, Clone, Copy, Deserialize, Serialize)]
227pub enum SerializeType {
228 JSON,
229 ROCKETMQ,
230}
231
232impl fmt::Display for SerializeType {
233 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
234 match *self {
235 SerializeType::JSON => write!(f, "JSON"),
236 SerializeType::ROCKETMQ => write!(f, "ROCKETMQ"),
237 }
238 }
239}
240
241impl SerializeType {
242 pub fn value_of(code: u8) -> Option<Self> {
243 match code {
244 0 => Some(SerializeType::JSON),
245 1 => Some(SerializeType::ROCKETMQ),
246 _ => None,
247 }
248 }
249
250 pub fn get_code(&self) -> u8 {
251 match self {
252 SerializeType::JSON => 0,
253 SerializeType::ROCKETMQ => 1,
254 }
255 }
256}
257
/// Version marker made of a state version, a timestamp and a counter.
///
/// The counter is stored in an `AtomicI64`, which is what allows methods taking
/// `&self` (such as `increment_counter` in this file) to modify it.
258#[derive(Debug)]
259pub struct DataVersion {
    /// State version; `new()` initialises it to 0.
260 state_version: i64,
    /// Timestamp; `new()` fills it from `time_utils::current_millis()`
    /// (presumably epoch milliseconds — confirm against that helper).
261 timestamp: i64,
    /// Counter; `new()` initialises it to 0 and the `next_version*` methods add 1 to it.
262 counter: AtomicI64,
263}
264
265impl Serialize for DataVersion {
266 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
267 where
268 S: Serializer,
269 {
270 let mut state = serializer.serialize_struct("DataVersion", 3)?;
271 state.serialize_field("stateVersion", &self.state_version)?;
272 state.serialize_field("timestamp", &self.timestamp)?;
273 state.serialize_field("counter", &self.counter.load(Ordering::SeqCst))?;
274 state.end()
275 }
276}
277
278impl<'de> Deserialize<'de> for DataVersion {
279 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
280 where
281 D: serde::Deserializer<'de>,
282 {
283 #[derive(Deserialize)]
284 #[serde(rename_all = "camelCase")]
285 struct DataVersionHelper {
286 state_version: i64,
287 timestamp: i64,
288 counter: i64,
289 }
290
291 let helper = DataVersionHelper::deserialize(deserializer)?;
292 Ok(DataVersion {
293 state_version: helper.state_version,
294 timestamp: helper.timestamp,
295 counter: AtomicI64::new(helper.counter),
296 })
297 }
298}
299
300impl Clone for DataVersion {
301 fn clone(&self) -> Self {
302 DataVersion {
303 state_version: self.state_version,
304 timestamp: self.timestamp,
305 counter: AtomicI64::new(self.counter.load(Ordering::SeqCst)),
306 }
307 }
308}
309
310impl PartialEq for DataVersion {
311 fn eq(&self, other: &Self) -> bool {
312 self.state_version == other.state_version
313 && self.timestamp == other.timestamp
314 && self.get_counter() == other.get_counter()
315 }
316}
317
318impl Eq for DataVersion {}
319
320impl PartialOrd for DataVersion {
321 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
322 Some(self.cmp(other))
323 }
324}
325
326impl Ord for DataVersion {
327 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
328 self.state_version
329 .cmp(&other.state_version)
330 .then_with(|| self.timestamp.cmp(&other.timestamp))
331 .then_with(|| self.get_counter().cmp(&other.get_counter()))
332 }
333}
334
335impl Default for DataVersion {
336 fn default() -> Self {
337 DataVersion::new()
338 }
339}
340
341impl DataVersion {
342 pub fn new() -> Self {
343 let timestamp = time_utils::current_millis() as i64;
344
345 DataVersion {
346 state_version: 0,
347 timestamp,
348 counter: AtomicI64::new(0),
349 }
350 }
351
352 pub fn assign_new_one(&mut self, data_version: &DataVersion) {
353 self.timestamp = data_version.timestamp;
354 self.state_version = data_version.state_version;
355 self.counter = AtomicI64::new(data_version.counter.load(Ordering::SeqCst));
356 }
357
358 pub fn get_state_version(&self) -> i64 {
359 self.state_version
360 }
361
362 pub fn set_state_version(&mut self, state_version: i64) {
363 self.state_version = state_version;
364 }
365
366 pub fn get_timestamp(&self) -> i64 {
367 self.timestamp
368 }
369
370 pub fn set_timestamp(&mut self, timestamp: i64) {
371 self.timestamp = timestamp;
372 }
373
374 pub fn get_counter(&self) -> i64 {
375 self.counter.load(Ordering::Relaxed)
376 }
377
378 pub fn set_counter(&mut self, counter: i64) {
379 self.counter.store(counter, Ordering::Relaxed);
380 }
381
382 pub fn increment_counter(&self) -> i64 {
383 self.counter.fetch_add(1, Ordering::Relaxed) + 1
384 }
385
386 pub fn state_version(&self) -> i64 {
387 self.state_version
388 }
389
390 pub fn timestamp(&self) -> i64 {
391 self.timestamp
392 }
393
394 pub fn counter(&self) -> i64 {
395 self.counter.load(Ordering::Relaxed)
396 }
397
398 pub fn next_version(&mut self) {
399 self.next_version_with(0)
400 }
401
402 pub fn next_version_with(&mut self, state_version: i64) {
403 self.timestamp = time_utils::current_millis() as i64;
404 self.state_version = state_version;
405 self.counter.fetch_add(1, Ordering::SeqCst);
406 }
407}
408
409impl Display for DataVersion {
410 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
411 let counter_value = self.counter.load(Ordering::SeqCst);
412 write!(
413 f,
414 "State Version: {}, Timestamp: {}, Counter: {}",
415 self.state_version, self.timestamp, counter_value
416 )
417 }
418}
419
/// Serialization entry points for values sent through the remoting layer.
///
/// This file provides a blanket implementation for every `T: Serialize` that forwards
/// each method to the `SerdeJsonUtils` helper of the corresponding name.
420pub trait RemotingSerializable {
    /// Encodes `self` into a byte vector, or returns the error reported by the encoder.
427 fn encode(&self) -> rocketmq_error::RocketMQResult<Vec<u8>>;
433
    /// Serializes `self` into a JSON string, or returns the serialization error.
434 fn serialize_json(&self) -> rocketmq_error::RocketMQResult<String>;
440
    /// Serializes `self` into a JSON string in pretty form (per the method name; the
    /// exact layout is decided by the implementation), or returns the serialization error.
441 fn serialize_json_pretty(&self) -> rocketmq_error::RocketMQResult<String>;
447}
448
449pub trait RemotingDeserializable {
459 type Output;
461
462 fn decode(bytes: &[u8]) -> rocketmq_error::RocketMQResult<Self::Output>;
475
476 fn decode_str(s: &str) -> rocketmq_error::RocketMQResult<Self::Output> {
486 Self::decode(s.as_bytes())
487 }
488
489 fn decode_string(s: String) -> rocketmq_error::RocketMQResult<Self::Output> {
499 Self::decode_str(&s)
500 }
501}
502
503impl<T: Serialize> RemotingSerializable for T {
504 fn encode(&self) -> rocketmq_error::RocketMQResult<Vec<u8>> {
505 SerdeJsonUtils::serialize_json_vec(self)
506 }
507
508 fn serialize_json(&self) -> rocketmq_error::RocketMQResult<String> {
509 SerdeJsonUtils::serialize_json(self)
510 }
511
512 fn serialize_json_pretty(&self) -> rocketmq_error::RocketMQResult<String> {
513 SerdeJsonUtils::serialize_json_pretty(self)
514 }
515}
516
517impl<T: serde::de::DeserializeOwned> RemotingDeserializable for T {
518 type Output = T;
519 fn decode(bytes: &[u8]) -> rocketmq_error::RocketMQResult<Self::Output> {
520 SerdeJsonUtils::from_json_bytes(bytes)
521 }
522}
523
524pub trait FastCodesHeader {
530 fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
542 if !value.is_empty() {
543 RocketMQSerializable::write_str(out, true, key);
544 RocketMQSerializable::write_str(out, false, value);
545 }
546 }
547
548 fn encode_fast(&mut self, out: &mut bytes::BytesMut);
557
558 fn decode_fast(&mut self, fields: &HashMap<CheetahString, CheetahString>);
567}
568
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads the counter field directly with the ordering these assertions use.
    fn counter_of(version: &DataVersion) -> i64 {
        version.counter.load(Ordering::SeqCst)
    }

    #[test]
    fn test_remoting_command_type() {
        // Numeric code -> variant.
        let by_code = [
            (0u8, Some(RemotingCommandType::REQUEST)),
            (1u8, Some(RemotingCommandType::RESPONSE)),
            (2u8, None),
        ];
        for (code, expected) in by_code {
            assert_eq!(expected, RemotingCommandType::value_of(code));
        }

        // Variant -> numeric code.
        assert_eq!(0, RemotingCommandType::REQUEST.get_code());
        assert_eq!(1, RemotingCommandType::RESPONSE.get_code());

        // Name -> variant.
        let by_name = [
            ("REQUEST", Some(RemotingCommandType::REQUEST)),
            ("RESPONSE", Some(RemotingCommandType::RESPONSE)),
            ("UNKNOWN", None),
        ];
        for (name, expected) in by_name {
            assert_eq!(expected, RemotingCommandType::get_type_from_name(name));
        }
    }

    #[test]
    fn test_language_code() {
        let samples = [
            (0u8, "JAVA", LanguageCode::JAVA),
            (1u8, "CPP", LanguageCode::CPP),
            (2u8, "DOTNET", LanguageCode::DOTNET),
        ];

        for (code, _, language) in samples {
            assert_eq!(Some(language), LanguageCode::value_of(code));
        }
        for (code, _, language) in samples {
            assert_eq!(code, language.get_code());
        }
        for (_, name, language) in samples {
            assert_eq!(Some(language), LanguageCode::get_code_from_name(name));
        }
    }

    #[test]
    fn data_version_serialization_deserialization() {
        let mut original = DataVersion::new();
        original.set_state_version(10);

        let json = serde_json::to_string(&original).unwrap();
        let restored: DataVersion = serde_json::from_str(&json).unwrap();

        assert_eq!(original.state_version, restored.state_version);
        assert_eq!(original.timestamp, restored.timestamp);
        assert_eq!(counter_of(&original), counter_of(&restored));
    }

    #[test]
    fn data_version_counter_increment() {
        let version = DataVersion::new();
        let before = counter_of(&version);

        version.increment_counter();

        assert_eq!(before + 1, counter_of(&version));
    }

    #[test]
    fn data_version_next_version() {
        let mut version = DataVersion::new();
        let state_before = version.state_version;
        let timestamp_before = version.timestamp;
        let counter_before = counter_of(&version);

        version.next_version();

        assert_eq!(state_before, version.state_version);
        assert!(version.timestamp >= timestamp_before);
        assert_eq!(counter_before + 1, counter_of(&version));
    }

    #[test]
    fn data_version_next_version_with_state() {
        let mut version = DataVersion::new();
        let timestamp_before = version.timestamp;
        let counter_before = counter_of(&version);

        version.next_version_with(20);

        assert_eq!(20, version.state_version);
        assert!(version.timestamp >= timestamp_before);
        assert_eq!(counter_before + 1, counter_of(&version));
    }

    #[test]
    fn data_version_equality() {
        let first = DataVersion::new();
        let second = first.clone();
        assert_eq!(first, second);

        // Bumping only the clone's counter must break equality.
        second.increment_counter();
        assert_ne!(first, second);
    }

    #[test]
    fn data_version_partial_ordering() {
        use std::cmp::Ordering::{Equal, Greater, Less};

        let older = DataVersion::new();
        let mut newer = older.clone();
        assert_eq!(older.partial_cmp(&newer), Some(Equal));

        newer.set_state_version(older.get_state_version() + 1);
        assert_eq!(older.partial_cmp(&newer), Some(Less));
        assert_eq!(newer.partial_cmp(&older), Some(Greater));
    }

    #[test]
    fn data_version_total_ordering() {
        use std::cmp::Ordering::{Equal, Greater, Less};

        let older = DataVersion::new();
        let mut newer = older.clone();
        assert_eq!(older.cmp(&newer), Equal);

        newer.set_state_version(older.get_state_version() + 1);
        assert_eq!(older.cmp(&newer), Less);
        assert_eq!(newer.cmp(&older), Greater);
    }
}