nsq_client/
codec.rs

1// MIT License
2// 
3// Copyright (c) 2019-2021 Alessandro Cresto Miseroglio <alex179ohm@gmail.com>
4// Copyright (c) 2019-2021 Tangram Technologies S.R.L. <https://tngrm.io>
5// 
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12// 
13// The above copyright notice and this permission notice shall be included in all
14// copies or substantial portions of the Software.
15// 
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22// SOFTWARE.
23
//! An implementation of the NSQ protocol.
25//! Source: https://github.com/alex179ohm/nsq-client-rs/blob/master/src/codec.rs
26
27use std::io::{self, Cursor};
28use std::str;
29
30use bytes::{Buf, BufMut, BytesMut};
31use tokio_io::codec::{Encoder, Decoder};
32use log::error;
33
34use crate::error::Error;
35
// Header: Size(4-Byte) + FrameType(4-Byte)
// The decoder waits until at least this many bytes are buffered before it
// inspects a frame. The size field does not count its own 4 bytes (the
// message decoder consumes `size + 4` bytes per frame).
const HEADER_LENGTH: usize = 8;

// Frame Types
// Values of the second 4-byte big-endian integer of every frame; the decoder
// compares against them to pick the `Cmd` variant it produces.
// Response frame: payload is decoded as UTF-8 text.
const FRAME_TYPE_RESPONSE: i32 = 0x00;
// Error frame: payload is decoded as (lossy) UTF-8 text.
const FRAME_TYPE_ERROR: i32 = 0x01;
// Message frame: payload is timestamp, attempts, 16-byte id and body.
const FRAME_TYPE_MESSAGE: i32 = 0x02;

