use std::io::{self, Cursor};
use std::str;
use bytes::{Buf, BufMut, BytesMut};
use tokio_io::codec::{Encoder, Decoder};
use log::error;
use crate::error::Error;
/// Minimum number of buffered bytes the decoder needs before it inspects a frame:
/// a 4-byte big-endian size followed by a 4-byte big-endian frame type.
const HEADER_LENGTH: usize = 8;
/// Frame type decoded into `Cmd::Response` (or `Cmd::Heartbeat` for the heartbeat body).
const FRAME_TYPE_RESPONSE: i32 = 0x00;
/// Frame type decoded into `Cmd::ResponseError`.
const FRAME_TYPE_ERROR: i32 = 0x01;
/// Frame type decoded into `Cmd::ResponseMsg`.
const FRAME_TYPE_MESSAGE: i32 = 0x02;
/// Body of a response frame that the decoder reports as `Cmd::Heartbeat`.
const HEARTBEAT: &str = "_heartbeat_";
/// Items exchanged through `NsqCodec`.
///
/// The decoder produces `Heartbeat`, `Response`, `ResponseError` and `ResponseMsg`;
/// the encoder accepts `Magic`, `Command`, `Msg` and `MMsg` and rejects the others.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Cmd {
/// A response frame whose body equals the heartbeat marker.
Heartbeat,
/// Raw text written to the buffer as-is, with no trailing newline.
Magic(&'static str),
/// UTF-8 body of a response frame.
Response(String),
/// Body of an error frame, converted lossily to UTF-8.
ResponseError(String),
/// Decoded messages as `(timestamp, attempts, id, body)` tuples.
ResponseMsg(Vec<(i64, u16, String, Vec<u8>)>),
/// A command line; encoded as the text followed by `\n`.
Command(String),
/// A command line followed by one message body with a 4-byte big-endian length prefix.
Msg(String, String),
/// A command line followed by a batch of message bodies (see `write_mmsg`).
MMsg(String, Vec<String>),
}
/// Stateless codec that implements `Decoder` and `Encoder` with `Cmd` as the item type.
pub struct NsqCodec;
pub fn decode_msg(buf: &mut BytesMut) -> Option<(i64, u16, String, Vec<u8>)> {
if buf.len() < 4 {
None
} else {
let frame = buf.clone();
let mut cursor = Cursor::new(frame);
let size = cursor.get_i32_be() as usize;
if buf.len() < size + 4 {
None
} else {
let _ = cursor.get_i32_be();
let timestamp = cursor.get_i64_be();
let attemps = cursor.get_u16_be();
let id_body_bytes = &cursor.bytes()[..size - HEADER_LENGTH - 6];
if id_body_bytes.len() < 16 {
return None;
}
let (id_bytes, body_bytes) = id_body_bytes.split_at(16);
let id = match str::from_utf8(id_bytes) {
Ok(s) => s,
Err(e) => {
error!("error deconding utf8 id: {}", e);
return None;
},
};
buf.split_to(size+4);
Some((timestamp, attemps, id.to_owned(), Vec::from(body_bytes)))
}
}
}
/// Appends a single `\n` byte to `buf`.
///
/// No capacity is reserved here; `write_cmd` accounts for this byte in its own reservation.
fn write_n(buf: &mut BytesMut) {
    buf.put_slice(b"\n");
}
/// Makes sure `buf` can take `size` more bytes, reserving only when it currently cannot.
fn check_and_reserve(buf: &mut BytesMut, size: usize) {
    if buf.remaining_mut() >= size {
        return;
    }
    buf.reserve(size);
}
/// Writes `cmd` to `buf` as a command line: the text followed by `\n`.
fn write_cmd(buf: &mut BytesMut, cmd: String) {
    let line = cmd.as_bytes();
    // One extra byte for the newline terminator.
    check_and_reserve(buf, line.len() + 1);
    buf.put_slice(line);
    write_n(buf);
}
/// Writes `msg` to `buf` as a 4-byte big-endian length followed by the message bytes.
pub fn write_msg(buf: &mut BytesMut, msg: String) {
    let payload = msg.as_bytes();
    // Length prefix plus payload.
    check_and_reserve(buf, payload.len() + 4);
    buf.put_u32_be(payload.len() as u32);
    buf.put_slice(payload);
}
pub fn write_mmsg(buf: &mut BytesMut, cmd: String, msgs: Vec<String>) {
write_cmd(buf, cmd);
buf.put_u32_be(msgs.len() as u32);
for msg in msgs {
write_msg(buf, msg);
}
}
impl Decoder for NsqCodec {
type Item = Cmd;
type Error = Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let length = buf.len();
if length < HEADER_LENGTH {
Ok(None)
} else {
let mut cursor = Cursor::new(buf.clone());
let _size = cursor.get_i32_be() as usize;
let frame_type: i32 = cursor.get_i32_be();
if frame_type == FRAME_TYPE_RESPONSE {
buf.take();
if let Ok(s) = str::from_utf8(&cursor.bytes()) {
if s == HEARTBEAT {
Ok(Some(Cmd::Heartbeat))
} else {
Ok(Some(Cmd::Response(s.to_owned())))
}
} else {
Err(Error::Internal("Invalid UTF-8 Data".to_owned()))
}
} else if frame_type == FRAME_TYPE_ERROR {
buf.take();
let s = String::from_utf8_lossy(cursor.bytes());
Ok(Some(Cmd::ResponseError(s.to_string())))
} else if frame_type == FRAME_TYPE_MESSAGE {
let mut resp_buf = buf.clone();
let mut msg_buf: Vec<(i64, u16, String, Vec<u8>)> = Vec::new();
let mut need_more = false;
loop {
if resp_buf.is_empty() { break };
if let Some((ts, at, id, bd)) = decode_msg(&mut resp_buf) {
msg_buf.push((ts, at, id.to_owned(), bd));
} else {
need_more = true;
break;
}
}
if need_more {
Ok(None)
} else {
buf.take();
Ok(Some(Cmd::ResponseMsg(msg_buf)))
}
} else {
Err(Error::Remote("invalid frame type".to_owned()))
}
}
}
}
impl Encoder for NsqCodec {
    type Item = Cmd;
    type Error = io::Error;

    /// Serializes an outgoing `Cmd` into `buf`.
    ///
    /// `Magic` is written verbatim, `Command` as a command line, `Msg` as a
    /// command line plus one length-prefixed body, and `MMsg` through
    /// `write_mmsg`. Every other variant is rejected with an `io::Error` of
    /// kind `Other`.
    fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
        match msg {
            Cmd::Magic(ver) => {
                let raw = ver.as_bytes();
                check_and_reserve(buf, raw.len());
                buf.extend(raw);
            }
            Cmd::Command(cmd) => write_cmd(buf, cmd),
            Cmd::Msg(cmd, body) => {
                write_cmd(buf, cmd);
                write_msg(buf, body);
            }
            Cmd::MMsg(cmd, msgs) => write_mmsg(buf, cmd, msgs),
            Cmd::Heartbeat
            | Cmd::Response(_)
            | Cmd::ResponseError(_)
            | Cmd::ResponseMsg(_) => {
                return Err(io::Error::new(io::ErrorKind::Other, "Failed encoding data"));
            }
        }
        Ok(())
    }
}