datastreamcorelib 0.5.1

Rust version of https://gitlab.com/advian-oss/python-datastreamcorelib
Documentation
/// DataMessage
use crate::abstracts::{RawMessage, ZMQCodec};
use crate::binpackers::vec_to_uuid;
use crate::markers;
use crate::pubsub::PubSubMessage;
use bytes::BytesMut;
use chrono;
use failure::Fallible;
use rmp_serde;
use serde_json;
use std::convert::From;
use std::convert::TryFrom;
use uuid::Uuid;
use zmq;

/// Generic datamessage (not PUB/SUB)
#[derive(Debug, Default, Clone)]
pub struct DataMessage {
    pub msgid: uuid::Uuid,
    pub data: serde_json::Value,
    pub extra_parts: Vec<BytesMut>,
}

impl DataMessage {
    /// Instantiate new datamessage
    pub fn new() -> Fallible<DataMessage> {
        let mut ret = DataMessage {
            msgid: Uuid::new_v4(),
            ..Default::default()
        };
        ret.data["systemtime"] = serde_json::to_value(
            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
        )?;
        Ok(ret)
    }
}

impl markers::ZMQMessageMarker for DataMessage {}
impl markers::DataMessageMarker for DataMessage {}

impl ZMQCodec for DataMessage {
    /// Decode ZMQ message parts into a pointer to abstracted message
    fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
        if from.len() < 2 {
            return Err(failure::err_msg("Too few parts"));
        }
        let msgid = vec_to_uuid(from[0].to_vec())?;
        let data: serde_json::Value = rmp_serde::from_slice(&from[1][..])?;
        let mut extra_parts: Vec<BytesMut> = Vec::with_capacity(from.len() - 2);
        for (idx, zmqpart) in from.iter().enumerate() {
            if idx < 2 {
                continue;
            }
            extra_parts.push(BytesMut::from(zmqpart as &[u8]));
        }
        let msgbox = Box::new(DataMessage {
            msgid,
            data,
            extra_parts,
        });
        Ok(msgbox)
    }

    /// Encode the abtracted message into ZMQ message parts, ready for sending
    fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
        let mut ret: Vec<zmq::Message> = Vec::with_capacity(2 + self.extra_parts.len());
        ret.push(zmq::Message::from(&self.msgid.as_bytes()[..]));
        ret.push(zmq::Message::from(rmp_serde::to_vec(&self.data)?));
        for extrapart in self.extra_parts.iter() {
            ret.push(zmq::Message::from(&extrapart[..]))
        }
        Ok(ret)
    }
}

crate::naive_tryfrom!(DataMessage, [RawMessage]);
crate::naive_tryfrom!(RawMessage, [DataMessage]);

/// PubSubDataMessage can be converted to DataMessage but topic is dropped
impl From<PubSubDataMessage> for DataMessage {
    fn from(dmsg: PubSubDataMessage) -> Self {
        let dmsg = DataMessage {
            msgid: dmsg.msgid,
            data: dmsg.data,
            extra_parts: dmsg.extra_parts,
        };
        dmsg
    }
}

/// PubSubDataMessage can be converted to DataMessage but topic is dropped
impl From<&PubSubDataMessage> for DataMessage {
    fn from(dmsg: &PubSubDataMessage) -> Self {
        let dmsg = DataMessage {
            msgid: dmsg.msgid.clone(),
            data: dmsg.data.clone(),
            extra_parts: dmsg.extra_parts.clone(),
        };
        dmsg
    }
}

/// Generic PUB/SUB datamessage
#[derive(Debug, Default, Clone)]
pub struct PubSubDataMessage {
    pub topic: String,
    pub msgid: uuid::Uuid,
    pub data: serde_json::Value,
    pub extra_parts: Vec<BytesMut>,
}

impl PubSubDataMessage {
    /// Instantiate new PubSubDataMessage with given topic
    pub fn new(topic: String) -> Fallible<PubSubDataMessage> {
        let mut ret = PubSubDataMessage {
            topic: topic,
            msgid: Uuid::new_v4(),
            ..Default::default()
        };
        ret.data["systemtime"] = serde_json::to_value(
            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
        )?;
        Ok(ret)
    }
}

