use std::io;
use std::io::Cursor;
use std::iter::Iterator;
use bytes::{Buf, BufMut, BytesMut, BigEndian};
use tokio_io::codec::{Encoder, Decoder};
use tokio_proto::streaming::pipeline::Frame;
use tokio_proto::streaming::{Body, Message};
use std::str;
use protocol::RequestMessage;
use response::Message as TypeMessage;
const HEADER_LENGTH: usize = 8;
const FRAME_TYPE_RESPONSE: i32 = 0x00;
const FRAME_TYPE_ERROR: i32 = 0x01;
const FRAME_TYPE_MESSAGE: i32 = 0x02;
const HEARTBEAT: &'static str = "_heartbeat_";
#[derive(Clone)]
pub struct ClientTypeMap<T> {
pub inner: T,
}
pub type NsqMessage = Message<RequestMessage, Body<RequestMessage, io::Error>>;
pub type NsqResponseMessage = Message<String, Body<TypeMessage, io::Error>>;
pub struct NsqCodec {
pub decoding_head: bool
}
impl Decoder for NsqCodec {
type Item = Frame<String, TypeMessage, io::Error>;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
let length = buf.len();
if length < HEADER_LENGTH {
return Ok(None);
}
let mut cursor = Cursor::new(buf.clone());
let size: i32 = cursor.get_i32::<BigEndian>();
if length < size as usize {
return Ok(None);
}
let frame_type: i32 = cursor.get_i32::<BigEndian>();
if frame_type == FRAME_TYPE_RESPONSE {
buf.split_to(HEADER_LENGTH + length);
match str::from_utf8(&cursor.bytes()) {
Ok(s) => {
let decoded_message = s.to_string();
if decoded_message == HEARTBEAT && !self.decoding_head {
Ok(Some(self.heartbeat_message()))
} else if decoded_message == HEARTBEAT {
Ok(Some(self.streaming_flag()))
} else {
Ok(Some(Frame::Message {
message: decoded_message,
body: false,
}))
}
}
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Invalid UTF-8")),
}
} else if frame_type == FRAME_TYPE_ERROR {
Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid packet received"))
} else if frame_type == FRAME_TYPE_MESSAGE {
if self.decoding_head {
Ok(Some(self.streaming_flag()))
} else {
let timestamp = cursor.get_i64::<BigEndian>(); let _ = cursor.get_u16::<BigEndian>();
let data = str::from_utf8(&cursor.bytes()).unwrap().to_string();
let (id, body) = data.split_at(16);
let message = TypeMessage{
timestamp: timestamp,
message_id: id.to_string(),
message_body: body.to_string()
};
buf.split_to(HEADER_LENGTH + length);
Ok(Some(
Frame::Body {
chunk: Some(message),
}
))
}
} else {
Ok(None)
}
}
}
pub type CodecOutputFrame = Frame<RequestMessage, RequestMessage, io::Error>;
impl Encoder for NsqCodec {
type Item = CodecOutputFrame;
type Error = io::Error;
fn encode(&mut self, message: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
match message {
Frame::Message { message, .. } => {
if let Some(version) = message.version {
buf.reserve(version.len());
buf.extend(version.as_bytes());
}
if let Some(header) = message.header {
buf.reserve(header.len() + 1);
buf.extend(header.as_bytes());
}
if let Some(body) = message.body {
let mut buf_32 = Vec::with_capacity(body.len());
let body_len = body.len() as u32;
buf_32.put_u32::<BigEndian>(body_len);
buf_32.put(&body[..]);
buf.extend(buf_32);
}
if let Some(body_messages) = message.body_messages {
let total_bytes = body_messages
.iter()
.map(|message| message.len())
.fold(0, |acc, len| acc + len);
let mut buf_32 = Vec::with_capacity(total_bytes);
let body_len = total_bytes as u32;
buf_32.put_u32::<BigEndian>(body_len);
let messages_len = body_messages.len() as u32;
buf_32.put_u32::<BigEndian>(messages_len);
for message in &body_messages {
let message_len = message.len() as u32;
buf_32.put_u32::<BigEndian>(message_len);
buf_32.put(&message[..]);
}
buf.extend(buf_32);
}
Ok(())
}
Frame::Error { error, .. } => Err(error),
Frame::Body { .. } => panic!("Streaming of Requests is not currently supported"),
}
}
}
impl NsqCodec {
fn heartbeat_message(&mut self) -> Frame<String, TypeMessage, io::Error>
{
let message = TypeMessage{
timestamp: 0,
message_id: HEARTBEAT.to_string(),
message_body: HEARTBEAT.to_string()
};
Frame::Body {
chunk: Some(message),
}
}
fn streaming_flag(&mut self) -> Frame<String, TypeMessage, io::Error>
{
self.decoding_head = false;
Frame::Message {
message: "".into(),
body: true,
}
}
}