use log::*;
use std::time::Duration;
use std::slice::Iter;
use std::sync::atomic::{AtomicUsize, Ordering};
use chrono::prelude::*;
#[derive(Debug, PartialEq)]
pub enum Ownership {
Parser,
Session,
Destination,
Constant,
}
#[derive(Debug, Clone, PartialEq)]
pub enum MessageType {
Stomp,
Http,
}
#[derive(Debug, Clone, PartialEq)]
pub enum StompCommand {
Init,
Unknown,
Ack,
Send,
Nack,
Begin,
Abort,
Error,
Stomp,
Commit,
Connect,
Message,
Receipt,
Subscribe,
Connected,
Disconnect,
Unsubscribe,
Get,
Ok,
ServerError,
ServiceUnavailable,
ClientError,
NotFound,
Upgrade,
}
impl StompCommand {
pub fn as_string(&self) -> &str {
match &self {
StompCommand::Init => "",
StompCommand::Unknown => "",
StompCommand::Ack => "ACK",
StompCommand::Send => "SEND",
StompCommand::Nack => "NACK",
StompCommand::Begin => "BEGIN",
StompCommand::Abort => "ABORT",
StompCommand::Error => "ERROR",
StompCommand::Stomp => "STOMP",
StompCommand::Commit => "COMMIT",
StompCommand::Connect => "CONNECT",
StompCommand::Message => "MESSAGE",
StompCommand::Receipt => "RECEIPT",
StompCommand::Subscribe => "SUBSCRIBE",
StompCommand::Connected => "CONNECTED",
StompCommand::Disconnect => "DISCONNECT",
StompCommand::Unsubscribe => "UNSUBSCRIBE",
StompCommand::Get => "GET",
StompCommand::Ok => "HTTP/1.1 200 OK",
StompCommand::ServerError => "HTTP/1.1 500 Server Error",
StompCommand::ServiceUnavailable => "HTTP/1.1 503 Service Unavailable",
StompCommand::ClientError => "HTTP/1.1 400 Bad Request",
StompCommand::NotFound => "HTTP/1.1 404 Not Found",
StompCommand::Upgrade => "HTTP/1.1 101 Switching Protocols",
}
}
pub fn len(&self) -> usize {
self.as_string().len()
}
}
#[derive(Debug)]
pub struct StompMessage {
pub command: StompCommand,
pub message_type: MessageType,
pub id: usize,
pub owner: Ownership,
pub to: Option<String>,
pub timestamp: DateTime<Utc>,
pub expiry: Duration,
pub session_id: Option<usize>,
delivered: AtomicUsize,
headers: Box<Vec<Header>>,
hdr_len: usize,
body: Box<Vec<Box<Vec<u8>>>>,
body_len: usize,
}
impl Default for StompMessage {
fn default() -> Self {
StompMessage {
command: StompCommand::Unknown,
owner: Ownership::Session,
id: 0,
to: None,
timestamp: Utc::now(),
expiry: Duration::new(60, 0),
session_id: None,
delivered: AtomicUsize::new(0),
message_type: MessageType::Stomp,
headers: Box::new(Vec::with_capacity(10)),
hdr_len: 0,
body: Box::new(Vec::new()),
body_len: 0,
}
}
}
impl StompMessage {
pub fn new(owner: Ownership) -> StompMessage {
StompMessage { owner, ..Default::default() }
}
pub fn new_send(data: &[u8], id: usize) -> StompMessage {
let mut message = StompMessage {
command: StompCommand::Send,
owner: Ownership::Session,
id,
..Default::default()
};
message.add_body(data);
message
}
pub fn take(&mut self, owner: Ownership) -> StompMessage {
let mut message = StompMessage::new(owner);
message.command = std::mem::replace(&mut self.command, StompCommand::Unknown);
message.id = self.id;
self.id = 0;
if self.to.is_some() {
message.to.replace(self.to.take().unwrap());
} message.timestamp = self.timestamp;
self.timestamp = Utc::now();
message.expiry = self.expiry;
self.expiry = Duration::new(60, 0);
message.session_id = self.session_id;
message.delivered = AtomicUsize::new(self.delivered.load(Ordering::Relaxed));
self.delivered.store(0, Ordering::SeqCst);
message.message_type = std::mem::replace(&mut self.message_type, MessageType::Stomp);
message.headers = std::mem::replace(&mut self.headers, Box::new(Vec::with_capacity(10)));
message.hdr_len = self.hdr_len;
self.hdr_len = 0;
message.body = std::mem::replace(&mut self.body, Box::new(Vec::new()));
message.body_len = self.body_len;
self.body_len = 0;
message
}
pub fn clone(&self, owner: Ownership, id: usize) -> StompMessage {
let mut message = StompMessage::new(owner);
message.command = self.command.clone();
message.id = id;
message.to = self.to.clone();
message.timestamp = self.timestamp.clone();
message.expiry = self.expiry.clone();
message.session_id = self.session_id.clone();
message.delivered = AtomicUsize::new(self.delivered.load(Ordering::Relaxed));
message.message_type = self.message_type.clone();
message.headers = Box::new(Vec::with_capacity(self.headers.len()));
for hdr in self.headers.iter() {
message.headers.push(hdr.clone());
}
message.hdr_len = self.hdr_len;
message.body = Box::new(Vec::with_capacity(1));
message.body.push(self.combine());
message.body_len = self.body_len;
message
}
pub fn clone_to_message(&self, owner: Ownership, id: usize) -> StompMessage {
let mut message = self.clone(owner, id);
message.command = StompCommand::Message;
message.headers.retain(|hdr| {
match hdr.name.as_str() {
"receipt" | "ack" => false,
_ => true,
}
});
message.add_header_clone("message-id", &id.to_string());
message.recalculate_hdr_len();
message
}
pub fn add_header_clone(&mut self, name: &str, value: &str) {
let hdr = Header {
name: sanename::sanitize(name),
value: String::from(value.trim()),
};
self.hdr_len += hdr.len() + 1; self.headers.push(hdr);
}
pub fn add_header(&mut self, name: &'static str, value: &'static str) {
let hdr = Header {
name: String::from(name),
value: String::from(value),
};
self.hdr_len += hdr.len() + 1; self.headers.push(hdr);
}
pub fn push_header(&mut self, hdr: Header) {
self.hdr_len += hdr.len() + 1; self.headers.push(hdr);
}
pub fn get_header(&self, name: &str) -> Option<&String> {
for hdr in self.headers.iter() {
if name.eq(&hdr.name) {
return Some(&hdr.value);
}
}
None
}
pub fn get_header_case_insensitive(&self, name: &str) -> Option<&String> {
for hdr in self.headers.iter() {
if name.eq_ignore_ascii_case(&hdr.name) {
return Some(&hdr.value);
}
}
None
}
pub fn remove_header(&mut self, name: &str) {
self.headers.retain(|hdr| ! hdr.name.eq(name));
}
pub fn extract_header(&mut self, name: &str) -> Option<Header> {
if let Some(pos) = self.headers.iter().position(|hdr| hdr.name.eq(name)) {
let hdr = self.headers.remove(pos);
return Some(hdr);
}
None
}
pub fn count_headers(&self) -> usize {
self.headers.len()
}
pub fn delivered(&self) -> usize {
self.delivered.load(Ordering::SeqCst)
}
pub fn increment_delivered(&self, count: usize) -> usize {
self.delivered.fetch_add(count, Ordering::SeqCst)
}
pub fn header_len(&self) -> usize {
self.command.len() + 1 + self.hdr_len
}
fn recalculate_hdr_len(&mut self) {
let mut len: usize = 0;
for hdr in self.headers.iter() {
len += hdr.name.len();
len += 1;
len += hdr.value.len();
len += 1;
}
self.hdr_len = len;
}
pub fn headers(&self) -> Iter<Header> {
self.headers.iter()
}
pub fn body_len(&self) -> usize {
self.body_len
}
pub fn len(&self) -> usize {
self.header_len() + 1 + self.body_len() + 1
}
pub fn add_body(&mut self, chunk: &[u8]) {
self.body.push(Box::new(Vec::from(chunk)));
self.body_len += chunk.len();
}
pub fn combine(&self) -> Box<Vec<u8>> {
let mut body = Box::new(Vec::with_capacity(self.body_len));
for chunk in self.body.iter() {
body.extend_from_slice(chunk);
}
return body;
}
pub fn combine_chunks(&mut self) {
let body = self.combine();
self.body = Box::new(Vec::with_capacity(1));
self.body_len = body.len();
self.body.push(body);
}
pub fn hdrs_as_string(&self) -> String {
let mut hdrs: String = "".to_owned();
hdrs.push_str(self.command.as_string());
hdrs.push_str("\n");
for hdr in self.headers.iter() {
hdrs.push_str(hdr.name.as_str());
hdrs.push_str(":");
hdrs.push_str(hdr.value.as_str());
hdrs.push_str("\n");
}
hdrs
}
pub fn body_as_string(&self) -> String {
let mut body: String = "".to_owned();
for chunk in self.body.iter() {
body.push_str(&String::from_utf8_lossy(chunk));
}
return body;
}
pub fn body_as_bytes(&self) -> &Box<Vec<u8>> {
if self.body_len == 0 {
error!("fatal: attempt to serialize zero len message");
panic!("attempt to serialize zero len message");
}
if self.body.len() == 1 {
let chunk = self.body.iter().next().unwrap();
return chunk;
}
error!("fatal: attempt to serialize multi chunk message");
panic!("attempt to serialize multi chunk message");
}
pub fn as_string(&self) -> String {
let mut msg: String = "".to_owned();
msg.push_str(self.hdrs_as_string().as_str());
msg.push_str("\n");
msg.push_str(self.body_as_string().as_str());
match self.message_type {
MessageType::Stomp => {
msg.push_str("\0");
},
_ => {}
}
return msg;
}
}
#[derive(Debug, Clone)]
pub struct Header {
pub name: String,
pub value: String,
}
impl Header {
pub fn new(name: &str, value: &str) -> Header {
Header {
name: String::from(name),
value: String::from(value),
}
}
pub fn from_string(name: &str, value: String) -> Header {
Header {
name: String::from(name),
value,
}
}
pub fn len(&self) -> usize {
self.name.len() + 1 + self.value.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::stomp_message::MessageType::Http;
#[test]
fn test_init() {
let mut message = StompMessage::new(Ownership::Session);
message.command = StompCommand::Ack;
message.add_header("name1", "value1");
message.add_header("name2", "value2");
message.add_body(b"{\"resp1\": true}\n");
println!("StompMessage {:?}", message);
println!("serialize:: \n{}\n{}\0", message.hdrs_as_string(), message.body_as_string());
assert_eq!(message.body_len, message.body.iter().next().unwrap().len());
}
#[test]
fn test_chunks() {
let mut message = StompMessage::new(Ownership::Session);
message.command = StompCommand::Ack;
message.add_header("name1", "value1");
message.add_header("name2", "value2");
message.add_body(b"{\n");
message.add_body(b" {\"resp1\": true},\n");
message.add_body(b" {\"resp1\": true},\n");
message.add_body(b" {\"resp1\": true}\n");
message.add_body(b"}\n");
println!("StompMessage {:?}", message);
let mut len: usize = 0;
for vec in message.body.iter() {
len += vec.len();
}
assert_eq!(message.body_len(), len);
println!("serialize:: \n{}\n{}\0", message.hdrs_as_string(), message.body_as_string());
message.combine_chunks();
assert_eq!(message.body_len(), message.body.iter().next().unwrap().len());
println!("StompMessage {:?}", message);
println!("serialize:: \n{}\n{}\0", message.hdrs_as_string(), message.body_as_string());
}
#[test]
fn test_take() {
let mut message = StompMessage::new(Ownership::Session);
message.command = StompCommand::Ack;
message.add_header("name1", "value1");
message.add_header("name2", "value2");
message.add_body(b"{\"resp1\": true}\n");
assert_eq!(message.body_len(), 16);
let copy = message.take(Ownership::Session);
assert_eq!(message.body_len(), 0);
assert_eq!(copy.body_len(), 16);
println!("Take Test orig='{:?}'", message);
println!("Take Test copy='{:?}'", copy);
println!("Take Test serialize empty:: \n{}\n{}\0", message.hdrs_as_string(), message.body_as_string());
println!("Take Test serialize:: \n{}\n{}\0", copy.hdrs_as_string(), copy.body_as_string());
}
#[test]
fn test_clone_to_message() {
let mut message = StompMessage::new(Ownership::Session);
message.command = StompCommand::Send;
message.add_header("receipt", "booya");
message.add_header("ack", "client");
message.add_header("name1", "value1");
message.add_body(b"{\"resp1\": true}\n");
assert_eq!(message.body_len(), 16);
let copy = message.clone_to_message(Ownership::Destination, 23);
assert_eq!(message.body_len(), 16);
assert_eq!(copy.body_len(), 16);
assert_eq!(copy.count_headers(), 2);
assert_eq!(copy.header_len(), 35);
assert_eq!(copy.header_len(), copy.hdrs_as_string().len());
}
#[test]
pub fn test_partial_eq() {
let mut message = StompMessage::new(Ownership::Session);
message.command = StompCommand::Get;
message.message_type = Http;
assert_eq!(message.message_type, Http);
assert_eq!(message.command, StompCommand::Get);
}
#[test]
fn test_extract_hdr() {
let mut message = StompMessage::new(Ownership::Session);
message.command = StompCommand::Ack;
message.add_header("name1", "value1");
message.add_header("name2", "value2");
message.add_body(b"{\"resp1\": true}\n");
assert_eq!(message.count_headers(), 2);
assert!(message.extract_header("name2").is_some());
assert!(message.extract_header("name2").is_none());
assert_eq!(message.count_headers(), 1);
}
}