impl markers::ZMQMessageMarker for PubSubDataMessage {}
impl markers::DataMessageMarker for PubSubDataMessage {}
impl markers::PubSubDataMessageMarker for PubSubDataMessage {}
impl markers::PubSubMessageMarker for PubSubDataMessage {}

impl ZMQCodec for PubSubDataMessage {
    /// Decode ZMQ message parts into a pointer to abstracted message
    fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
        if from.len() < 3 {
            return Err(failure::err_msg("Too few parts"));
        }
        let topic = from[0]
            .as_str()
            .ok_or(failure::err_msg("Topic part is not string"))?
            .to_string();
        let msgid = vec_to_uuid(from[1].to_vec())?;
        let data: serde_json::Value = rmp_serde::from_slice(&from[2][..])?;
        let mut extra_parts: Vec<BytesMut> = Vec::with_capacity(from.len() - 3);
        for (idx, zmqpart) in from.iter().enumerate() {
            if idx < 3 {
                continue;
            }
            extra_parts.push(BytesMut::from(zmqpart as &[u8]));
        }
        let msgbox = Box::new(PubSubDataMessage {
            topic,
            msgid,
            data,
            extra_parts,
        });
        Ok(msgbox)
    }

    /// Encode the abtracted message into ZMQ message parts, ready for sending
    fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
        let mut ret: Vec<zmq::Message> = Vec::with_capacity(3 + self.extra_parts.len());
        ret.push(zmq::Message::from(&self.topic.as_bytes()));
        ret.push(zmq::Message::from(&self.msgid.as_bytes()[..]));
        ret.push(zmq::Message::from(rmp_serde::to_vec(&self.data)?));
        for extrapart in self.extra_parts.iter() {
            ret.push(zmq::Message::from(&extrapart[..]))
        }
        Ok(ret)
    }
}

crate::naive_tryfrom!(PubSubDataMessage, [PubSubMessage, RawMessage]);
crate::naive_tryfrom!(PubSubMessage, [PubSubDataMessage]);
crate::naive_tryfrom!(RawMessage, [PubSubDataMessage]);

/// DataMessage can be converted to PubSubDataMessage but topic is set to UNDEFINED
impl From<DataMessage> for PubSubDataMessage {
    fn from(dmsg: DataMessage) -> Self {
        let psmsg = PubSubDataMessage {
            topic: "UNDEFINED".to_string(),
            msgid: dmsg.msgid,
            data: dmsg.data,
            extra_parts: dmsg.extra_parts,
        };
        psmsg
    }
}

