use log::*;
use crate::message::stomp_message::{StompMessage, Header, MessageType};
use crate::message::stomp_message::StompCommand;
use crate::web_socket::ws_response::{FrameType, ws_write_frame_hdr};
use crate::session::mq::SessionMessage;
/// Which part of the current message the serializer will emit next.
///
/// The derives make the state cheap to copy, comparable, and printable in
/// log or assertion output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// The command line, the headers and the blank line that ends them.
    WritingHeaders,
    /// The message body, possibly spread over several chunks.
    WritingBody,
    /// The single trailing NUL byte that did not fit after the body.
    WritingTerminator,
}
/// Serializes one `SessionMessage` at a time into caller-supplied buffers,
/// one chunk per `write_chunk` call.
pub struct MessageSerializer {
/// Message currently being written; `None` when nothing is in progress.
message: Option<SessionMessage>,
/// Part of the message the next `write_chunk` call will emit.
state: State,
/// Number of body bytes already copied out by earlier `write_chunk` calls.
msg_pos: usize,
/// Set by `ws_upgrade`; when true, non-HTTP messages are prefixed with a
/// web socket frame header.
web_socket: bool,
}
impl MessageSerializer {
    /// Bytes reserved for a web socket frame header when checking that the
    /// output buffer can hold the headers.
    // NOTE(review): presumably the largest header `ws_write_frame_hdr` can
    // emit; confirm against that function.
    const WS_FRAME_HDR_RESERVE: usize = 10;

    /// Creates an idle serializer with no message set and web socket framing off.
    pub fn new() -> MessageSerializer {
        Self {
            message: None,
            state: State::WritingHeaders,
            msg_pos: 0,
            web_socket: false,
        }
    }

    /// Returns `true` while a message is set and not yet fully written.
    pub fn is_serializing(&self) -> bool {
        self.message.is_some()
    }

    /// Sets the next message to write and rewinds the serializer to the
    /// start of that message.
    ///
    /// # Panics
    ///
    /// Panics if the previous message has not been completely written.
    pub fn set_message(&mut self, message: SessionMessage) {
        assert!(
            !self.is_serializing(),
            "BUG set_message() should not be called until the previous message is finished"
        );
        self.state = State::WritingHeaders;
        self.msg_pos = 0;
        self.message = Some(message);
    }

    /// Switches the serializer to web socket framing for subsequent messages.
    pub fn ws_upgrade(&mut self) {
        self.web_socket = true;
    }

