1use std::collections::HashMap;
18use std::fmt;
19use std::fmt::Display;
20use std::fmt::Formatter;
21use std::sync::atomic::AtomicI64;
22use std::sync::atomic::Ordering;
23
24use cheetah_string::CheetahString;
25use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
26use rocketmq_common::utils::time_utils;
27use serde::ser::SerializeStruct;
28use serde::Deserialize;
29use serde::Serialize;
30use serde::Serializer;
31
32use crate::rocketmq_serializable::RocketMQSerializable;
33
34pub mod admin;
35pub mod body;
36pub mod broker_sync_info;
37pub mod command_custom_header;
38pub mod filter;
39pub mod forbidden_type;
40pub mod header;
41pub mod heartbeat;
42pub mod namespace_util;
43pub mod namesrv;
44pub mod remoting_command;
45pub mod request_source;
46pub mod request_type;
47pub mod rocketmq_serializable;
48pub mod route;
49pub mod static_topic;
50pub mod subscription;
51pub mod topic;
52
/// Kind of a remoting command: either a request or a response.
///
/// The explicit discriminants equal the values returned by
/// [`RemotingCommandType::get_code`].
#[repr(u8)]
#[derive(Debug, PartialEq, Eq)]
pub enum RemotingCommandType {
    /// Numeric code `0`.
    REQUEST = 0,
    /// Numeric code `1`.
    RESPONSE = 1,
}

impl RemotingCommandType {
    /// Maps a numeric code to its command type.
    ///
    /// Returns `None` for every code other than `0` (request) and `1` (response).
    pub fn value_of(code: u8) -> Option<Self> {
        if code == 0 {
            Some(Self::REQUEST)
        } else if code == 1 {
            Some(Self::RESPONSE)
        } else {
            None
        }
    }

    /// Returns the numeric code of this command type (`0` or `1`).
    pub fn get_code(&self) -> u8 {
        match self {
            Self::REQUEST => 0,
            Self::RESPONSE => 1,
        }
    }

