1use std::io::{self, Cursor};
28use std::str;
29
30use bytes::{Buf, BufMut, BytesMut};
31use tokio_io::codec::{Encoder, Decoder};
32use log::error;
33
34use crate::error::Error;
35
/// Number of bytes the decoder needs before it inspects a frame: the 4-byte
/// size field followed by the 4-byte frame type.
const HEADER_LENGTH: usize = 8;

/// Frame type value the decoder maps to `Cmd::Response` / `Cmd::Heartbeat`.
const FRAME_TYPE_RESPONSE: i32 = 0x00;
/// Frame type value the decoder maps to `Cmd::ResponseError`.
const FRAME_TYPE_ERROR: i32 = 0x01;
/// Frame type value the decoder maps to `Cmd::ResponseMsg`.
const FRAME_TYPE_MESSAGE: i32 = 0x02;

/// Response payload that is reported as `Cmd::Heartbeat` rather than as an
/// ordinary `Cmd::Response`.
const HEARTBEAT: &str = "_heartbeat_";
45
/// Item type shared by the decoder and the encoder of `NsqCodec`.
///
/// `Heartbeat`, `Response`, `ResponseError` and `ResponseMsg` are produced by
/// the decoder; `Magic`, `Command`, `Msg` and `MMsg` are accepted by the
/// encoder. Passing one of the decoder-only variants to the encoder yields an
/// `io::Error`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Cmd {
    /// A response frame whose payload is exactly `_heartbeat_`.
    Heartbeat,

    /// A static string written to the wire as raw bytes, with no trailing
    /// newline (presumably the protocol magic/version identifier).
    Magic(&'static str),

    /// UTF-8 payload of a response frame (frame type 0) that is not a heartbeat.
    Response(String),

    /// Payload of an error frame (frame type 1), converted to text lossily.
    ResponseError(String),

    /// Messages decoded from message frames (frame type 2), each as
    /// `(timestamp, attempts, id, body)`.
    ResponseMsg(Vec<(i64, u16, String, Vec<u8>)>),

    /// A command line; encoded as the string followed by a newline.
    Command(String),

    /// A command line followed by one payload, which is encoded with a 4-byte
    /// big-endian length prefix.
    Msg(String, String),

    /// A command line followed by a 4-byte big-endian payload count and then
    /// each payload with its own 4-byte big-endian length prefix.
    MMsg(String, Vec<String>),
}
72
/// Stateless codec that decodes incoming frames into `Cmd` values and encodes
/// outgoing `Cmd` values into bytes.
pub struct NsqCodec;
75
76pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, Vec<u8>)> {
77 if buf.len() < 4 {
78 None
79 } else {
80 let frame = buf.clone();
81 let mut cursor = Cursor::new(frame);
82 let size = cursor.get_i32_be() as usize;
83 if buf.len() < size + 4 {
84 None
85 } else {
86 let _ = cursor.get_i32_be();
88 let timestamp = cursor.get_i64_be();
89 let attemps = cursor.get_u16_be();
90 let id_body_bytes = &cursor.bytes()[..size - HEADER_LENGTH - 6];
91 if id_body_bytes.len() < 16 {
92 return None;
93 }
94 let (id_bytes, body_bytes) = id_body_bytes.split_at(16);
95 let id = match str::from_utf8(id_bytes) {
96 Ok(s) => s,
97 Err(e) => {
98 error!("error deconding utf8 id: {}", e);
99 return None;
100 },
101 };
102 buf.split_to(size+4);
104 Some((timestamp, attemps, id.to_owned(), Vec::from(body_bytes)))
105 }
106 }
107}
108
109fn write_n(buf: &mut BytesMut) {
110 buf.put_u8(b'\n');
111}
112
113fn check_and_reserve(buf: &mut BytesMut, size: usize) {
114 let remaining_bytes = buf.remaining_mut();
115 if remaining_bytes < size {
116 buf.reserve(size);
117 }
118}
119
120fn write_cmd(buf: &mut BytesMut, cmd: String) {
122 let cmd_as_bytes = cmd.as_bytes();
123 let size = cmd_as_bytes.len() + 1;
124 check_and_reserve(buf, size);
125 buf.extend(cmd_as_bytes);
126 write_n(buf);
127}
128
129pub fn write_msg(buf: &mut BytesMut, msg: String) {
138 let msg_as_bytes = msg.as_bytes();
139 let msg_len = msg_as_bytes.len();
140 let size = 4 + msg_len;
141 check_and_reserve(buf, size);
142 buf.put_u32_be(msg_len as u32);
143 buf.extend(msg_as_bytes);
144}
145
146pub fn write_mmsg(buf: &mut BytesMut, cmd: String, msgs: Vec<String>) {
148 write_cmd(buf, cmd);
149 buf.put_u32_be(msgs.len() as u32);
150 for msg in msgs {
151 write_msg(buf, msg);
152 }
153}
154
155impl Decoder for NsqCodec {
156 type Item = Cmd;
157 type Error = Error;
158
159 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
160 let length = buf.len();
161
162 if length < HEADER_LENGTH {
164 Ok(None)
165 } else {
166 let mut cursor = Cursor::new(buf.clone());
167 let _size = cursor.get_i32_be() as usize;
168 let frame_type: i32 = cursor.get_i32_be();
170
171 if frame_type == FRAME_TYPE_RESPONSE {
173 buf.take();
175 if let Ok(s) = str::from_utf8(&cursor.bytes()) {
176 if s == HEARTBEAT {
178 Ok(Some(Cmd::Heartbeat))
179 } else {
180 Ok(Some(Cmd::Response(s.to_owned())))
182 }
183 } else {
184 Err(Error::Internal("Invalid UTF-8 Data".to_owned()))
186 }
187 } else if frame_type == FRAME_TYPE_ERROR {
189 buf.take();
191 let s = String::from_utf8_lossy(cursor.bytes());
192 Ok(Some(Cmd::ResponseError(s.to_string())))
194 } else if frame_type == FRAME_TYPE_MESSAGE {
196 let mut resp_buf = buf.clone();
197 let mut msg_buf: Vec<(i64, u16, String, Vec<u8>)> = Vec::new();
198 let mut need_more = false;
199 loop {
200 if resp_buf.is_empty() { break };
201 if let Some((ts, at, id, bd)) = decode_msg(&mut resp_buf) {
202 msg_buf.push((ts, at, id.to_owned(), bd));
203 } else {
204 need_more = true;
205 break;
206 }
207 }
208 if need_more {
209 Ok(None)
210 } else {
211 buf.take();
212 Ok(Some(Cmd::ResponseMsg(msg_buf)))
213 }
214 } else {
215 Err(Error::Remote("invalid frame type".to_owned()))
216 }
217 }
218 }
219}
220
221
222impl Encoder for NsqCodec {
223 type Item = Cmd;
224 type Error = io::Error;
225
226 fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
227 match msg {
228 Cmd::Magic(ver) => {
229 let bytes_ver = ver.as_bytes();
230 check_and_reserve(buf, bytes_ver.len());
231 buf.extend(bytes_ver);
232 Ok(())
233 }
234 Cmd::Command(cmd) => {
235 write_cmd(buf, cmd);
236 Ok(())
237 },
238 Cmd::Msg(cmd, msg) => {
239 write_cmd(buf, cmd);
240 write_msg(buf, msg);
241 Ok(())
242 },
243 Cmd::MMsg(cmd, msgs) => {
244 write_mmsg(buf, cmd, msgs);
245 Ok(())
246 },
247 _ => {
248 Err(io::Error::new(io::ErrorKind::Other, "Failed encoding data"))
249 },
250 }
251 }
252}