rocketmq_client/
frame.rs

//!
//! Implements the classic length-field-based frame codec. Note that the specific frames are
//! defined in the protocol module.
//!
4use bytes::{self, Buf, BufMut, Bytes, BytesMut};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::default::Default;
8use std::io::Cursor;
9use std::sync::atomic::{self, Ordering};
10
11use crate::error::{self, ClientError};
12
13#[derive(Serialize, Deserialize, Debug, PartialEq)]
14pub(crate) enum Language {
15    JAVA,
16    CPP,
17    RUST,
18}
19
20impl Default for Language {
21    fn default() -> Self {
22        Language::RUST
23    }
24}
25
/// Numeric operation codes for the requests this client can issue.
///
/// Convert to the wire value with `as i32`.
/// NOTE(review): presumably these are written to `Frame::code` — confirm against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCode {
    /// Request code 105.
    GetRouteInfoByTopic = 105,
    /// Request code 10.
    SendMessage = 10,
}
30
31#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
32#[serde(rename_all = "camelCase")]
33pub struct Frame {
34    // Operation code
35    pub(crate) code: i32,
36
37    // Language of the SDK that generates this frame
38    pub(crate) language: Language,
39
40    // Version of the SDK that generates this frame
41    pub(crate) version: i32,
42
43    // frame identifier
44    pub(crate) opaque: i32,
45
46    // Bit-wise flag that overrides semantics of certain fields
47    pub(crate) flag: i32,
48
49    // Human readable remarks
50    #[serde(default, skip_serializing_if = "String::is_empty")]
51    pub(crate) remark: String,
52
53    #[serde(skip_serializing_if = "HashMap::is_empty", default = "HashMap::new")]
54    pub(crate) ext_fields: HashMap<String, String>,
55
56    #[serde(skip)]
57    pub(crate) body: bytes::Bytes,
58}
59
60#[derive(Debug)]
61pub(crate) enum Error {
62    // Not enough data is available to parse a message
63    Incomplete,
64
65    // Invalid message encoding
66    Other(error::ClientError),
67}
68
/// Direction of a frame: a request, or the response to one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Type {
    /// The frame is a request.
    Request,
    /// The frame is a response.
    Response,
}
74
75impl Frame {
76    // Generate next opaque, aka, request identifier.
77    fn next_opaque() -> i32 {
78        static SEQUENCE: atomic::AtomicI32 = atomic::AtomicI32::new(0);
79        SEQUENCE.fetch_add(1, Ordering::Relaxed)
80    }
81
82    pub(crate) fn new() -> Self {
83        Frame {
84            opaque: Frame::next_opaque(),
85            ..Default::default()
86        }
87    }
88
89    pub(crate) fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
90        // frame-length = 4 + len(header) + len(body)
91        // frame-layout |header-length|---header-data---|---body---|
92        let frame_length = Frame::read_i32(src)? as usize;
93
94        if src.remaining() < frame_length {
95            return Err(Error::Incomplete);
96        }
97
98        src.advance(frame_length);
99
100        Ok(())
101    }
102
103    pub(crate) fn parse(src: &mut Cursor<&[u8]>) -> Result<Option<Self>, ClientError> {
104        let frame_length = Frame::read_i32(src).map_err(|_e| {
105            return ClientError::InvalidFrame("Invalid frame length".to_string());
106        })?;
107        let header_length = Frame::read_i32(src).map_err(|_e| {
108            return ClientError::InvalidFrame("Invalid frame header length".to_string());
109        })?;
110
111        let header = src.copy_to_bytes(header_length as usize);
112        let mut frame: Frame = serde_json::from_reader(header.reader()).map_err(|_e| {
113            return ClientError::InvalidFrame("Invalid frame header JSON".to_string());
114        })?;
115
116        let body_length = frame_length - 4 - header_length;
117        if body_length > 0 {
118            let body = src.copy_to_bytes(body_length as usize);
119            frame.body = body;
120        }
121        Ok(Some(frame))
122    }
123
124    fn read_i32(src: &mut Cursor<&[u8]>) -> Result<i32, Error> {
125        if src.remaining() < 4 {
126            return Err(Error::Incomplete);
127        }
128        Ok(src.get_i32())
129    }
130
131    pub(crate) fn encode(&self) -> Result<Option<Bytes>, ClientError> {
132        let header = serde_json::to_vec(self).map_err(|_e| {
133            return ClientError::InvalidFrame("Failed to JSON serialize frame header".to_string());
134        })?;
135        let len = 4 + header.len() + self.body.len();
136        let mut buf = BytesMut::with_capacity(len);
137        buf.put_i32(len as i32);
138        buf.put_i32(header.len() as i32);
139        buf.put_slice(&header);
140        buf.put_slice(&self.body);
141        Ok(Some(buf.into()))
142    }
143
144    pub(crate) fn put_ext_field(&mut self, key: &str, value: &str) {
145        self.ext_fields.insert(key.to_owned(), value.to_owned());
146    }
147
148    pub(crate) fn remark(&self) -> &str {
149        self.remark.as_str()
150    }
151
152    pub(crate) fn frame_type(&self) -> Type {
153        if self.flag & 1 == 1 {
154            return Type::Response;
155        }
156        Type::Request
157    }
158
159    pub(crate) fn mark_response_type(&mut self) {
160        self.flag |= 1;
161    }
162
163    pub(crate) fn add_ext_headers(&mut self, header: impl Into<HashMap<String, String>>) {
164        let map: HashMap<String, String> = header.into();
165        map.iter().for_each(|(k, v)| {
166            self.put_ext_field(k, v);
167        });
168    }
169
170    pub(crate) fn body(&self) -> bytes::Bytes {
171        self.body.clone()
172    }
173}
174
// Unit tests for the frame codec. Gated on `cfg(test)` so the test-only
// imports are not compiled into (and warned about in) regular builds.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use bytes::{Buf, BufMut, Bytes, BytesMut};

    use super::{Frame, Language, Type};

    /// Every new frame receives a later opaque than the one before it.
    #[test]
    fn test_new() {
        let frame_0 = Frame::new();
        let frame_1 = Frame::new();
        assert!(frame_0.opaque < frame_1.opaque);
    }

    /// A minimal JSON header deserializes into the expected fields.
    #[test]
    fn test_deserialization() -> Result<(), Box<dyn std::error::Error>> {
        let json = r#"
        {"code": 1, "language": "JAVA", "version": 0, "opaque": 0, "flag": 0}
        "#;
        let frame: Frame = serde_json::from_str(json)?;
        assert_eq!(1, frame.code);
        assert_eq!(Language::JAVA, frame.language);
        Ok(())
    }

    /// Serializing and then deserializing a header yields an equal frame.
    #[test]
    fn test_serialization() -> Result<(), Box<dyn std::error::Error>> {
        let mut frame = Frame::new();
        frame
            .ext_fields
            .insert("key".to_string(), "value".to_string());
        let json = serde_json::to_string(&frame)?;
        let frame2: Frame = serde_json::from_str(&json)?;
        assert_eq!(frame, frame2);
        Ok(())
    }

    /// Headers can be deserialized straight from a byte buffer reader.
    #[test]
    fn test_deserialize_bytes() -> Result<(), Box<dyn std::error::Error>> {
        let mut buf = BytesMut::with_capacity(1024);
        let data = r#"{"code": 1, "language": "JAVA", "version": 0, "opaque": 0, "flag": 0}"#;
        buf.put(data.as_bytes());
        let frame: Frame = serde_json::from_reader(buf.reader())?;
        assert_eq!(frame.language, Language::JAVA);
        assert_eq!(frame.code, 1);
        assert_eq!(frame.opaque, 0);
        assert_eq!(frame.version, 0);
        assert_eq!(frame.flag, 0);
        assert!(frame.ext_fields.is_empty());
        Ok(())
    }

    /// Frames start as requests and become responses once marked.
    #[test]
    fn test_type() {
        let mut frame = Frame::new();
        assert_eq!(frame.frame_type(), Type::Request);

        frame.mark_response_type();
        assert_eq!(frame.frame_type(), Type::Response);
    }

    /// A typed request header is flattened into the extension fields.
    #[test]
    fn test_add_ext_headers() {
        let header = crate::protocol::GetRouteInfoRequestHeader::new("Test");
        let mut frame = Frame::new();
        frame.add_ext_headers(header);
        assert_eq!(frame.ext_fields.len(), 1);
    }

    /// An encoded frame passes `check` and parses back to an equal frame.
    #[test]
    fn test_encode_parse_round_trip() {
        let mut frame = Frame::new();
        frame.code = 105;
        frame.put_ext_field("topic", "Test");
        frame.body = Bytes::from_static(b"payload");

        let encoded = frame
            .encode()
            .expect("encoding should succeed")
            .expect("encoding should produce bytes");

        // `check` must consume exactly one whole frame.
        let mut cursor = Cursor::new(&encoded[..]);
        Frame::check(&mut cursor).expect("a fully buffered frame should pass the check");
        assert_eq!(cursor.position() as usize, encoded.len());

        let mut cursor = Cursor::new(&encoded[..]);
        let parsed = Frame::parse(&mut cursor)
            .expect("parsing should succeed")
            .expect("parsing should produce a frame");
        assert_eq!(frame, parsed);
    }
}