    /// Parses a command type from its name, ignoring ASCII case.
    ///
    /// Returns `None` when `name` is neither `REQUEST` nor `RESPONSE`.
    pub fn get_type_from_name(name: &str) -> Option<Self> {
        if name.eq_ignore_ascii_case("REQUEST") {
            Some(Self::REQUEST)
        } else if name.eq_ignore_ascii_case("RESPONSE") {
            Some(Self::RESPONSE)
        } else {
            None
        }
    }
}
84
85#[repr(u8)]
86#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default, Hash, Copy)]
87pub enum LanguageCode {
88 JAVA,
89 CPP,
90 DOTNET,
91 PYTHON,
92 DELPHI,
93 ERLANG,
94 RUBY,
95 OTHER,
96 HTTP,
97 GO,
98 PHP,
99 OMS,
100 #[default]
101 RUST,
102}
103
104impl fmt::Display for LanguageCode {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 match *self {
107 LanguageCode::JAVA => write!(f, "JAVA"),
108 LanguageCode::CPP => write!(f, "CPP"),
109 LanguageCode::DOTNET => write!(f, "DOTNET"),
110 LanguageCode::PYTHON => write!(f, "PYTHON"),
111 LanguageCode::DELPHI => write!(f, "DELPHI"),
112 LanguageCode::ERLANG => write!(f, "ERLANG"),
113 LanguageCode::RUBY => write!(f, "RUBY"),
114 LanguageCode::OTHER => write!(f, "OTHER"),
115 LanguageCode::HTTP => write!(f, "HTTP"),
116 LanguageCode::GO => write!(f, "GO"),
117 LanguageCode::PHP => write!(f, "PHP"),
118 LanguageCode::OMS => write!(f, "OMS"),
119 LanguageCode::RUST => write!(f, "RUST"),
120 }
121 }
122}
123
124impl From<LanguageCode> for u8 {
125 fn from(code: LanguageCode) -> Self {
126 code.get_code()
127 }
128}
129
130impl From<LanguageCode> for i32 {
131 fn from(code: LanguageCode) -> Self {
132 code.get_code() as i32
133 }
134}
135
136impl From<u32> for LanguageCode {
137 fn from(code: u32) -> Self {
138 if let Ok(c) = u8::try_from(code) {
139 LanguageCode::value_of(c).unwrap_or(LanguageCode::OTHER)
140 } else {
141 LanguageCode::OTHER
142 }
143 }
144}
145
146impl From<i32> for LanguageCode {
147 fn from(code: i32) -> Self {
148 if let Ok(c) = u8::try_from(code) {
149 LanguageCode::value_of(c).unwrap_or(LanguageCode::OTHER)
150 } else {
151 LanguageCode::OTHER
152 }
153 }
154}
155
156impl From<u8> for LanguageCode {
157 fn from(code: u8) -> Self {
158 LanguageCode::value_of(code).unwrap_or(LanguageCode::OTHER)
159 }
160}
161
162impl LanguageCode {
163 pub fn value_of(code: u8) -> Option<Self> {
164 match code {
165 0 => Some(LanguageCode::JAVA),
166 1 => Some(LanguageCode::CPP),
167 2 => Some(LanguageCode::DOTNET),
168 3 => Some(LanguageCode::PYTHON),
169 4 => Some(LanguageCode::DELPHI),
170 5 => Some(LanguageCode::ERLANG),
171 6 => Some(LanguageCode::RUBY),
172 7 => Some(LanguageCode::OTHER),
173 8 => Some(LanguageCode::HTTP),
174 9 => Some(LanguageCode::GO),
175 10 => Some(LanguageCode::PHP),
176 11 => Some(LanguageCode::OMS),
177 12 => Some(LanguageCode::RUST),
178 _ => Some(LanguageCode::OTHER),
179 }
180 }
181
182 pub fn get_code(&self) -> u8 {
183 match self {
184 LanguageCode::JAVA => 0,
185 LanguageCode::CPP => 1,
186 LanguageCode::DOTNET => 2,
187 LanguageCode::PYTHON => 3,
188 LanguageCode::DELPHI => 4,
189 LanguageCode::ERLANG => 5,
190 LanguageCode::RUBY => 6,
191 LanguageCode::OTHER => 7,
192 LanguageCode::HTTP => 8,
193 LanguageCode::GO => 9,
194 LanguageCode::PHP => 10,
195 LanguageCode::OMS => 11,
196 LanguageCode::RUST => 12,
197 }
198 }
199
200 pub fn get_code_from_name(name: &str) -> Option<Self> {
201 match name.to_ascii_uppercase().as_str() {
202 "JAVA" => Some(LanguageCode::JAVA),
203 "CPP" => Some(LanguageCode::CPP),
204 "DOTNET" => Some(LanguageCode::DOTNET),
205 "PYTHON" => Some(LanguageCode::PYTHON),
206 "DELPHI" => Some(LanguageCode::DELPHI),
207 "ERLANG" => Some(LanguageCode::ERLANG),
208 "RUBY" => Some(LanguageCode::RUBY),
209 "OTHER" => Some(LanguageCode::OTHER),
210 "HTTP" => Some(LanguageCode::HTTP),
211 "GO" => Some(LanguageCode::GO),
212 "PHP" => Some(LanguageCode::PHP),
213 "OMS" => Some(LanguageCode::OMS),
214 "RUST" => Some(LanguageCode::RUST),
215 _ => None,
216 }
217 }
218}
219
220#[repr(u8)]
221#[derive(Debug, PartialEq, Eq, Clone, Copy, Deserialize, Serialize)]
222pub enum SerializeType {
223 JSON,
224 ROCKETMQ,
225}
226
227impl fmt::Display for SerializeType {
228 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
229 match *self {
230 SerializeType::JSON => write!(f, "JSON"),
231 SerializeType::ROCKETMQ => write!(f, "ROCKETMQ"),
232 }
233 }
234}
235
236impl SerializeType {
237 pub fn value_of(code: u8) -> Option<Self> {
238 match code {
239 0 => Some(SerializeType::JSON),
240 1 => Some(SerializeType::ROCKETMQ),
241 _ => None,
242 }
243 }
244
245 pub fn get_code(&self) -> u8 {
246 match self {
247 SerializeType::JSON => 0,
248 SerializeType::ROCKETMQ => 1,
249 }
250 }
251}
252
/// Version stamp made of a state version, a timestamp and a counter.
///
/// The counter is atomic, so it can be incremented through a shared reference.
#[derive(Debug)]
pub struct DataVersion {
    /// State version; `DataVersion::new` starts it at `0`.
    state_version: i64,
    /// Timestamp; `DataVersion::new` fills it from `time_utils::get_current_millis`.
    timestamp: i64,
    /// Change counter; `DataVersion::new` starts it at `0`.
    counter: AtomicI64,
}
259
260impl Serialize for DataVersion {
261 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
262 where
263 S: Serializer,
264 {
265 let mut state = serializer.serialize_struct("DataVersion", 3)?;
266 state.serialize_field("stateVersion", &self.state_version)?;
267 state.serialize_field("timestamp", &self.timestamp)?;
268 state.serialize_field("counter", &self.counter.load(Ordering::SeqCst))?;
269 state.end()
270 }
271}
272
273impl<'de> Deserialize<'de> for DataVersion {
274 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
275 where
276 D: serde::Deserializer<'de>,
277 {
278 #[derive(Deserialize)]
279 #[serde(rename_all = "camelCase")]
280 struct DataVersionHelper {
281 state_version: i64,
282 timestamp: i64,
283 counter: i64,
284 }
285
286 let helper = DataVersionHelper::deserialize(deserializer)?;
287 Ok(DataVersion {
288 state_version: helper.state_version,
289 timestamp: helper.timestamp,
290 counter: AtomicI64::new(helper.counter),
291 })
292 }
293}
294
295impl Clone for DataVersion {
296 fn clone(&self) -> Self {
297 DataVersion {
298 state_version: self.state_version,
299 timestamp: self.timestamp,
300 counter: AtomicI64::new(self.counter.load(Ordering::SeqCst)),
301 }
302 }
303}
304
305impl PartialEq for DataVersion {
306 fn eq(&self, other: &Self) -> bool {
307 self.state_version == other.state_version
308 && self.timestamp == other.timestamp
309 && self.get_counter() == other.get_counter()
310 }
311}
312
313impl Eq for DataVersion {}
314
315impl PartialOrd for DataVersion {
316 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
317 Some(self.cmp(other))
318 }
319}
320
321impl Ord for DataVersion {
322 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
323 self.state_version
324 .cmp(&other.state_version)
325 .then_with(|| self.timestamp.cmp(&other.timestamp))
326 .then_with(|| self.get_counter().cmp(&other.get_counter()))
327 }
328}
329
330impl Default for DataVersion {
331 fn default() -> Self {
332 DataVersion::new()
333 }
334}
335
336impl DataVersion {
337 pub fn new() -> Self {
338 let timestamp = time_utils::get_current_millis() as i64;
339
340 DataVersion {
341 state_version: 0,
342 timestamp,
343 counter: AtomicI64::new(0),
344 }
345 }
346
347 pub fn assign_new_one(&mut self, data_version: &DataVersion) {
348 self.timestamp = data_version.timestamp;
349 self.state_version = data_version.state_version;
350 self.counter = AtomicI64::new(data_version.counter.load(Ordering::SeqCst));
351 }
352
353 pub fn get_state_version(&self) -> i64 {
354 self.state_version
355 }
356
357 pub fn set_state_version(&mut self, state_version: i64) {
358 self.state_version = state_version;
359 }
360
361 pub fn get_timestamp(&self) -> i64 {
362 self.timestamp
363 }
364
365 pub fn set_timestamp(&mut self, timestamp: i64) {
366 self.timestamp = timestamp;
367 }
368
369 pub fn get_counter(&self) -> i64 {
370 self.counter.load(Ordering::Relaxed)
371 }
372
373 pub fn set_counter(&mut self, counter: i64) {
374 self.counter.store(counter, Ordering::Relaxed);
375 }
376
377 pub fn increment_counter(&self) -> i64 {
378 self.counter.fetch_add(1, Ordering::Relaxed) + 1
379 }
380
381 pub fn state_version(&self) -> i64 {
382 self.state_version
383 }
384
385 pub fn timestamp(&self) -> i64 {
386 self.timestamp
387 }
388
389 pub fn counter(&self) -> i64 {
390 self.counter.load(Ordering::Relaxed)
391 }
392
393 pub fn next_version(&mut self) {
394 self.next_version_with(0)
395 }
396
397 pub fn next_version_with(&mut self, state_version: i64) {
398 self.timestamp = time_utils::get_current_millis() as i64;
399 self.state_version = state_version;
400 self.counter.fetch_add(1, Ordering::SeqCst);
401 }
402}
403
404impl Display for DataVersion {
405 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
406 let counter_value = self.counter.load(Ordering::SeqCst);
407 write!(
408 f,
409 "State Version: {}, Timestamp: {}, Counter: {}",
410 self.state_version, self.timestamp, counter_value
411 )
412 }
413}
414
415pub trait RemotingSerializable {
422 fn encode(&self) -> rocketmq_error::RocketMQResult<Vec<u8>>;
428
429 fn serialize_json(&self) -> rocketmq_error::RocketMQResult<String>;
435
436 fn serialize_json_pretty(&self) -> rocketmq_error::RocketMQResult<String>;
442}
443
444pub trait RemotingDeserializable {
454 type Output;
456
457 fn decode(bytes: &[u8]) -> rocketmq_error::RocketMQResult<Self::Output>;
470}
471
472pub trait JsonSerializable: Serialize + RemotingSerializable {}
473
474impl<T: Serialize> RemotingSerializable for T {
475 fn encode(&self) -> rocketmq_error::RocketMQResult<Vec<u8>> {
476 SerdeJsonUtils::serialize_json_vec(self)
477 }
478
479 fn serialize_json(&self) -> rocketmq_error::RocketMQResult<String> {
480 SerdeJsonUtils::serialize_json(self)
481 }
482
483 fn serialize_json_pretty(&self) -> rocketmq_error::RocketMQResult<String> {
484 SerdeJsonUtils::serialize_json_pretty(self)
485 }
486}
487
488impl<T: serde::de::DeserializeOwned> RemotingDeserializable for T {
489 type Output = T;
490 fn decode(bytes: &[u8]) -> rocketmq_error::RocketMQResult<Self::Output> {
491 SerdeJsonUtils::from_json_bytes(bytes)
492 }
493}
494
495pub trait FastCodesHeader {
501 fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
513 if !value.is_empty() {
514 RocketMQSerializable::write_str(out, true, key);
515 RocketMQSerializable::write_str(out, false, key);
516 }
517 }
518
519 fn encode_fast(&mut self, out: &mut bytes::BytesMut);
528
529 fn decode_fast(&mut self, fields: &HashMap<CheetahString, CheetahString>);
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads the counter of `version` with sequentially consistent ordering.
    fn counter_of(version: &DataVersion) -> i64 {
        version.counter.load(Ordering::SeqCst)
    }

    #[test]
    fn test_remoting_command_type() {
        // Numeric code -> command type.
        let by_code = [
            (0u8, Some(RemotingCommandType::REQUEST)),
            (1u8, Some(RemotingCommandType::RESPONSE)),
            (2u8, None),
        ];
        for (code, expected) in by_code {
            assert_eq!(expected, RemotingCommandType::value_of(code));
        }

        // Command type -> numeric code.
        assert_eq!(0, RemotingCommandType::REQUEST.get_code());
        assert_eq!(1, RemotingCommandType::RESPONSE.get_code());

        // Name -> command type.
        let by_name = [
            ("REQUEST", Some(RemotingCommandType::REQUEST)),
            ("RESPONSE", Some(RemotingCommandType::RESPONSE)),
            ("UNKNOWN", None),
        ];
        for (name, expected) in by_name {
            assert_eq!(expected, RemotingCommandType::get_type_from_name(name));
        }
    }

    #[test]
    fn test_language_code() {
        let cases = [
            (0u8, "JAVA", LanguageCode::JAVA),
            (1u8, "CPP", LanguageCode::CPP),
            (2u8, "DOTNET", LanguageCode::DOTNET),
        ];

        for (code, _, language) in cases {
            assert_eq!(Some(language), LanguageCode::value_of(code));
        }
        for (code, _, language) in cases {
            assert_eq!(code, language.get_code());
        }
        for (_, name, language) in cases {
            assert_eq!(Some(language), LanguageCode::get_code_from_name(name));
        }
    }

    #[test]
    fn data_version_serialization_deserialization() {
        let mut original = DataVersion::new();
        original.set_state_version(10);

        let json = serde_json::to_string(&original).unwrap();
        let restored: DataVersion = serde_json::from_str(&json).unwrap();

        assert_eq!(original.state_version, restored.state_version);
        assert_eq!(original.timestamp, restored.timestamp);
        assert_eq!(counter_of(&original), counter_of(&restored));
    }

    #[test]
    fn data_version_counter_increment() {
        let version = DataVersion::new();
        let before = counter_of(&version);

        version.increment_counter();

        assert_eq!(before + 1, counter_of(&version));
    }

    #[test]
    fn data_version_next_version() {
        let mut version = DataVersion::new();
        let state_before = version.state_version;
        let timestamp_before = version.timestamp;
        let counter_before = counter_of(&version);

        version.next_version();

        assert_eq!(state_before, version.state_version);
        assert!(version.timestamp >= timestamp_before);
        assert_eq!(counter_before + 1, counter_of(&version));
    }

    #[test]
    fn data_version_next_version_with_state() {
        let mut version = DataVersion::new();
        let timestamp_before = version.timestamp;
        let counter_before = counter_of(&version);

        version.next_version_with(20);

        assert_eq!(20, version.state_version);
        assert!(version.timestamp >= timestamp_before);
        assert_eq!(counter_before + 1, counter_of(&version));
    }

    #[test]
    fn data_version_equality() {
        let first = DataVersion::new();
        let second = first.clone();
        assert_eq!(first, second);

        // Bumping only the clone's counter must break equality.
        second.increment_counter();
        assert_ne!(first, second);
    }

    #[test]
    fn data_version_partial_ordering() {
        let lower = DataVersion::new();
        let mut higher = lower.clone();

        assert_eq!(
            lower.partial_cmp(&higher),
            Some(std::cmp::Ordering::Equal)
        );

        higher.set_state_version(lower.get_state_version() + 1);

        assert_eq!(lower.partial_cmp(&higher), Some(std::cmp::Ordering::Less));
        assert_eq!(
            higher.partial_cmp(&lower),
            Some(std::cmp::Ordering::Greater)
        );
    }

    #[test]
    fn data_version_total_ordering() {
        let lower = DataVersion::new();
        let mut higher = lower.clone();

        assert_eq!(lower.cmp(&higher), std::cmp::Ordering::Equal);

        higher.set_state_version(lower.get_state_version() + 1);

        assert_eq!(lower.cmp(&higher), std::cmp::Ordering::Less);
        assert_eq!(higher.cmp(&lower), std::cmp::Ordering::Greater);
    }
}