1use bytes::{self, Buf, BufMut, Bytes, BytesMut};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::default::Default;
8use std::io::Cursor;
9use std::sync::atomic::{self, Ordering};
10
11use crate::error::{self, ClientError};
12
13#[derive(Serialize, Deserialize, Debug, PartialEq)]
14pub(crate) enum Language {
15 JAVA,
16 CPP,
17 RUST,
18}
19
20impl Default for Language {
21 fn default() -> Self {
22 Language::RUST
23 }
24}
25
/// Numeric request codes known to this module.
///
/// Each variant has an explicit discriminant, so `code as i32` yields the
/// number listed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCode {
    /// Code `105`.
    GetRouteInfoByTopic = 105,
    /// Code `10`.
    SendMessage = 10,
}
30
31#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
32#[serde(rename_all = "camelCase")]
33pub struct Frame {
34 pub(crate) code: i32,
36
37 pub(crate) language: Language,
39
40 pub(crate) version: i32,
42
43 pub(crate) opaque: i32,
45
46 pub(crate) flag: i32,
48
49 #[serde(default, skip_serializing_if = "String::is_empty")]
51 pub(crate) remark: String,
52
53 #[serde(skip_serializing_if = "HashMap::is_empty", default = "HashMap::new")]
54 pub(crate) ext_fields: HashMap<String, String>,
55
56 #[serde(skip)]
57 pub(crate) body: bytes::Bytes,
58}
59
60#[derive(Debug)]
61pub(crate) enum Error {
62 Incomplete,
64
65 Other(error::ClientError),
67}
68
/// Direction of a frame, as derived from its flag bits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Type {
    /// The response bit is not set.
    Request,
    /// The response bit is set.
    Response,
}
74
75impl Frame {
76 fn next_opaque() -> i32 {
78 static SEQUENCE: atomic::AtomicI32 = atomic::AtomicI32::new(0);
79 SEQUENCE.fetch_add(1, Ordering::Relaxed)
80 }
81
82 pub(crate) fn new() -> Self {
83 Frame {
84 opaque: Frame::next_opaque(),
85 ..Default::default()
86 }
87 }
88
89 pub(crate) fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
90 let frame_length = Frame::read_i32(src)? as usize;
93
94 if src.remaining() < frame_length {
95 return Err(Error::Incomplete);
96 }
97
98 src.advance(frame_length);
99
100 Ok(())
101 }
102
103 pub(crate) fn parse(src: &mut Cursor<&[u8]>) -> Result<Option<Self>, ClientError> {
104 let frame_length = Frame::read_i32(src).map_err(|_e| {
105 return ClientError::InvalidFrame("Invalid frame length".to_string());
106 })?;
107 let header_length = Frame::read_i32(src).map_err(|_e| {
108 return ClientError::InvalidFrame("Invalid frame header length".to_string());
109 })?;
110
111 let header = src.copy_to_bytes(header_length as usize);
112 let mut frame: Frame = serde_json::from_reader(header.reader()).map_err(|_e| {
113 return ClientError::InvalidFrame("Invalid frame header JSON".to_string());
114 })?;
115
116 let body_length = frame_length - 4 - header_length;
117 if body_length > 0 {
118 let body = src.copy_to_bytes(body_length as usize);
119 frame.body = body;
120 }
121 Ok(Some(frame))
122 }
123
124 fn read_i32(src: &mut Cursor<&[u8]>) -> Result<i32, Error> {
125 if src.remaining() < 4 {
126 return Err(Error::Incomplete);
127 }
128 Ok(src.get_i32())
129 }
130
131 pub(crate) fn encode(&self) -> Result<Option<Bytes>, ClientError> {
132 let header = serde_json::to_vec(self).map_err(|_e| {
133 return ClientError::InvalidFrame("Failed to JSON serialize frame header".to_string());
134 })?;
135 let len = 4 + header.len() + self.body.len();
136 let mut buf = BytesMut::with_capacity(len);
137 buf.put_i32(len as i32);
138 buf.put_i32(header.len() as i32);
139 buf.put_slice(&header);
140 buf.put_slice(&self.body);
141 Ok(Some(buf.into()))
142 }
143
144 pub(crate) fn put_ext_field(&mut self, key: &str, value: &str) {
145 self.ext_fields.insert(key.to_owned(), value.to_owned());
146 }
147
148 pub(crate) fn remark(&self) -> &str {
149 self.remark.as_str()
150 }
151
152 pub(crate) fn frame_type(&self) -> Type {
153 if self.flag & 1 == 1 {
154 return Type::Response;
155 }
156 Type::Request
157 }
158
159 pub(crate) fn mark_response_type(&mut self) {
160 self.flag |= 1;
161 }
162
163 pub(crate) fn add_ext_headers(&mut self, header: impl Into<HashMap<String, String>>) {
164 let map: HashMap<String, String> = header.into();
165 map.iter().for_each(|(k, v)| {
166 self.put_ext_field(k, v);
167 });
168 }
169
170 pub(crate) fn body(&self) -> bytes::Bytes {
171 self.body.clone()
172 }
173}
174
// Compiled only for test builds so the imports below are not reported as
// unused in regular builds.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use bytes::{Buf, BufMut, Bytes, BytesMut};

    use super::{Frame, Language, Type};

    /// Consecutive frames receive increasing `opaque` values.
    #[test]
    fn test_new() {
        let frame_0 = Frame::new();
        let frame_1 = Frame::new();
        assert!(frame_0.opaque < frame_1.opaque);
    }

    /// A header without the optional fields deserializes from a string.
    #[test]
    fn test_deserialization() -> Result<(), Box<dyn std::error::Error>> {
        let json = r#"
        {"code": 1, "language": "JAVA", "version": 0, "opaque": 0, "flag": 0}
        "#;
        let frame: Frame = serde_json::from_str(json)?;
        assert_eq!(1, frame.code);
        assert_eq!(Language::JAVA, frame.language);
        Ok(())
    }

    /// Serializing and deserializing a header yields an equal frame.
    #[test]
    fn test_serialization() -> Result<(), Box<dyn std::error::Error>> {
        let mut frame = Frame::new();
        frame
            .ext_fields
            .insert("key".to_string(), "value".to_string());
        let json = serde_json::to_string(&frame)?;
        println!("json={}", json);
        let frame2: Frame = serde_json::from_str(&json)?;
        assert_eq!(frame, frame2);
        Ok(())
    }

    /// A header deserializes from a byte buffer through a reader.
    #[test]
    fn test_deserialize_bytes() -> Result<(), Box<dyn std::error::Error>> {
        let mut buf = BytesMut::with_capacity(1024);
        let data = r#"{"code": 1, "language": "JAVA", "version": 0, "opaque": 0, "flag": 0}"#;
        buf.put(data.as_bytes());
        let frame: Frame = serde_json::from_reader(buf.reader())?;
        assert_eq!(frame.language, Language::JAVA);
        assert_eq!(frame.code, 1);
        assert_eq!(frame.opaque, 0);
        assert_eq!(frame.version, 0);
        assert_eq!(frame.flag, 0);
        assert!(frame.ext_fields.is_empty());
        Ok(())
    }

    /// New frames are requests until explicitly marked as responses.
    #[test]
    fn test_type() {
        let mut frame = Frame::new();
        assert_eq!(frame.frame_type(), Type::Request);

        frame.mark_response_type();
        assert_eq!(frame.frame_type(), Type::Response);
    }

    /// A typed request header is merged into the extension fields.
    #[test]
    fn test_add_ext_headers() {
        let header = crate::protocol::GetRouteInfoRequestHeader::new("Test");
        let mut frame = Frame::new();
        frame.add_ext_headers(header);
        assert_eq!(frame.ext_fields.len(), 1);
    }

    /// An encoded frame passes `check` and parses back to an equal frame.
    #[test]
    fn test_encode_parse_round_trip() {
        let mut frame = Frame::new();
        frame.put_ext_field("key", "value");
        frame.body = Bytes::from_static(b"payload");

        let encoded = frame
            .encode()
            .expect("frame should encode")
            .expect("encode should produce bytes");

        let mut cursor = Cursor::new(&encoded[..]);
        assert!(Frame::check(&mut cursor).is_ok());
        assert_eq!(cursor.position() as usize, encoded.len());

        cursor.set_position(0);
        let decoded = Frame::parse(&mut cursor)
            .expect("frame should parse")
            .expect("parse should produce a frame");
        assert_eq!(frame, decoded);
    }
}