use crate::abstracts::{RawMessage, ZMQCodec};
use crate::binpackers::vec_to_uuid;
use crate::markers;
use crate::pubsub::PubSubMessage;
use bytes::BytesMut;
use chrono;
use failure::Fallible;
use rmp_serde;
use serde_json;
use std::convert::From;
use std::convert::TryFrom;
use uuid::Uuid;
use zmq;
/// Data message exchanged as a multipart ZMQ message.
///
/// The `ZMQCodec` implementation in this file maps it to frames in this order:
/// the message id bytes, the MessagePack-encoded `data`, then one frame per
/// entry of `extra_parts`.
#[derive(Debug, Default, Clone)]
pub struct DataMessage {
/// Message identifier; `DataMessage::new` fills it with a random (v4) UUID.
pub msgid: uuid::Uuid,
/// JSON payload; `DataMessage::new` stores a `"systemtime"` entry in it.
pub data: serde_json::Value,
/// Raw bytes of every frame that follows the payload frame.
pub extra_parts: Vec<BytesMut>,
}
impl DataMessage {
    /// Builds a message with a freshly generated v4 UUID as `msgid` and no
    /// extra parts.
    ///
    /// The current UTC time, rendered as an RFC 3339 string with millisecond
    /// precision, is stored under the `"systemtime"` key of `data`.
    ///
    /// # Errors
    ///
    /// Propagates the error if the timestamp cannot be converted into a JSON
    /// value.
    pub fn new() -> Fallible<DataMessage> {
        let mut msg = DataMessage {
            msgid: Uuid::new_v4(),
            ..Default::default()
        };
        let now = chrono::Utc::now();
        let stamp = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
        msg.data["systemtime"] = serde_json::to_value(stamp)?;
        Ok(msg)
    }
}
// Empty marker-trait implementations tagging `DataMessage`.
// NOTE(review): the traits are declared in `crate::markers`; what each marker
// enables is not visible from this file.
impl markers::ZMQMessageMarker for DataMessage {}
impl markers::DataMessageMarker for DataMessage {}
impl ZMQCodec for DataMessage {
    /// Rebuilds a `DataMessage` from the frames of a multipart ZMQ message.
    ///
    /// Frame 0 is handed to `vec_to_uuid` to obtain `msgid`, frame 1 is
    /// deserialized as MessagePack into `data`, and every remaining frame is
    /// copied into `extra_parts` in order.
    ///
    /// # Errors
    ///
    /// Fails with "Too few parts" when fewer than two frames are supplied, and
    /// propagates any error from `vec_to_uuid` or the MessagePack decoder.
    fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
        if from.len() < 2 {
            return Err(failure::err_msg("Too few parts"));
        }
        let msgid = vec_to_uuid(from[0].to_vec())?;
        let data: serde_json::Value = rmp_serde::from_slice(&from[1][..])?;
        // Everything after the two header frames is opaque payload.
        let extra_parts: Vec<BytesMut> = from[2..]
            .iter()
            .map(|frame| BytesMut::from(&frame[..]))
            .collect();
        Ok(Box::new(DataMessage {
            msgid,
            data,
            extra_parts,
        }))
    }

    /// Serializes the message into ZMQ frames: the id bytes, the
    /// MessagePack-encoded `data`, then one frame per extra part.
    ///
    /// # Errors
    ///
    /// Propagates the error if `data` cannot be encoded as MessagePack.
    fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
        let mut frames: Vec<zmq::Message> = Vec::with_capacity(2 + self.extra_parts.len());
        frames.push(zmq::Message::from(&self.msgid.as_bytes()[..]));
        let packed = rmp_serde::to_vec(&self.data)?;
        frames.push(zmq::Message::from(packed));
        frames.extend(
            self.extra_parts
                .iter()
                .map(|part| zmq::Message::from(&part[..])),
        );
        Ok(frames)
    }
}
// Conversions between `DataMessage` and `RawMessage`, in both directions.
// NOTE(review): `naive_tryfrom!` is defined elsewhere in the crate; judging by
// the tests in this file it presumably generates `TryFrom` impls for both
// owned and borrowed sources — confirm against the macro definition.
crate::naive_tryfrom!(DataMessage, [RawMessage]);
crate::naive_tryfrom!(RawMessage, [DataMessage]);
impl From<PubSubDataMessage> for DataMessage {
fn from(dmsg: PubSubDataMessage) -> Self {
let dmsg = DataMessage {
msgid: dmsg.msgid,
data: dmsg.data,
extra_parts: dmsg.extra_parts,
};
dmsg
}
}
impl From<&PubSubDataMessage> for DataMessage {
    /// Builds a `DataMessage` from a borrowed `PubSubDataMessage`.
    ///
    /// The id is copied and the payload and extra parts are cloned; the topic
    /// is not carried over. The source message is left untouched.
    fn from(dmsg: &PubSubDataMessage) -> Self {
        DataMessage {
            // `Uuid` is `Copy`, so no explicit clone is needed.
            msgid: dmsg.msgid,
            data: dmsg.data.clone(),
            extra_parts: dmsg.extra_parts.clone(),
        }
    }
}
/// Data message with a leading topic, exchanged as a multipart ZMQ message.
///
/// The `ZMQCodec` implementation in this file maps it to frames in this order:
/// the topic bytes, the message id bytes, the MessagePack-encoded `data`, then
/// one frame per entry of `extra_parts`.
#[derive(Debug, Default, Clone)]
pub struct PubSubDataMessage {
/// Topic string, sent as the first frame.
pub topic: String,
/// Message identifier; `PubSubDataMessage::new` fills it with a random (v4) UUID.
pub msgid: uuid::Uuid,
/// JSON payload; `PubSubDataMessage::new` stores a `"systemtime"` entry in it.
pub data: serde_json::Value,
/// Raw bytes of every frame that follows the payload frame.
pub extra_parts: Vec<BytesMut>,
}
impl PubSubDataMessage {
    /// Builds a message for `topic` with a freshly generated v4 UUID as
    /// `msgid` and no extra parts.
    ///
    /// The current UTC time, rendered as an RFC 3339 string with millisecond
    /// precision, is stored under the `"systemtime"` key of `data`.
    ///
    /// # Errors
    ///
    /// Propagates the error if the timestamp cannot be converted into a JSON
    /// value.
    pub fn new(topic: String) -> Fallible<PubSubDataMessage> {
        let mut msg = PubSubDataMessage {
            topic,
            msgid: Uuid::new_v4(),
            ..Default::default()
        };
        let now = chrono::Utc::now();
        let stamp = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
        msg.data["systemtime"] = serde_json::to_value(stamp)?;
        Ok(msg)
    }
}
// Empty marker-trait implementations tagging `PubSubDataMessage`.
// NOTE(review): the traits are declared in `crate::markers`; what each marker
// enables is not visible from this file.
impl markers::ZMQMessageMarker for PubSubDataMessage {}
impl markers::DataMessageMarker for PubSubDataMessage {}
impl markers::PubSubDataMessageMarker for PubSubDataMessage {}
impl markers::PubSubMessageMarker for PubSubDataMessage {}
impl ZMQCodec for PubSubDataMessage {
    /// Rebuilds a `PubSubDataMessage` from the frames of a multipart ZMQ
    /// message.
    ///
    /// Frame 0 is read as the topic string, frame 1 is handed to `vec_to_uuid`
    /// to obtain `msgid`, frame 2 is deserialized as MessagePack into `data`,
    /// and every remaining frame is copied into `extra_parts` in order.
    ///
    /// # Errors
    ///
    /// Fails with "Too few parts" when fewer than three frames are supplied,
    /// with "Topic part is not string" when `as_str()` yields nothing for the
    /// first frame, and propagates any error from `vec_to_uuid` or the
    /// MessagePack decoder.
    fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
        if from.len() < 3 {
            return Err(failure::err_msg("Too few parts"));
        }
        // Build the error lazily: constructing it eagerly would pay for the
        // error value on every successful decode.
        let topic = from[0]
            .as_str()
            .ok_or_else(|| failure::err_msg("Topic part is not string"))?
            .to_string();
        let msgid = vec_to_uuid(from[1].to_vec())?;
        let data: serde_json::Value = rmp_serde::from_slice(&from[2][..])?;
        // Everything after the three header frames is opaque payload.
        let extra_parts: Vec<BytesMut> = from[3..]
            .iter()
            .map(|frame| BytesMut::from(&frame[..]))
            .collect();
        Ok(Box::new(PubSubDataMessage {
            topic,
            msgid,
            data,
            extra_parts,
        }))
    }

    /// Serializes the message into ZMQ frames: the topic bytes, the id bytes,
    /// the MessagePack-encoded `data`, then one frame per extra part.
    ///
    /// # Errors
    ///
    /// Propagates the error if `data` cannot be encoded as MessagePack.
    fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
        let mut frames: Vec<zmq::Message> = Vec::with_capacity(3 + self.extra_parts.len());
        frames.push(zmq::Message::from(self.topic.as_bytes()));
        frames.push(zmq::Message::from(&self.msgid.as_bytes()[..]));
        frames.push(zmq::Message::from(rmp_serde::to_vec(&self.data)?));
        frames.extend(
            self.extra_parts
                .iter()
                .map(|part| zmq::Message::from(&part[..])),
        );
        Ok(frames)
    }
}
// Conversions between `PubSubDataMessage` and `PubSubMessage` / `RawMessage`,
// in both directions.
// NOTE(review): `naive_tryfrom!` is defined elsewhere in the crate; judging by
// the tests in this file it presumably generates `TryFrom` impls for both
// owned and borrowed sources — confirm against the macro definition.
crate::naive_tryfrom!(PubSubDataMessage, [PubSubMessage, RawMessage]);
crate::naive_tryfrom!(PubSubMessage, [PubSubDataMessage]);
crate::naive_tryfrom!(RawMessage, [PubSubDataMessage]);
impl From<DataMessage> for PubSubDataMessage {
fn from(dmsg: DataMessage) -> Self {
let psmsg = PubSubDataMessage {
topic: "UNDEFINED".to_string(),
msgid: dmsg.msgid,
data: dmsg.data,
extra_parts: dmsg.extra_parts,
};
psmsg
}
}
impl From<&DataMessage> for PubSubDataMessage {
    /// Builds a `PubSubDataMessage` from a borrowed `DataMessage`.
    ///
    /// The id is copied and the payload and extra parts are cloned; the topic
    /// is set to the placeholder `"UNDEFINED"`. The source message is left
    /// untouched.
    fn from(dmsg: &DataMessage) -> Self {
        PubSubDataMessage {
            topic: "UNDEFINED".to_string(),
            // `Uuid` is `Copy`, so no explicit clone is needed.
            msgid: dmsg.msgid,
            data: dmsg.data.clone(),
            extra_parts: dmsg.extra_parts.clone(),
        }
    }
}
// Unit tests for the codecs and conversions of `DataMessage` and
// `PubSubDataMessage`.
#[cfg(test)]
mod tests {
use super::*;
use crate::pubsub::PubSubMessage;
use serde_json::json;
// Encodes a `DataMessage` carrying a non-ASCII payload value and two extra
// parts, checks the frame count and the id frame, then decodes the frames and
// compares every field with the original.
#[test]
fn test_encode_decode_roundtrip() {
let mut msg = DataMessage::new().unwrap();
log::debug!("msg is {:?}", msg);
msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
msg.extra_parts.push(BytesMut::from("extra part 1"));
msg.extra_parts.push(BytesMut::from("extra part 2"));
let msgparts = msg.zmq_encode().unwrap();
// id frame + payload frame + two extra parts
assert_eq!(msgparts.len(), 4);
let decoded_uuid = vec_to_uuid(msgparts[0].to_vec()).unwrap();
assert_eq!(msg.msgid, decoded_uuid);
let msg2 = *DataMessage::zmq_decode(msgparts).unwrap();
assert_eq!(msg.msgid, msg2.msgid);
assert_eq!(msg.data, msg2.data);
assert_eq!(msg2.extra_parts.len(), 2);
assert_eq!(msg2.extra_parts[0], "extra part 1");
assert_eq!(msg2.extra_parts[1], "extra part 2");
}
// Encodes a `PubSubDataMessage` without extra parts, checks that the topic is
// frame 0 and the id is frame 1, then decodes and compares the fields.
#[test]
fn test_pubsub_encode_decode_roundtrip() {
let mut msg = PubSubDataMessage::new(String::from("testtopic")).unwrap();
log::debug!("msg is {:?}", msg);
msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
// Rebind immutably: the message is not modified past this point.
let msg = msg;
let msgparts = msg.zmq_encode().unwrap();
assert_eq!(msgparts[0].as_str().unwrap(), String::from("testtopic"));
let decoded_uuid = vec_to_uuid(msgparts[1].to_vec()).unwrap();
assert_eq!(msg.msgid, decoded_uuid);
let msg2 = *PubSubDataMessage::zmq_decode(msgparts).unwrap();
assert_eq!(msg.msgid, msg2.msgid);
assert_eq!(msg.topic, msg2.topic);
assert_eq!(msg.data, msg2.data);
}
// Decodes five hand-built frames (topic, id, MessagePack payload, two extra
// parts) into a `PubSubDataMessage`, then re-encodes it and checks the topic
// frame and the first extra-part frame.
#[test]
fn test_pubsub_extraparts_roundtrip() {
let mut msgparts: Vec<zmq::Message> = Vec::with_capacity(5);
msgparts.push(zmq::Message::from(String::from("testtopic").as_bytes()));
let msgid = Uuid::new_v4();
msgparts.push(zmq::Message::from(msgid.as_bytes().to_vec()));
let data = json!({"testkey":"ÄäkkösTesti" });
assert_eq!(data["testkey"], String::from("ÄäkkösTesti"));
let mpdata = rmp_serde::to_vec(&data).unwrap();
msgparts.push(zmq::Message::from(mpdata));
msgparts.push(zmq::Message::from(String::from("extra part 1").as_bytes()));
msgparts.push(zmq::Message::from(String::from("extra part 2").as_bytes()));
let msg = *PubSubDataMessage::zmq_decode(msgparts).unwrap();
assert_eq!(msg.msgid, msgid);
assert_eq!(msg.topic, String::from("testtopic"));
assert_eq!(msg.data["testkey"], String::from("ÄäkkösTesti"));
assert_eq!(msg.extra_parts[0], String::from("extra part 1").as_bytes());
assert_eq!(msg.extra_parts[1], String::from("extra part 2").as_bytes());
let newmsgparts = msg.zmq_encode().unwrap();
assert_eq!(
newmsgparts[0],
zmq::Message::from(String::from("testtopic").as_bytes())
);
assert_eq!(
newmsgparts[3],
zmq::Message::from(String::from("extra part 1").as_bytes())
);
}
// Converting from `&DataMessage` keeps the id, payload and extra parts.
#[test]
fn test_dm_into_psdm() {
let mut msg = DataMessage::new().unwrap();
msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
msg.extra_parts.push(BytesMut::from("extra part 1"));
msg.extra_parts.push(BytesMut::from("extra part 2"));
let psmsg = PubSubDataMessage::from(&msg);
assert_eq!(msg.msgid, psmsg.msgid);
assert_eq!(msg.data, psmsg.data);
assert_eq!(msg.extra_parts, psmsg.extra_parts);
}
// Converting from an owned `DataMessage` keeps the id, payload and extra
// parts; copies are taken beforehand because the conversion consumes `msg`.
#[test]
fn test_dm_into_psdm_consume() {
let mut msg = DataMessage::new().unwrap();
msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
msg.extra_parts.push(BytesMut::from("extra part 1"));
msg.extra_parts.push(BytesMut::from("extra part 2"));
let cmp_msgid = msg.msgid.clone();
let cmp_data = msg.data.clone();
let cmp_extra = msg.extra_parts.clone();
let psmsg = PubSubDataMessage::from(msg);
assert_eq!(cmp_msgid, psmsg.msgid);
assert_eq!(cmp_data, psmsg.data);
assert_eq!(cmp_extra, psmsg.extra_parts);
}
// Decodes hand-built frames as a generic `PubSubMessage`, then converts the
// owned value into a `PubSubDataMessage` with `try_from`.
#[test]
fn test_pubsub_tryinto_consume() {
let mut msgparts: Vec<zmq::Message> = Vec::with_capacity(5);
msgparts.push(zmq::Message::from(String::from("testtopic").as_bytes()));
let msgid = Uuid::new_v4();
msgparts.push(zmq::Message::from(msgid.as_bytes().to_vec()));
let data = json!({"testkey":"ÄäkkösTesti" });
assert_eq!(data["testkey"], String::from("ÄäkkösTesti"));
let mpdata = rmp_serde::to_vec(&data).unwrap();
msgparts.push(zmq::Message::from(mpdata));
msgparts.push(zmq::Message::from(String::from("extra part 1").as_bytes()));
msgparts.push(zmq::Message::from(String::from("extra part 2").as_bytes()));
let psmsg = *PubSubMessage::zmq_decode(msgparts).unwrap();
let msg = PubSubDataMessage::try_from(psmsg).unwrap();
assert_eq!(msg.msgid, msgid);
assert_eq!(msg.topic, String::from("testtopic"));
assert_eq!(msg.data["testkey"], String::from("ÄäkkösTesti"));
assert_eq!(msg.extra_parts[0], String::from("extra part 1").as_bytes());
assert_eq!(msg.extra_parts[1], String::from("extra part 2").as_bytes());
}
// Same as above but converting from `&PubSubMessage`, so the result can be
// compared against the source's `topic` and `dataparts`.
#[test]
fn test_pubsub_tryinto() {
let mut msgparts: Vec<zmq::Message> = Vec::with_capacity(5);
msgparts.push(zmq::Message::from(String::from("testtopic").as_bytes()));
let msgid = Uuid::new_v4();
msgparts.push(zmq::Message::from(msgid.as_bytes().to_vec()));
let data = json!({"testkey":"ÄäkkösTesti" });
assert_eq!(data["testkey"], String::from("ÄäkkösTesti"));
let mpdata = rmp_serde::to_vec(&data).unwrap();
msgparts.push(zmq::Message::from(mpdata));
msgparts.push(zmq::Message::from(String::from("extra part 1").as_bytes()));
msgparts.push(zmq::Message::from(String::from("extra part 2").as_bytes()));
let psmsg = *PubSubMessage::zmq_decode(msgparts).unwrap();
let msg = PubSubDataMessage::try_from(&psmsg).unwrap();
assert_eq!(msg.topic, psmsg.topic);
assert_eq!(msg.msgid.as_bytes().to_vec(), psmsg.dataparts[0].to_vec());
assert_eq!(msg.data["testkey"], String::from("ÄäkkösTesti"));
assert_eq!(msg.extra_parts[0], psmsg.dataparts[2]);
assert_eq!(msg.extra_parts[1], psmsg.dataparts[3]);
}
// `PubSubDataMessage` -> `RawMessage` -> `PubSubDataMessage` preserves all
// fields.
#[test]
fn test_rawmessage_tryinto_psdm_roundtrip() {
let mut dmsg_orig = PubSubDataMessage::new("testtopic".to_string()).unwrap();
dmsg_orig.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
let rawmsg = RawMessage::try_from(&dmsg_orig).unwrap();
let dmsg = PubSubDataMessage::try_from(rawmsg).unwrap();
assert_eq!(dmsg.topic, dmsg_orig.topic);
assert_eq!(dmsg.msgid, dmsg_orig.msgid);
assert_eq!(dmsg.data, dmsg_orig.data);
assert_eq!(dmsg.extra_parts, dmsg_orig.extra_parts);
}
// `DataMessage` -> `RawMessage` -> `DataMessage` preserves all fields.
#[test]
fn test_rawmessage_tryinto_dm_roundtrip() {
let mut dmsg_orig = DataMessage::new().unwrap();
dmsg_orig.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
let rawmsg = RawMessage::try_from(&dmsg_orig).unwrap();
let dmsg = DataMessage::try_from(rawmsg).unwrap();
assert_eq!(dmsg.msgid, dmsg_orig.msgid);
assert_eq!(dmsg.data, dmsg_orig.data);
assert_eq!(dmsg.extra_parts, dmsg_orig.extra_parts);
}
}