    /// Writes the next chunk of the current message into `out` and returns
    /// the number of bytes written.
    ///
    /// The headers are always written in one call. The body may take several
    /// calls, and the trailing NUL gets a call of its own when the body ends
    /// exactly at the end of `out`. Once the message is complete,
    /// `is_serializing()` returns `false`.
    ///
    /// `Ok(0)` is returned, and nothing changes, if `out` is empty while the
    /// body or the terminator is pending.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` and discards the message when `out` is too small to
    /// hold the headers (plus the terminator of a body-less message) in one
    /// piece.
    ///
    /// # Panics
    ///
    /// Panics if no message has been set.
    pub fn write_chunk(&mut self, out: &mut Box<[u8]>) -> Result<usize, ()> {
        let message = self
            .message
            .as_ref()
            .expect("BUG write_chunk() should not be called without a message set");

        match self.state {
            State::WritingHeaders => {
                let stomp: &StompMessage = &message.message;
                let is_http = MessageType::Http == stomp.message_type;
                let body_len = stomp.body_len();
                // Command line and message headers, session headers, blank line.
                let hdr_len = stomp.header_len() + message.session_hdr_len() + 1;
                // A body-less non-HTTP message gets its NUL terminator in this
                // same chunk, so that byte must be part of the size check.
                let needs_terminator = body_len == 0 && !is_http;
                let ws_reserve = if self.web_socket {
                    Self::WS_FRAME_HDR_RESERVE
                } else {
                    0
                };
                let required = ws_reserve + hdr_len + usize::from(needs_terminator);
                if out.len() < required {
                    warn!("outgoing buffer flup");
                    self.message = None;
                    return Err(());
                }

                let mut pos = 0;
                if self.web_socket && !is_http {
                    // The frame covers headers, body and the trailing NUL.
                    let frame_len = hdr_len + body_len + 1;
                    pos = ws_write_frame_hdr(out, frame_len, FrameType::FinText);
                }
                pos = self.write_common_hdrs(out, pos, stomp);
                pos = self.write_session_hdrs(out, pos, &message.headers);
                out[pos] = b'\n';
                pos += 1;

                if body_len == 0 {
                    if !is_http {
                        out[pos] = b'\0';
                        pos += 1;
                    }
                    self.message = None;
                } else {
                    self.state = State::WritingBody;
                }
                Ok(pos)
            }
            State::WritingBody => {
                let bytes = message.message.body_as_bytes();
                let start = self.msg_pos;
                let remaining = bytes.len() - start;
                let count = remaining.min(out.len());
                out[..count].copy_from_slice(&bytes[start..start + count]);
                self.msg_pos += count;

                if count < remaining {
                    // Buffer is full and more body is still pending.
                    return Ok(count);
                }

                // NOTE(review): the header path decides "no terminator" from
                // `MessageType::Http`, this path from `StompCommand::Get`;
                // confirm the two always coincide.
                if StompCommand::Get == message.message.command {
                    self.message = None;
                    Ok(count)
                } else if count < out.len() {
                    // Room left over: finish the frame in this chunk.
                    out[count] = b'\0';
                    self.message = None;
                    Ok(count + 1)
                } else {
                    // Body filled the buffer exactly; the NUL goes out next call.
                    self.state = State::WritingTerminator;
                    Ok(count)
                }
            }
            State::WritingTerminator => {
                if out.is_empty() {
                    // No room for the terminator; stay in this state and
                    // report that nothing was written instead of panicking.
                    return Ok(0);
                }
                out[0] = b'\0';
                self.message = None;
                Ok(1)
            }
        }
    }

    /// Writes the command line followed by the message's own headers at
    /// `pos` and returns the position just past them.
    fn write_common_hdrs(&self, out: &mut [u8], pos: usize, message: &StompMessage) -> usize {
        let cmd = message.command.as_string().as_bytes();
        let mut pos = Self::put_bytes(out, pos, cmd);
        out[pos] = b'\n';
        pos += 1;
        for hdr in message.headers() {
            pos = self.write_hdr(out, pos, hdr);
        }
        pos
    }

    /// Writes the per-session headers at `pos` and returns the position just
    /// past them.
    fn write_session_hdrs(&self, out: &mut [u8], pos: usize, headers: &[Header]) -> usize {
        headers
            .iter()
            .fold(pos, |pos, hdr| self.write_hdr(out, pos, hdr))
    }

    /// Writes a single `name:value\n` header line at `pos` and returns the
    /// position just past it.
    fn write_hdr(&self, out: &mut [u8], pos: usize, hdr: &Header) -> usize {
        let mut pos = Self::put_bytes(out, pos, hdr.name.as_bytes());
        out[pos] = b':';
        pos += 1;
        pos = Self::put_bytes(out, pos, hdr.value.as_bytes());
        out[pos] = b'\n';
        pos + 1
    }

