datastreamcorelib 0.5.1

Rust version of https://gitlab.com/advian-oss/python-datastreamcorelib
Documentation
/// PUB/SUB related traits
use crate::abstracts::{RawMessage, ZMQCodec, ZMQSocketArc};
use crate::markers;
use bytes::BytesMut;
use failure::Fallible;
use std::convert::TryFrom;
use std::fmt;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
use zmq;

/// Raw PUB/SUB message
#[derive(Debug, Default, Clone)]
pub struct PubSubMessage {
    pub topic: String,
    pub dataparts: Vec<BytesMut>,
}

impl PubSubMessage {
    /// Instantiate new PubSubDataMessage with given topic
    pub fn new(topic: String) -> Fallible<PubSubMessage> {
        Ok(PubSubMessage {
            topic: topic,
            ..Default::default()
        })
    }
}

impl ZMQCodec for PubSubMessage {
    /// Encode the abtracted message into ZMQ message parts, ready for sending
    fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
        let mut ret: Vec<zmq::Message> = Vec::with_capacity(self.dataparts.len() + 1);
        ret.push(zmq::Message::from(&self.topic.as_bytes()));
        for part in self.dataparts.iter() {
            ret.push(zmq::Message::from(&part[..]));
        }
        Ok(ret)
    }

    /// Decode ZMQ message parts into a pointer to abstracted message
    fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
        if from.len() < 2 {
            return Err(failure::err_msg("Too few parts"));
        }
        let topic = from[0]
            .as_str()
            .ok_or(failure::err_msg("Topic part is not string"))?
            .to_string();
        let mut dataparts: Vec<BytesMut> = Vec::with_capacity(from.len() - 1);
        for (idx, msg) in from.iter().enumerate() {
            if idx == 0 {
                continue;
            }
            dataparts.push(BytesMut::from(msg as &[u8]));
        }
        let msgbox = Box::new(PubSubMessage { topic, dataparts });
        Ok(msgbox)
    }
}

impl markers::ZMQMessageMarker for PubSubMessage {}
impl markers::PubSubMessageMarker for PubSubMessage {}

crate::naive_tryfrom!(PubSubMessage, [RawMessage]);
crate::naive_tryfrom!(RawMessage, [PubSubMessage]);

/// Generic traits any PUB/SUB manager should implement
pub trait PubSubManager {
    /// Add subscription to the manager
    fn subscribe(&mut self, sub: &Subscription) -> Fallible<()>;
    /// Return the list of subscriptons
    fn get_subscriptions(&self) -> &Vec<Subscription>;
    // TODO: use this form when PubSubCallbackType uses trait for the message type
    //fn dispatch<T: markers::PubSubMessage>(&self, psmsg: T) {
    /// Dispatch message to matching subscribers
    fn dispatch(&self, psmsg: PubSubMessage) {
        for sub in self.get_subscriptions().iter() {
            for topic in sub.topics.iter() {
                if !topic.contains(&psmsg.topic) {
                    continue;
                }
                let cb = &mut *sub.callback.lock().unwrap();
                (*cb)(&sub, psmsg.clone());
            }
        }
    }
    /// Get the default publish socket (if available)
    fn get_default_pub_socket(&self) -> &Option<ZMQSocketArc>;
    /// Publish a message (anything with pubsub marker) to given socket
    /// Usually you would just use .publish() instead
    fn publish_to_socket<T: markers::PubSubMessageMarker + fmt::Debug>(
        &self,
        msg: &T,
        socket: &ZMQSocketArc,
    ) -> Fallible<()> {
        let msgparts = msg.zmq_encode()?;
        match socket.lock() {
            Err(e) => {
                log::error!("Could not acquire socket lock: {}", e);
                return Err(failure::err_msg("Could not lock socket"));
            }
            Ok(socket) => {
                log::trace!("Sending {:?}", &msg);
                socket.send_multipart(msgparts, 0)?;
            }
        }
        Ok(())
    }
    /// Publish a message (anything with pubsub marker) to default socket
    fn publish<T: markers::PubSubMessageMarker + fmt::Debug>(&self, msg: &T) -> Fallible<()> {
        let socket = match self.get_default_pub_socket() {
            Some(socket) => socket,
            None => return Err(failure::err_msg("No default socket")),
        };
        self.publish_to_socket(msg, &socket)
    }
}

// TODO: How to make the callback take marker trait instead
pub type PubSubCallbackType = dyn FnMut(&Subscription, PubSubMessage) + Send;
pub type PubSubCBFailType = dyn FnMut(&Subscription, Vec<zmq::Message>) + Send;

/// Subscription encapsulation
#[derive(Clone)]
pub struct Subscription {
    pub socketuris: Vec<String>,
    pub topics: Vec<String>,
    pub metadata: serde_json::Value,
    pub trackingid: uuid::Uuid,
    pub callback: Arc<Mutex<Box<PubSubCallbackType>>>,
}

impl fmt::Debug for Subscription {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Subscription")
            .field("socketuris", &self.socketuris)
            .field("topics", &self.topics)
            .field("metadata", &self.metadata)
            .field("trackingid", &self.trackingid)
            .field("callback", &"<hidden>".to_string())
            .finish()
    }
}