/// DataMessage can be converted to PubSubDataMessage but topic is set to UNDEFINED
impl From<&DataMessage> for PubSubDataMessage {
    fn from(dmsg: &DataMessage) -> Self {
        let psmsg = PubSubDataMessage {
            topic: "UNDEFINED".to_string(),
            msgid: dmsg.msgid.clone(),
            data: dmsg.data.clone(),
            extra_parts: dmsg.extra_parts.clone(),
        };
        psmsg
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::pubsub::PubSubMessage;
    use serde_json::json;

    #[test]
    fn test_encode_decode_roundtrip() {
        let mut msg = DataMessage::new().unwrap();
        log::debug!("msg is {:?}", msg);
        msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
        msg.extra_parts.push(BytesMut::from("extra part 1"));
        msg.extra_parts.push(BytesMut::from("extra part 2"));
        let msgparts = msg.zmq_encode().unwrap();
        assert_eq!(msgparts.len(), 4);
        let decoded_uuid = vec_to_uuid(msgparts[0].to_vec()).unwrap();
        assert_eq!(msg.msgid, decoded_uuid);
        let msg2 = *DataMessage::zmq_decode(msgparts).unwrap();
        assert_eq!(msg.msgid, msg2.msgid);
        assert_eq!(msg.data, msg2.data);
        assert_eq!(msg2.extra_parts.len(), 2);
        assert_eq!(msg2.extra_parts[0], "extra part 1");
        assert_eq!(msg2.extra_parts[1], "extra part 2");
    }

    #[test]
    fn test_pubsub_encode_decode_roundtrip() {
        let mut msg = PubSubDataMessage::new(String::from("testtopic")).unwrap();
        log::debug!("msg is {:?}", msg);
        msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
        let msg = msg;
        let msgparts = msg.zmq_encode().unwrap();
        assert_eq!(msgparts[0].as_str().unwrap(), String::from("testtopic"));
        let decoded_uuid = vec_to_uuid(msgparts[1].to_vec()).unwrap();
        assert_eq!(msg.msgid, decoded_uuid);
        let msg2 = *PubSubDataMessage::zmq_decode(msgparts).unwrap();
        assert_eq!(msg.msgid, msg2.msgid);
        assert_eq!(msg.topic, msg2.topic);
        assert_eq!(msg.data, msg2.data);
        //assert_eq!(msg2.get_topic(), msg.get_topic());
        //assert_eq!(msg2.get_topic(), msg2.topic);
    }

    #[test]
    fn test_pubsub_extraparts_roundtrip() {
        let mut msgparts: Vec<zmq::Message> = Vec::with_capacity(5);
        msgparts.push(zmq::Message::from(String::from("testtopic").as_bytes()));
        let msgid = Uuid::new_v4();
        msgparts.push(zmq::Message::from(msgid.as_bytes().to_vec()));
        let data = json!({"testkey":"ÄäkkösTesti" });
        assert_eq!(data["testkey"], String::from("ÄäkkösTesti"));
        let mpdata = rmp_serde::to_vec(&data).unwrap();
        msgparts.push(zmq::Message::from(mpdata));
        msgparts.push(zmq::Message::from(String::from("extra part 1").as_bytes()));
        msgparts.push(zmq::Message::from(String::from("extra part 2").as_bytes()));

        let msg = *PubSubDataMessage::zmq_decode(msgparts).unwrap();
        assert_eq!(msg.msgid, msgid);
        assert_eq!(msg.topic, String::from("testtopic"));
        assert_eq!(msg.data["testkey"], String::from("ÄäkkösTesti"));
        assert_eq!(msg.extra_parts[0], String::from("extra part 1").as_bytes());
        assert_eq!(msg.extra_parts[1], String::from("extra part 2").as_bytes());

        let newmsgparts = msg.zmq_encode().unwrap();
        assert_eq!(
            newmsgparts[0],
            zmq::Message::from(String::from("testtopic").as_bytes())
        );
        assert_eq!(
            newmsgparts[3],
            zmq::Message::from(String::from("extra part 1").as_bytes())
        );
    }

    #[test]
    fn test_dm_into_psdm() {
        let mut msg = DataMessage::new().unwrap();
        msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
        msg.extra_parts.push(BytesMut::from("extra part 1"));
        msg.extra_parts.push(BytesMut::from("extra part 2"));
        let psmsg = PubSubDataMessage::from(&msg);
        assert_eq!(msg.msgid, psmsg.msgid);
        assert_eq!(msg.data, psmsg.data);
        assert_eq!(msg.extra_parts, psmsg.extra_parts);
    }

    #[test]
    fn test_dm_into_psdm_consume() {
        let mut msg = DataMessage::new().unwrap();
        msg.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
        msg.extra_parts.push(BytesMut::from("extra part 1"));
        msg.extra_parts.push(BytesMut::from("extra part 2"));
        let cmp_msgid = msg.msgid.clone();
        let cmp_data = msg.data.clone();
        let cmp_extra = msg.extra_parts.clone();
        let psmsg = PubSubDataMessage::from(msg);
        assert_eq!(cmp_msgid, psmsg.msgid);
        assert_eq!(cmp_data, psmsg.data);
        assert_eq!(cmp_extra, psmsg.extra_parts);
    }

    #[test]
    fn test_pubsub_tryinto_consume() {
        let mut msgparts: Vec<zmq::Message> = Vec::with_capacity(5);
        msgparts.push(zmq::Message::from(String::from("testtopic").as_bytes()));
        let msgid = Uuid::new_v4();
        msgparts.push(zmq::Message::from(msgid.as_bytes().to_vec()));
        let data = json!({"testkey":"ÄäkkösTesti" });
        assert_eq!(data["testkey"], String::from("ÄäkkösTesti"));
        let mpdata = rmp_serde::to_vec(&data).unwrap();
        msgparts.push(zmq::Message::from(mpdata));
        msgparts.push(zmq::Message::from(String::from("extra part 1").as_bytes()));
        msgparts.push(zmq::Message::from(String::from("extra part 2").as_bytes()));

        let psmsg = *PubSubMessage::zmq_decode(msgparts).unwrap();
        let msg = PubSubDataMessage::try_from(psmsg).unwrap();
        assert_eq!(msg.msgid, msgid);
        assert_eq!(msg.topic, String::from("testtopic"));
        assert_eq!(msg.data["testkey"], String::from("ÄäkkösTesti"));
        assert_eq!(msg.extra_parts[0], String::from("extra part 1").as_bytes());
        assert_eq!(msg.extra_parts[1], String::from("extra part 2").as_bytes());
    }

    #[test]
    fn test_pubsub_tryinto() {
        let mut msgparts: Vec<zmq::Message> = Vec::with_capacity(5);
        msgparts.push(zmq::Message::from(String::from("testtopic").as_bytes()));
        let msgid = Uuid::new_v4();
        msgparts.push(zmq::Message::from(msgid.as_bytes().to_vec()));
        let data = json!({"testkey":"ÄäkkösTesti" });
        assert_eq!(data["testkey"], String::from("ÄäkkösTesti"));
        let mpdata = rmp_serde::to_vec(&data).unwrap();
        msgparts.push(zmq::Message::from(mpdata));
        msgparts.push(zmq::Message::from(String::from("extra part 1").as_bytes()));
        msgparts.push(zmq::Message::from(String::from("extra part 2").as_bytes()));

        let psmsg = *PubSubMessage::zmq_decode(msgparts).unwrap();
        let msg = PubSubDataMessage::try_from(&psmsg).unwrap();
        assert_eq!(msg.topic, psmsg.topic);
        assert_eq!(msg.msgid.as_bytes().to_vec(), psmsg.dataparts[0].to_vec());
        assert_eq!(msg.data["testkey"], String::from("ÄäkkösTesti"));
        assert_eq!(msg.extra_parts[0], psmsg.dataparts[2]);
        assert_eq!(msg.extra_parts[1], psmsg.dataparts[3]);
    }

    #[test]
    fn test_rawmessage_tryinto_psdm_roundtrip() {
        let mut dmsg_orig = PubSubDataMessage::new("testtopic".to_string()).unwrap();
        dmsg_orig.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
        let rawmsg = RawMessage::try_from(&dmsg_orig).unwrap();
        let dmsg = PubSubDataMessage::try_from(rawmsg).unwrap();
        assert_eq!(dmsg.topic, dmsg_orig.topic);
        assert_eq!(dmsg.msgid, dmsg_orig.msgid);
        assert_eq!(dmsg.data, dmsg_orig.data);
        assert_eq!(dmsg.extra_parts, dmsg_orig.extra_parts);
    }

    #[test]
    fn test_rawmessage_tryinto_dm_roundtrip() {
        let mut dmsg_orig = DataMessage::new().unwrap();
        dmsg_orig.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
        let rawmsg = RawMessage::try_from(&dmsg_orig).unwrap();
        let dmsg = DataMessage::try_from(rawmsg).unwrap();
        assert_eq!(dmsg.msgid, dmsg_orig.msgid);
        assert_eq!(dmsg.data, dmsg_orig.data);
        assert_eq!(dmsg.extra_parts, dmsg_orig.extra_parts);
    }
}