    /// Copies `bytes` into `out` starting at `pos` and returns the position
    /// just past the copied data.
    fn put_bytes(out: &mut [u8], pos: usize, bytes: &[u8]) -> usize {
        let end = pos + bytes.len();
        out[pos..end].copy_from_slice(bytes);
        end
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Adds the `some:header` header every test uses to `message` and returns
    /// a serializer primed with it and the given per-session headers.
    fn primed(mut message: StompMessage, session_headers: Vec<Header>) -> MessageSerializer {
        message.add_header("some", "header");
        let mut serializer = MessageSerializer::new();
        serializer.set_message(SessionMessage {
            message: Arc::new(message),
            headers: session_headers,
        });
        serializer
    }

    /// Headers and body each fit into one large buffer.
    #[test]
    fn test_happy() {
        let mut serializer = primed(StompMessage::new_send(b"some data", 0), Vec::new());
        let mut out: Box<[u8]> = Box::new([0; 2048]);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("should still have body to write"));
        println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 18);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("we should finish"));
        println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 10);
        assert_eq!('\0', out[9] as char);
    }

    /// A 20 byte body is drained through an 8 byte buffer in three chunks.
    #[test]
    fn test_happy_multi_chunk_body() {
        let mut serializer = primed(
            StompMessage::new_send(b"01234567012345670123", 0),
            Vec::new(),
        );
        let mut out: Box<[u8]> = Box::new([0; 2048]);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("should still have body to write"));
        println!("chunk write {} '{}'", sz, String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 18);

        let mut out: Box<[u8]> = Box::new([0; 8]);

        // The first two chunks fill the small buffer completely.
        for _ in 0..2 {
            let sz = serializer
                .write_chunk(&mut out)
                .unwrap_or_else(|_| panic!("we should not finish"));
            println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
            assert_eq!(sz, 8);
            assert_eq!('7', out[7] as char);
        }

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("we should finish"));
        println!("chunk write");
        assert_eq!(sz, 5);
        assert_eq!('3', out[3] as char);
        assert_eq!('\0', out[4] as char);
    }

    /// The body fills the buffer exactly, so the NUL arrives in its own chunk.
    #[test]
    fn test_happy_trailing_zero_as_chunk() {
        let mut serializer = primed(StompMessage::new_send(b"some data", 0), Vec::new());
        let mut out: Box<[u8]> = Box::new([0; 2048]);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("should still have body to write"));
        println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 18);

        let mut out: Box<[u8]> = Box::new([0; 9]);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("we should not finish"));
        println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 9);
        assert_eq!('a', out[8] as char);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("we should finish"));
        println!("chunk write");
        assert_eq!(sz, 1);
        assert_eq!('\0', out[0] as char);
    }

    /// A body-less message is completed by the header chunk alone.
    #[test]
    fn test_happy_no_body() {
        let mut serializer = primed(StompMessage::new_send(b"", 0), Vec::new());
        let mut out: Box<[u8]> = Box::new([0; 2048]);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("returns Ok() even with no body"));
        println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 19);
        assert_eq!(
            "SEND\nsome:header\n\n",
            String::from_utf8_lossy(&out[0..sz - 1])
        );
    }

    /// Same as above, with a two byte web socket frame header in front.
    #[test]
    fn test_happy_no_body_ws() {
        let mut serializer = primed(StompMessage::new_send(b"", 0), Vec::new());
        serializer.ws_upgrade();
        let mut out: Box<[u8]> = Box::new([0; 2048]);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("returns Ok() even with no body"));
        println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 21);
        assert_eq!(
            "SEND\nsome:header\n\n",
            String::from_utf8_lossy(&out[2..sz - 1])
        );
    }

    /// Session headers are written after the message's own headers.
    #[test]
    fn test_happy_no_body_ws_extra_hdr() {
        let mut serializer = primed(
            StompMessage::new_send(b"", 0),
            vec![Header::new("a", "b")],
        );
        serializer.ws_upgrade();
        let mut out: Box<[u8]> = Box::new([0; 2048]);

        let sz = serializer
            .write_chunk(&mut out)
            .unwrap_or_else(|_| panic!("returns Ok() even with no body"));
        println!("chunk write '{}'", String::from_utf8_lossy(&out[0..sz]));
        assert_eq!(sz, 25);
        assert_eq!(
            "SEND\nsome:header\na:b\n\n",
            String::from_utf8_lossy(&out[2..sz - 1])
        );
    }
}