rocketmq_remoting/
protocol.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::fmt;
19use std::fmt::Display;
20use std::fmt::Formatter;
21use std::sync::atomic::AtomicI64;
22use std::sync::atomic::Ordering;
23
24use cheetah_string::CheetahString;
25use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
26use rocketmq_common::utils::time_utils;
27use serde::ser::SerializeStruct;
28use serde::Deserialize;
29use serde::Serialize;
30use serde::Serializer;
31
32use crate::rocketmq_serializable::RocketMQSerializable;
33
34pub mod admin;
35pub mod body;
36pub mod broker_sync_info;
37pub mod command_custom_header;
38pub mod filter;
39pub mod forbidden_type;
40pub mod header;
41pub mod heartbeat;
42pub mod namespace_util;
43pub mod namesrv;
44pub mod remoting_command;
45pub mod request_source;
46pub mod request_type;
47pub mod rocketmq_serializable;
48pub mod route;
49pub mod static_topic;
50pub mod subscription;
51pub mod topic;
52
53#[repr(u8)]
54#[derive(Debug, PartialEq, Eq)]
55pub enum RemotingCommandType {
56    REQUEST,
57    RESPONSE,
58}
59
60impl RemotingCommandType {
61    pub fn value_of(code: u8) -> Option<Self> {
62        match code {
63            0 => Some(RemotingCommandType::REQUEST),
64            1 => Some(RemotingCommandType::RESPONSE),
65            _ => None,
66        }
67    }
68
69    pub fn get_code(&self) -> u8 {
70        match self {
71            RemotingCommandType::REQUEST => 0,
72            RemotingCommandType::RESPONSE => 1,
73        }
74    }
75
76    pub fn get_type_from_name(name: &str) -> Option<Self> {
77        match name.to_ascii_uppercase().as_str() {
78            "REQUEST" => Some(RemotingCommandType::REQUEST),
79            "RESPONSE" => Some(RemotingCommandType::RESPONSE),
80            _ => None,
81        }
82    }
83}
84
85#[repr(u8)]
86#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default, Hash, Copy)]
87pub enum LanguageCode {
88    JAVA,
89    CPP,
90    DOTNET,
91    PYTHON,
92    DELPHI,
93    ERLANG,
94    RUBY,
95    OTHER,
96    HTTP,
97    GO,
98    PHP,
99    OMS,
100    #[default]
101    RUST,
102}
103
104impl fmt::Display for LanguageCode {
105    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106        match *self {
107            LanguageCode::JAVA => write!(f, "JAVA"),
108            LanguageCode::CPP => write!(f, "CPP"),
109            LanguageCode::DOTNET => write!(f, "DOTNET"),
110            LanguageCode::PYTHON => write!(f, "PYTHON"),
111            LanguageCode::DELPHI => write!(f, "DELPHI"),
112            LanguageCode::ERLANG => write!(f, "ERLANG"),
113            LanguageCode::RUBY => write!(f, "RUBY"),
114            LanguageCode::OTHER => write!(f, "OTHER"),
115            LanguageCode::HTTP => write!(f, "HTTP"),
116            LanguageCode::GO => write!(f, "GO"),
117            LanguageCode::PHP => write!(f, "PHP"),
118            LanguageCode::OMS => write!(f, "OMS"),
119            LanguageCode::RUST => write!(f, "RUST"),
120        }
121    }
122}
123
124impl From<LanguageCode> for u8 {
125    fn from(code: LanguageCode) -> Self {
126        code.get_code()
127    }
128}
129
130impl From<LanguageCode> for i32 {
131    fn from(code: LanguageCode) -> Self {
132        code.get_code() as i32
133    }
134}
135
136impl From<u32> for LanguageCode {
137    fn from(code: u32) -> Self {
138        if let Ok(c) = u8::try_from(code) {
139            LanguageCode::value_of(c).unwrap_or(LanguageCode::OTHER)
140        } else {
141            LanguageCode::OTHER
142        }
143    }
144}
145
146impl From<i32> for LanguageCode {
147    fn from(code: i32) -> Self {
148        if let Ok(c) = u8::try_from(code) {
149            LanguageCode::value_of(c).unwrap_or(LanguageCode::OTHER)
150        } else {
151            LanguageCode::OTHER
152        }
153    }
154}
155
156impl From<u8> for LanguageCode {
157    fn from(code: u8) -> Self {
158        LanguageCode::value_of(code).unwrap_or(LanguageCode::OTHER)
159    }
160}
161
162impl LanguageCode {
163    pub fn value_of(code: u8) -> Option<Self> {
164        match code {
165            0 => Some(LanguageCode::JAVA),
166            1 => Some(LanguageCode::CPP),
167            2 => Some(LanguageCode::DOTNET),
168            3 => Some(LanguageCode::PYTHON),
169            4 => Some(LanguageCode::DELPHI),
170            5 => Some(LanguageCode::ERLANG),
171            6 => Some(LanguageCode::RUBY),
172            7 => Some(LanguageCode::OTHER),
173            8 => Some(LanguageCode::HTTP),
174            9 => Some(LanguageCode::GO),
175            10 => Some(LanguageCode::PHP),
176            11 => Some(LanguageCode::OMS),
177            12 => Some(LanguageCode::RUST),
178            _ => Some(LanguageCode::OTHER),
179        }
180    }
181
182    pub fn get_code(&self) -> u8 {
183        match self {
184            LanguageCode::JAVA => 0,
185            LanguageCode::CPP => 1,
186            LanguageCode::DOTNET => 2,
187            LanguageCode::PYTHON => 3,
188            LanguageCode::DELPHI => 4,
189            LanguageCode::ERLANG => 5,
190            LanguageCode::RUBY => 6,
191            LanguageCode::OTHER => 7,
192            LanguageCode::HTTP => 8,
193            LanguageCode::GO => 9,
194            LanguageCode::PHP => 10,
195            LanguageCode::OMS => 11,
196            LanguageCode::RUST => 12,
197        }
198    }
199
200    pub fn get_code_from_name(name: &str) -> Option<Self> {
201        match name.to_ascii_uppercase().as_str() {
202            "JAVA" => Some(LanguageCode::JAVA),
203            "CPP" => Some(LanguageCode::CPP),
204            "DOTNET" => Some(LanguageCode::DOTNET),
205            "PYTHON" => Some(LanguageCode::PYTHON),
206            "DELPHI" => Some(LanguageCode::DELPHI),
207            "ERLANG" => Some(LanguageCode::ERLANG),
208            "RUBY" => Some(LanguageCode::RUBY),
209            "OTHER" => Some(LanguageCode::OTHER),
210            "HTTP" => Some(LanguageCode::HTTP),
211            "GO" => Some(LanguageCode::GO),
212            "PHP" => Some(LanguageCode::PHP),
213            "OMS" => Some(LanguageCode::OMS),
214            "RUST" => Some(LanguageCode::RUST),
215            _ => None,
216        }
217    }
218}
219
220#[repr(u8)]
221#[derive(Debug, PartialEq, Eq, Clone, Copy, Deserialize, Serialize)]
222pub enum SerializeType {
223    JSON,
224    ROCKETMQ,
225}
226
227impl fmt::Display for SerializeType {
228    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
229        match *self {
230            SerializeType::JSON => write!(f, "JSON"),
231            SerializeType::ROCKETMQ => write!(f, "ROCKETMQ"),
232        }
233    }
234}
235
236impl SerializeType {
237    pub fn value_of(code: u8) -> Option<Self> {
238        match code {
239            0 => Some(SerializeType::JSON),
240            1 => Some(SerializeType::ROCKETMQ),
241            _ => None,
242        }
243    }
244
245    pub fn get_code(&self) -> u8 {
246        match self {
247            SerializeType::JSON => 0,
248            SerializeType::ROCKETMQ => 1,
249        }
250    }
251}
252
253#[derive(Debug)]
254pub struct DataVersion {
255    state_version: i64,
256    timestamp: i64,
257    counter: AtomicI64,
258}
259
260impl Serialize for DataVersion {
261    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
262    where
263        S: Serializer,
264    {
265        let mut state = serializer.serialize_struct("DataVersion", 3)?;
266        state.serialize_field("stateVersion", &self.state_version)?;
267        state.serialize_field("timestamp", &self.timestamp)?;
268        state.serialize_field("counter", &self.counter.load(Ordering::SeqCst))?;
269        state.end()
270    }
271}
272
273impl<'de> Deserialize<'de> for DataVersion {
274    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
275    where
276        D: serde::Deserializer<'de>,
277    {
278        #[derive(Deserialize)]
279        #[serde(rename_all = "camelCase")]
280        struct DataVersionHelper {
281            state_version: i64,
282            timestamp: i64,
283            counter: i64,
284        }
285
286        let helper = DataVersionHelper::deserialize(deserializer)?;
287        Ok(DataVersion {
288            state_version: helper.state_version,
289            timestamp: helper.timestamp,
290            counter: AtomicI64::new(helper.counter),
291        })
292    }
293}
294
295impl Clone for DataVersion {
296    fn clone(&self) -> Self {
297        DataVersion {
298            state_version: self.state_version,
299            timestamp: self.timestamp,
300            counter: AtomicI64::new(self.counter.load(Ordering::SeqCst)),
301        }
302    }
303}
304
305impl PartialEq for DataVersion {
306    fn eq(&self, other: &Self) -> bool {
307        self.state_version == other.state_version
308            && self.timestamp == other.timestamp
309            && self.get_counter() == other.get_counter()
310    }
311}
312
313impl Eq for DataVersion {}
314
315impl PartialOrd for DataVersion {
316    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
317        Some(self.cmp(other))
318    }
319}
320
321impl Ord for DataVersion {
322    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
323        self.state_version
324            .cmp(&other.state_version)
325            .then_with(|| self.timestamp.cmp(&other.timestamp))
326            .then_with(|| self.get_counter().cmp(&other.get_counter()))
327    }
328}
329
330impl Default for DataVersion {
331    fn default() -> Self {
332        DataVersion::new()
333    }
334}
335
336impl DataVersion {
337    pub fn new() -> Self {
338        let timestamp = time_utils::get_current_millis() as i64;
339
340        DataVersion {
341            state_version: 0,
342            timestamp,
343            counter: AtomicI64::new(0),
344        }
345    }
346
347    pub fn assign_new_one(&mut self, data_version: &DataVersion) {
348        self.timestamp = data_version.timestamp;
349        self.state_version = data_version.state_version;
350        self.counter = AtomicI64::new(data_version.counter.load(Ordering::SeqCst));
351    }
352
353    pub fn get_state_version(&self) -> i64 {
354        self.state_version
355    }
356
357    pub fn set_state_version(&mut self, state_version: i64) {
358        self.state_version = state_version;
359    }
360
361    pub fn get_timestamp(&self) -> i64 {
362        self.timestamp
363    }
364
365    pub fn set_timestamp(&mut self, timestamp: i64) {
366        self.timestamp = timestamp;
367    }
368
369    pub fn get_counter(&self) -> i64 {
370        self.counter.load(Ordering::Relaxed)
371    }
372
373    pub fn set_counter(&mut self, counter: i64) {
374        self.counter.store(counter, Ordering::Relaxed);
375    }
376
377    pub fn increment_counter(&self) -> i64 {
378        self.counter.fetch_add(1, Ordering::Relaxed) + 1
379    }
380
381    pub fn state_version(&self) -> i64 {
382        self.state_version
383    }
384
385    pub fn timestamp(&self) -> i64 {
386        self.timestamp
387    }
388
389    pub fn counter(&self) -> i64 {
390        self.counter.load(Ordering::Relaxed)
391    }
392
393    pub fn next_version(&mut self) {
394        self.next_version_with(0)
395    }
396
397    pub fn next_version_with(&mut self, state_version: i64) {
398        self.timestamp = time_utils::get_current_millis() as i64;
399        self.state_version = state_version;
400        self.counter.fetch_add(1, Ordering::SeqCst);
401    }
402}
403
404impl Display for DataVersion {
405    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
406        let counter_value = self.counter.load(Ordering::SeqCst);
407        write!(
408            f,
409            "State Version: {}, Timestamp: {}, Counter: {}",
410            self.state_version, self.timestamp, counter_value
411        )
412    }
413}
414
415/// Trait for serializable objects in a remoting context.
416///
417/// This trait defines methods for serializing objects into different formats,
418/// including binary, standard JSON, and pretty-printed JSON. It is intended for
419/// use with types that need to be transmitted over a network or stored in a format
420/// that can be easily shared or read.
421pub trait RemotingSerializable {
422    /// Encodes the object into a vector of bytes.
423    ///
424    /// # Returns
425    /// A `Result` containing a vector of bytes representing the encoded object,
426    /// or an error if encoding fails.
427    fn encode(&self) -> rocketmq_error::RocketMQResult<Vec<u8>>;
428
429    /// Serializes the object into a JSON string.
430    ///
431    /// # Returns
432    /// A `Result` containing a JSON string representing the object,
433    /// or an error if serialization fails.
434    fn serialize_json(&self) -> rocketmq_error::RocketMQResult<String>;
435
436    /// Serializes the object into a pretty-printed JSON string.
437    ///
438    /// # Returns
439    /// A `Result` containing a pretty-printed JSON string representing the object,
440    /// or an error if serialization fails.
441    fn serialize_json_pretty(&self) -> rocketmq_error::RocketMQResult<String>;
442}
443
444/// Trait for deserializing objects in a remoting context.
445///
446/// This trait defines a method for deserializing objects from a binary format,
447/// typically received over a network. Implementors of this trait can specify
448/// their own output types and deserialization logic, making it flexible for
449/// various use cases.
450///
451/// # Type Parameters
452/// - `Output`: The type of the object after deserialization.
453pub trait RemotingDeserializable {
454    /// The output type resulting from the deserialization.
455    type Output;
456
457    /// Deserializes an object from a slice of bytes.
458    ///
459    /// This method attempts to convert a slice of bytes into an instance of the `Output` type.
460    /// It returns a `Result` indicating either success with the deserialized object or an error
461    /// if deserialization fails.
462    ///
463    /// # Arguments
464    /// * `bytes` - A slice of bytes representing the serialized object.
465    ///
466    /// # Returns
467    /// A `Result` containing either the deserialized object of type `Output` or an `Error` if
468    /// deserialization fails.
469    fn decode(bytes: &[u8]) -> rocketmq_error::RocketMQResult<Self::Output>;
470}
471
472pub trait JsonSerializable: Serialize + RemotingSerializable {}
473
474impl<T: Serialize> RemotingSerializable for T {
475    fn encode(&self) -> rocketmq_error::RocketMQResult<Vec<u8>> {
476        SerdeJsonUtils::serialize_json_vec(self)
477    }
478
479    fn serialize_json(&self) -> rocketmq_error::RocketMQResult<String> {
480        SerdeJsonUtils::serialize_json(self)
481    }
482
483    fn serialize_json_pretty(&self) -> rocketmq_error::RocketMQResult<String> {
484        SerdeJsonUtils::serialize_json_pretty(self)
485    }
486}
487
488impl<T: serde::de::DeserializeOwned> RemotingDeserializable for T {
489    type Output = T;
490    fn decode(bytes: &[u8]) -> rocketmq_error::RocketMQResult<Self::Output> {
491        SerdeJsonUtils::from_json_bytes(bytes)
492    }
493}
494
495/// Trait for handling fast encoding and decoding headers in a RocketMQ message.
496///
497/// This trait provides methods for efficiently encoding and decoding message headers,
498/// leveraging RocketMQ's serialization capabilities. It is designed for internal use
499/// within the RocketMQ client to optimize performance in message processing.
500pub trait FastCodesHeader {
501    /// Writes a key-value pair to the output buffer if the value is not null or empty.
502    ///
503    /// This method is a utility function used during the encoding process to ensure
504    /// that only non-empty values are written to the buffer, avoiding unnecessary
505    /// serialization of empty strings.
506    ///
507    /// # Arguments
508    /// * `out` - A mutable reference to the output buffer (`bytes::BytesMut`) where the key-value
509    ///   pair is written.
510    /// * `key` - The key as a string slice.
511    /// * `value` - The value as a string slice.
512    fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
513        if !value.is_empty() {
514            RocketMQSerializable::write_str(out, true, key);
515            RocketMQSerializable::write_str(out, false, key);
516        }
517    }
518
519    /// Encodes the implementing object's data into the provided output buffer.
520    ///
521    /// This method should be implemented to encode the specific header fields of a message
522    /// or another entity into a compact binary format for transmission or storage.
523    ///
524    /// # Arguments
525    /// * `out` - A mutable reference to the output buffer (`bytes::BytesMut`) where the encoded
526    ///   data is written.
527    fn encode_fast(&mut self, out: &mut bytes::BytesMut);
528
529    /// Decodes data from a map of fields into the implementing object.
530    ///
531    /// This method should be implemented to populate the object's fields from a map
532    /// containing header names and values. It is used to reconstruct an object from
533    /// data received over the network or read from storage.
534    ///
535    /// # Arguments
536    /// * `fields` - A reference to a `HashMap` containing the header fields as key-value pairs.
537    fn decode_fast(&mut self, fields: &HashMap<CheetahString, CheetahString>);
538}
539
540#[cfg(test)]
541mod tests {
542    use super::*;
543
544    #[test]
545    fn test_remoting_command_type() {
546        // Test RemotingCommandType::value_of
547        assert_eq!(
548            Some(RemotingCommandType::REQUEST),
549            RemotingCommandType::value_of(0)
550        );
551        assert_eq!(
552            Some(RemotingCommandType::RESPONSE),
553            RemotingCommandType::value_of(1)
554        );
555        assert_eq!(None, RemotingCommandType::value_of(2));
556
557        // Test RemotingCommandType::get_code
558        assert_eq!(0, RemotingCommandType::REQUEST.get_code());
559        assert_eq!(1, RemotingCommandType::RESPONSE.get_code());
560
561        // Test RemotingCommandType::get_type_from_name
562        assert_eq!(
563            Some(RemotingCommandType::REQUEST),
564            RemotingCommandType::get_type_from_name("REQUEST")
565        );
566        assert_eq!(
567            Some(RemotingCommandType::RESPONSE),
568            RemotingCommandType::get_type_from_name("RESPONSE")
569        );
570        assert_eq!(None, RemotingCommandType::get_type_from_name("UNKNOWN"));
571    }
572
573    #[test]
574    fn test_language_code() {
575        // Test LanguageCode::value_of
576        assert_eq!(Some(LanguageCode::JAVA), LanguageCode::value_of(0));
577        assert_eq!(Some(LanguageCode::CPP), LanguageCode::value_of(1));
578        assert_eq!(Some(LanguageCode::DOTNET), LanguageCode::value_of(2));
579        // Add more value_of tests for other variants...
580
581        // Test LanguageCode::get_code
582        assert_eq!(0, LanguageCode::JAVA.get_code());
583        assert_eq!(1, LanguageCode::CPP.get_code());
584        assert_eq!(2, LanguageCode::DOTNET.get_code());
585
586        // Test LanguageCode::get_code_from_name
587        assert_eq!(
588            Some(LanguageCode::JAVA),
589            LanguageCode::get_code_from_name("JAVA")
590        );
591        assert_eq!(
592            Some(LanguageCode::CPP),
593            LanguageCode::get_code_from_name("CPP")
594        );
595        assert_eq!(
596            Some(LanguageCode::DOTNET),
597            LanguageCode::get_code_from_name("DOTNET")
598        );
599    }
600
601    #[test]
602    fn data_version_serialization_deserialization() {
603        let mut data_version = DataVersion::new();
604        data_version.set_state_version(10);
605        let serialized = serde_json::to_string(&data_version).unwrap();
606        let deserialized: DataVersion = serde_json::from_str(&serialized).unwrap();
607        assert_eq!(data_version.state_version, deserialized.state_version);
608        assert_eq!(data_version.timestamp, deserialized.timestamp);
609        assert_eq!(
610            data_version.counter.load(Ordering::SeqCst),
611            deserialized.counter.load(Ordering::SeqCst)
612        );
613    }
614
615    #[test]
616    fn data_version_counter_increment() {
617        let data_version = DataVersion::new();
618        let initial_counter = data_version.counter.load(Ordering::SeqCst);
619        data_version.increment_counter();
620        assert_eq!(
621            initial_counter + 1,
622            data_version.counter.load(Ordering::SeqCst)
623        );
624    }
625
626    #[test]
627    fn data_version_next_version() {
628        let mut data_version = DataVersion::new();
629        let initial_state_version = data_version.state_version;
630        let initial_timestamp = data_version.timestamp;
631        let initial_counter = data_version.counter.load(Ordering::SeqCst);
632        data_version.next_version();
633        assert_eq!(initial_state_version, data_version.state_version);
634        assert!(data_version.timestamp >= initial_timestamp);
635        assert_eq!(
636            initial_counter + 1,
637            data_version.counter.load(Ordering::SeqCst)
638        );
639    }
640
641    #[test]
642    fn data_version_next_version_with_state() {
643        let mut data_version = DataVersion::new();
644        let initial_timestamp = data_version.timestamp;
645        let initial_counter = data_version.counter.load(Ordering::SeqCst);
646        data_version.next_version_with(20);
647        assert_eq!(20, data_version.state_version);
648        assert!(data_version.timestamp >= initial_timestamp);
649        assert_eq!(
650            initial_counter + 1,
651            data_version.counter.load(Ordering::SeqCst)
652        );
653    }
654
655    #[test]
656    fn data_version_equality() {
657        let data_version1 = DataVersion::new();
658        let data_version2 = data_version1.clone();
659        assert_eq!(data_version1, data_version2);
660
661        data_version2.increment_counter();
662        assert_ne!(data_version1, data_version2);
663    }
664
665    #[test]
666    fn data_version_partial_ordering() {
667        let data_version1 = DataVersion::new();
668        let mut data_version2 = data_version1.clone();
669
670        assert_eq!(
671            data_version1.partial_cmp(&data_version2),
672            Some(std::cmp::Ordering::Equal)
673        );
674
675        data_version2.set_state_version(data_version1.get_state_version() + 1);
676        assert_eq!(
677            data_version1.partial_cmp(&data_version2),
678            Some(std::cmp::Ordering::Less)
679        );
680        assert_eq!(
681            data_version2.partial_cmp(&data_version1),
682            Some(std::cmp::Ordering::Greater)
683        );
684    }
685
686    #[test]
687    fn data_version_total_ordering() {
688        let data_version1 = DataVersion::new();
689        let mut data_version2 = data_version1.clone();
690
691        assert_eq!(data_version1.cmp(&data_version2), std::cmp::Ordering::Equal);
692
693        data_version2.set_state_version(data_version1.get_state_version() + 1);
694        assert_eq!(data_version1.cmp(&data_version2), std::cmp::Ordering::Less);
695        assert_eq!(
696            data_version2.cmp(&data_version1),
697            std::cmp::Ordering::Greater
698        );
699    }
700}