// A response frame whose payload equals this text is reported as
// `Cmd::Heartbeat` instead of `Cmd::Response`.
const HEARTBEAT: &str = "_heartbeat_";
45
/// Everything the codec can read from, or write to, an nsqd connection.
///
/// `Heartbeat`, `Response`, `ResponseError` and `ResponseMsg` are produced by
/// the decoder; `Magic`, `Command`, `Msg` and `MMsg` are accepted by the
/// encoder.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Cmd {
    /// Heartbeat sent by nsqd (a response frame whose payload is `_heartbeat_`).
    Heartbeat,

    /// Protocol magic (`"  V2"`), written to the wire as-is, without a
    /// trailing newline.
    Magic(&'static str),

    /// Successful response: the UTF-8 payload of a response frame.
    Response(String),

    /// Error reported by nsqd, e.g. `E_FIN_FAILED`, `E_REQ_FAILED`,
    /// `E_TOUCH_FAILED`.
    ResponseError(String),

    /// Decoded messages, each as `(timestamp, attempts, id, body)`.
    ResponseMsg(Vec<(i64, u16, String, Vec<u8>)>),

    /// A command that carries no message body; written followed by `\n`.
    Command(String),

    /// A command line plus one message body (`PUB` or `DPUB`).
    Msg(String, String),

    /// A command line plus several message bodies (`MPUB`).
    MMsg(String, Vec<String>),
}
72
/// NSQ codec
///
/// A stateless unit type: its `Decoder` implementation turns buffered bytes
/// into [`Cmd`] values and its `Encoder` implementation writes [`Cmd`] values
/// into an output buffer.
pub struct NsqCodec;
75
76pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, Vec<u8>)> {
77    if buf.len() < 4 {
78        None
79    } else {
80        let frame = buf.clone();
81        let mut cursor = Cursor::new(frame);
82        let size = cursor.get_i32_be() as usize;
83        if buf.len() < size + 4 {
84            None
85        } else {
86            // skip frame_type
87            let _ = cursor.get_i32_be();
88            let timestamp = cursor.get_i64_be();
89            let attemps = cursor.get_u16_be();
90            let id_body_bytes = &cursor.bytes()[..size - HEADER_LENGTH - 6];
91            if id_body_bytes.len() < 16 {
92                return None;
93            }
94            let (id_bytes, body_bytes) = id_body_bytes.split_at(16);
95            let id = match str::from_utf8(id_bytes) {
96                Ok(s) => s,
97                Err(e) => {
98                    error!("error deconding utf8 id: {}", e);
99                    return None;
100                },
101            };
102            // clean the buffer at frame size
103            buf.split_to(size+4);
104            Some((timestamp, attemps, id.to_owned(), Vec::from(body_bytes)))
105        }
106    }
107}
108
109fn write_n(buf: &mut BytesMut) {
110    buf.put_u8(b'\n');
111}
112
113fn check_and_reserve(buf: &mut BytesMut, size: usize) {
114    let remaining_bytes = buf.remaining_mut();
115    if remaining_bytes < size {
116        buf.reserve(size);
117    }
118}
119
120/// write command in buffer and append 0x2 ("\n")
121fn write_cmd(buf: &mut BytesMut, cmd: String) {
122    let cmd_as_bytes = cmd.as_bytes();
123    let size = cmd_as_bytes.len() + 1;
124    check_and_reserve(buf, size);
125    buf.extend(cmd_as_bytes);
126    write_n(buf);
127}
128
129/// write command and msg in buffer.
130///
131/// packet format:
132/// <command>\n
133/// [ 4 byte size in bytes as BigEndian i64 ][ N-byte binary data ]
134///
135/// https://nsq.io/clients/tcp_protocol_spec.html.
136/// command could be PUB or DPUB or any command witch send a message.
137pub fn write_msg(buf: &mut BytesMut, msg: String) {
138    let msg_as_bytes = msg.as_bytes();
139    let msg_len = msg_as_bytes.len();
140    let size = 4 + msg_len;
141    check_and_reserve(buf, size);
142    buf.put_u32_be(msg_len as u32);
143    buf.extend(msg_as_bytes);
144}
145
146/// write multiple messages (aka msub command).
147pub fn write_mmsg(buf: &mut BytesMut, cmd: String, msgs: Vec<String>) {
148    write_cmd(buf, cmd);
149    buf.put_u32_be(msgs.len() as u32);
150    for msg in msgs {
151        write_msg(buf, msg);
152    }
153}
154
155impl Decoder for NsqCodec {
156    type Item = Cmd;
157    type Error = Error;
158
159    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
160        let length = buf.len();
161
162        // if length is less than HEADER_LENGTH there is a problem
163        if length < HEADER_LENGTH {
164            Ok(None)
165        } else {
166            let mut cursor = Cursor::new(buf.clone());
167            let _size = cursor.get_i32_be() as usize;
168            // get frame type
169            let frame_type: i32 = cursor.get_i32_be();
170
171            // maybe we have a response type frame
172            if frame_type == FRAME_TYPE_RESPONSE {
173                // clean the buffer
174                buf.take();
175                if let Ok(s) = str::from_utf8(&cursor.bytes()) {
176                        // check for heartbeat
177                    if s == HEARTBEAT {
178                        Ok(Some(Cmd::Heartbeat))
179                    } else {
180                        // return response
181                        Ok(Some(Cmd::Response(s.to_owned())))
182                    }
183                } else {
184                    // error parsing bytes as utf8
185                    Err(Error::Internal("Invalid UTF-8 Data".to_owned()))
186                }
187            // maybe it is a error type
188            } else if frame_type == FRAME_TYPE_ERROR {
189                // clean buffer
190                buf.take();
191                let s = String::from_utf8_lossy(cursor.bytes());
192                    // it's a remote error (E_FIN_FAILED, E_REQ_FAILED, E_TOUCH_FAILED)
193                Ok(Some(Cmd::ResponseError(s.to_string())))
194            // it's a message
195            } else if frame_type == FRAME_TYPE_MESSAGE {
196                let mut resp_buf = buf.clone();
197                let mut msg_buf: Vec<(i64, u16, String, Vec<u8>)> = Vec::new();
198                let mut need_more = false;
199                loop {
200                    if resp_buf.is_empty() { break };
201                    if let Some((ts, at, id, bd)) = decode_msg(&mut resp_buf) {
202                        msg_buf.push((ts, at, id.to_owned(), bd));
203                    } else {
204                        need_more = true;
205                        break;
206                    }
207                }
208                if need_more {
209                    Ok(None)
210                } else {
211                    buf.take();
212                    Ok(Some(Cmd::ResponseMsg(msg_buf)))
213                }
214            } else {
215                Err(Error::Remote("invalid frame type".to_owned()))
216            }
217        }
218    }
219}
220
221
222impl Encoder for NsqCodec {
223    type Item = Cmd;
224    type Error = io::Error;
225
226    fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
227        match msg {
228            Cmd::Magic(ver) => {
229                let bytes_ver = ver.as_bytes();
230                check_and_reserve(buf, bytes_ver.len());
231                buf.extend(bytes_ver);
232                Ok(())
233            }
234            Cmd::Command(cmd) => {
235                write_cmd(buf, cmd);
236                Ok(())
237            },
238            Cmd::Msg(cmd, msg) => {
239                write_cmd(buf, cmd);
240                write_msg(buf, msg);
241                Ok(())
242            },
243            Cmd::MMsg(cmd, msgs) => {
244                write_mmsg(buf, cmd, msgs);
245                Ok(())
246            },
247            _ => {
248                Err(io::Error::new(io::ErrorKind::Other, "Failed encoding data"))
249            },
250        }
251    }
252}