impl Subscription {
    pub fn new(
        socketuris: Vec<String>,
        topics: Vec<String>,
        callback: impl FnMut(&Subscription, PubSubMessage) + Send + 'static,
    ) -> Fallible<Subscription> {
        let ret = Subscription {
            socketuris,
            topics,
            metadata: serde_json::json!({}),
            callback: Arc::new(Mutex::new(Box::new(callback))),
            trackingid: Uuid::new_v4(),
        };
        Ok(ret)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::datamessage::PubSubDataMessage;

    #[test]
    fn test_new_psmsg() {
        let psmsg = PubSubMessage::new("foobar".to_string()).unwrap();
        assert_eq!(psmsg.topic, "foobar".to_string());
        log::debug!("Message is {:?}", psmsg);
    }

    #[test]
    fn test_datamessage_decodes_as_pubsubmessage() {
        let dmsg = PubSubDataMessage::new("testtopic".to_string()).unwrap();
        let msgparts = dmsg.zmq_encode().unwrap();
        assert!(msgparts.len() == 3);
        let psmsg = *PubSubMessage::zmq_decode(msgparts).unwrap();
        assert_eq!(dmsg.topic, psmsg.topic);
        //assert_eq!(dmsg.topic, psmsg.get_topic());
        assert!(psmsg.dataparts.len() == 2);
    }

    #[test]
    fn test_datamessage_tryinto_pubsubmessage() {
        let dmsg = PubSubDataMessage::new("testtopic".to_string()).unwrap();
        let psmsg = PubSubMessage::try_from(&dmsg).unwrap();
        assert_eq!(dmsg.topic, psmsg.topic);
        //assert_eq!(dmsg.topic, psmsg.get_topic());
        assert!(psmsg.dataparts.len() == 2);
    }

    #[test]
    fn test_datamessage_tryinto_pubsubmessage_consume() {
        let dmsg = PubSubDataMessage::new("testtopic".to_string()).unwrap();
        let psmsg = PubSubMessage::try_from(dmsg).unwrap();
        assert_eq!(psmsg.topic, "testtopic".to_string());
        assert!(psmsg.dataparts.len() == 2);
    }

    #[test]
    fn test_rawmessage_tryinto_roundtrip() {
        let mut dmsg_orig = PubSubDataMessage::new("testtopic".to_string()).unwrap();
        dmsg_orig.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
        let rawmsg = RawMessage::try_from(dmsg_orig).unwrap();
        let psmsg = PubSubMessage::try_from(rawmsg).unwrap();
        assert_eq!(psmsg.topic, "testtopic".to_string());
        assert!(psmsg.dataparts.len() == 2);
    }

    struct DispatchReceiver {
        messages: Vec<PubSubMessage>,
    }

    impl DispatchReceiver {
        pub fn callback(&mut self, _sub: &Subscription, msg: PubSubMessage) {
            //log::debug!("Got message {:?} on sub {:?}", &msg, &_sub);
            log::debug!("DPR callback");
            self.messages.push(msg);
        }
    }

    #[test]
    fn test_subscription_callback() {
        let recv = Arc::new(Mutex::new(DispatchReceiver {
            messages: Vec::new(),
        }));
        let crecv = recv.clone();
        let sub = Subscription::new(
            vec!["inproc://foobar".to_string()],
            vec!["test_topic_1".to_string()],
            move |sub: &Subscription, msg: PubSubMessage| crecv.lock().unwrap().callback(&sub, msg),
        )
        .unwrap();
        log::debug!("Sub is {:?}", sub);
        let psmsg = PubSubMessage::new("foobar".to_string()).unwrap();
        assert_eq!(recv.lock().unwrap().messages.len(), 0);
        let cb = &mut *sub.callback.lock().unwrap();
        (*cb)(&sub, psmsg);
        assert_eq!(recv.lock().unwrap().messages.len(), 1);
    }

    #[derive(Default)]
    struct DispatchTestPSMgr {
        pub subscriptions: Vec<Subscription>,
    }

    impl PubSubManager for DispatchTestPSMgr {
        fn get_subscriptions(&self) -> &Vec<Subscription> {
            &self.subscriptions
        }
        fn subscribe(&mut self, sub: &Subscription) -> Fallible<()> {
            self.subscriptions.push(sub.clone());
            Ok(())
        }
        fn get_default_pub_socket(&self) -> &Option<ZMQSocketArc> {
            &None
        }
    }

    #[test]
    fn test_dispatch() {
        let recv = Arc::new(Mutex::new(DispatchReceiver {
            messages: Vec::new(),
        }));
        let mut psmgr = DispatchTestPSMgr {
            ..Default::default()
        };
        let crecv = recv.clone();
        let sub = Subscription::new(
            vec!["inproc://foobar".to_string()],
            vec!["test_topic_1".to_string()],
            move |sub: &Subscription, msg: PubSubMessage| crecv.lock().unwrap().callback(&sub, msg),
        )
        .unwrap();
        psmgr.subscribe(&sub).unwrap();

        let msg1 = PubSubMessage::new("test".to_string()).unwrap();
        let msg2 = PubSubMessage::new("test_topic_1".to_string()).unwrap();
        let msg3 = PubSubMessage::new("test_topic_nomatch".to_string()).unwrap();
        assert_eq!(recv.lock().unwrap().messages.len(), 0);
        psmgr.dispatch(msg1);
        psmgr.dispatch(msg2);
        psmgr.dispatch(msg3);
        assert_eq!(recv.lock().unwrap().messages.len(), 2);
    }
}