// datastreamservicelib 1.0.0
//
// Rust version of https://gitlab.com/advian-oss/python-datastreamservicelib
// Documentation
/// ZMQ wrapping stuff
use crate::TerminationFlag;
use datastreamcorelib;
use datastreamcorelib::abstracts as coreabs;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
// we need to explicitly import this trait for it to work
use datastreamcorelib::abstracts::SocketHandler;
// we need to explicitly import this trait for it to work
use datastreamcorelib::abstracts::{ZMQCodec, ZMQSocketArc};
use datastreamcorelib::pubsub as coreps;
// we need to explicitly import this trait for it to work
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use lazy_static::lazy_static;
use log;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use tokio;

/// Track subscriptions, handle actual reader tasks etc
///
/// Usually obtained through `TokioPubSubManager::instance()`, which hands out a
/// shared, mutex-protected instance.
#[derive(Default)]
pub struct TokioPubSubManager {
    /// Registered subscriptions; `subscribe` stores a clone of each subscription here
    pub subscriptions: Vec<coreps::Subscription>,
    /// Default PUB socket, `None` until one has been set (see `set_default_pub_uris`)
    pub default_pub_socket: Option<ZMQSocketArc>,
    /// Flag handed (cloned) to the spawned reader task, which polls it to decide when to exit
    pub term_flag: TerminationFlag,
    /// Join handles of the spawned reader tasks, keyed by socket type
    _socket_readers: HashMap<coreabs::ZMQSocketType, tokio::task::JoinHandle<Fallible<()>>>,
}

/// Manual `Debug` implementation: prints the subscriptions and the termination
/// flag, and replaces the reader task handles with a fixed placeholder.
impl fmt::Debug for TokioPubSubManager {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("TokioPubSubManager");
        builder.field("subscriptions", &self.subscriptions);
        builder.field("term_flag", &self.term_flag);
        // The join handles are deliberately not printed
        builder.field("_socket_readers", &"<hidden>");
        builder.finish()
    }
}

/// Arc wrapping Mutex wrapping TokioPubSubManager instance
///
/// This is the handle type returned by `TokioPubSubManager::instance()`; call
/// `.lock()` on it to get access to the manager itself.
pub type PSManagerArc = Arc<Mutex<TokioPubSubManager>>;

// Shared manager instance behind TokioPubSubManager::instance(); lazy_static
// initializes it on first access, every caller then gets a clone of the same Arc.
lazy_static! {
    static ref PUBSUBMANAGER_SINGLETON: PSManagerArc =
        Arc::new(Mutex::new(TokioPubSubManager::new()));
}

impl coreps::PubSubManager for TokioPubSubManager {
    /// Return list of subscriptions
    fn get_subscriptions(&self) -> &Vec<coreps::Subscription> {
        &self.subscriptions
    }

    /// Get the default publish socket (if available)
    fn get_default_pub_socket(&self) -> &Option<coreabs::ZMQSocketArc> {
        &self.default_pub_socket
    }

    /// Subscribe and spawn a reader task
    ///
    /// Fetches the SUB socket for the subscription's URIs from the socket handler,
    /// sets every topic of `sub` as a subscription filter on it, records the
    /// subscription and makes sure a reader task exists.
    ///
    /// # Errors
    ///
    /// Returns the error from the socket handler or from setting a topic filter.
    /// The subscription is only recorded once all of that has succeeded, so a
    /// failed call can be retried without ending up with duplicate entries.
    /// Topic filters that were already set before the failure are not rolled back.
    fn subscribe(&mut self, sub: &coreps::Subscription) -> Fallible<()> {
        let sdesc = coreabs::ZMQSocketDescription {
            socketuris: sub.socketuris.clone(),
            sockettype: coreabs::ZMQSocketType::SUB,
        };
        let smgr_mutex = coreabs::BaseSocketHandler::instance();
        let socket_mutex = smgr_mutex.lock().get_socket(&sdesc)?;
        // Scope limit the lock to this critical part
        {
            let socket = socket_mutex.lock();
            for topic in sub.topics.iter() {
                socket.set_subscribe(topic.as_bytes())?;
            }
        }
        // Everything fallible is done, now it is safe to register the subscription
        self.subscriptions.push(sub.clone());

        // NOTE(review): readers are keyed by socket type only, so a single reader is
        // ever spawned. If get_socket can return different sockets for different
        // URI sets, later sockets would not get a reader -- TODO confirm.
        match self._socket_readers.entry(coreabs::ZMQSocketType::SUB) {
            std::collections::hash_map::Entry::Occupied(_) => {
                log::debug!("SUB socket reader found (we hope it's not failed)");
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(tokio::spawn(reader_task(
                    socket_mutex,
                    self.term_flag.clone(),
                )));
            }
        }
        Ok(())
    }
}

impl TokioPubSubManager {
    /// Return a singleton wrapped in a mutex
    ///
    /// Every call returns a clone of the same `Arc`, so all callers share one manager.
    pub fn instance() -> PSManagerArc {
        PUBSUBMANAGER_SINGLETON.clone()
    }

    /// Create a new instance, generally use the .instance() singleton getter instead
    ///
    /// The termination flag starts out unset, all other fields use their defaults.
    pub fn new() -> TokioPubSubManager {
        TokioPubSubManager {
            term_flag: Arc::new(AtomicBool::new(false)),
            ..Default::default()
        }
    }

    /// Create and set the default PUB socket
    ///
    /// Asks the socket handler for a PUB socket described by `uris` and stores it
    /// as the default publish socket.
    ///
    /// # Errors
    ///
    /// Returns the socket handler's error if the socket cannot be provided; the
    /// previously set default socket (if any) is left untouched in that case.
    pub fn set_default_pub_uris(&mut self, uris: &[String]) -> Fallible<()> {
        let sdesc = coreabs::ZMQSocketDescription {
            socketuris: uris.to_vec(),
            sockettype: coreabs::ZMQSocketType::PUB,
        };
        let smgr = coreabs::BaseSocketHandler::instance();
        self.default_pub_socket = Some(smgr.lock().get_socket(&sdesc)?);
        Ok(())
    }
}

// Due to lifetimes this must be a standalone function instead of method
/// Reads socket for messages, matches topics and passes the messages to callbacks
///
/// Polls `socket_mutex` with non-blocking receives, decodes every multipart
/// message into a `PubSubMessage` and hands it to `dispatch` on the singleton
/// manager. Messages that fail to decode are logged and dropped.
///
/// The loop yields to the runtime on every pass and returns `Ok(())` once
/// `termflag` is set. The flag is checked on every pass, including passes where
/// nothing was received, so an idle reader also terminates.
async fn reader_task(
    socket_mutex: coreabs::ZMQSocketArc,
    termflag: TerminationFlag,
) -> Fallible<()> {
    let psmgr_mutex = TokioPubSubManager::instance();
    loop {
        // We can't await while holding a lock guard, so yield here, before any
        // lock is taken
        // https://docs.rs/tokio/0.2.16/tokio/fn.spawn.html#using-send-values-from-a-task
        tokio::task::yield_now().await;

        // Check before receiving: the "nothing to read" and "decode failed" paths
        // below `continue`, so a check at the end of the loop would be skipped
        if termflag.load(Ordering::Relaxed) {
            log::trace!("Got term flag, exiting reader task");
            return Ok(());
        }

        // Hold the socket lock only for the receive itself. subscribe() locks the
        // socket while its caller holds the manager lock, so keeping the socket
        // locked while waiting for the manager lock below could deadlock.
        let received = {
            let socket = socket_mutex.lock();
            socket.recv_multipart(zmq::DONTWAIT)
        };
        let parts = match received {
            Ok(parts) => parts,
            Err(_e) => {
                // TODO: Check that it's the "no messages" error and not some other
                continue;
            }
        };
        log::trace!("Got message parts {:?}", &parts);

        let msgparts: Vec<zmq::Message> = parts.into_iter().map(zmq::Message::from).collect();
        let psmsg = match coreps::PubSubMessage::zmq_decode(msgparts) {
            Ok(msg) => *msg,
            Err(e) => {
                // TODO: handle the failure somehow ?
                log::error!("Could not decode message: {}", e);
                continue;
            }
        };
        psmgr_mutex.lock().dispatch(psmsg);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::send_task;
    use datastreamcorelib::datamessage as coredm;
    use datastreamcorelib::logging::init_logging;
    use std::env::temp_dir;
    use std::time::{Duration, Instant};
    use tokio::time::delay_for;

    /// The singleton can be fetched, debug-printed and locked
    #[test]
    fn new_instace() {
        init_logging(log::LevelFilter::Trace).unwrap();
        let psmgr = TokioPubSubManager::instance();
        log::debug!("psmgr is {:?}", psmgr);
        let _ = psmgr.lock();
    }

    /// Collects the messages handed to a subscription callback so tests can count them
    struct DispatchReceiver {
        messages: Vec<coreps::PubSubMessage>,
    }

    impl DispatchReceiver {
        /// Subscription callback: log the message and store it
        pub fn callback(&mut self, _sub: &coreps::Subscription, msg: coreps::PubSubMessage) {
            log::debug!("Got message {:?} on sub {:?}", &msg, &_sub);
            self.messages.push(msg);
        }
    }

    /// Publishing through the default PUB socket succeeds
    #[test]
    fn test_publish() {
        let smgr = coreabs::BaseSocketHandler::instance();
        let psmgr = TokioPubSubManager::instance();
        let socketuris = vec!["inproc://pubtest".to_string()];
        let sdesc = coreabs::ZMQSocketDescription {
            socketuris: socketuris.clone(),
            sockettype: coreabs::ZMQSocketType::PUB,
        };
        psmgr.lock().default_pub_socket = Some(smgr.lock().get_socket(&sdesc).unwrap());
        let msg = coredm::PubSubDataMessage::new("footopic".to_string()).unwrap();
        psmgr.lock().publish(&msg).unwrap();
    }

    /// Messages sent on the PUB socket reach the callback of a matching subscription
    #[tokio::test]
    async fn test_pubsub_roundtrip() {
        let smgr = coreabs::BaseSocketHandler::instance();
        let psmgr = TokioPubSubManager::instance();
        let mut tmppath1 = temp_dir();
        tmppath1.push("7a2d57bd-eed6-4c63-81a5-b08f62df0945_pub.sock");
        let sockpath1 = "ipc://".to_string() + &tmppath1.to_string_lossy();
        // for some reason inproc socket won't work here
        let socketuris = vec![sockpath1];
        let sdesc = coreabs::ZMQSocketDescription {
            socketuris: socketuris.clone(),
            sockettype: coreabs::ZMQSocketType::PUB,
        };
        psmgr.lock().default_pub_socket = Some(smgr.lock().get_socket(&sdesc).unwrap());

        let recv = Arc::new(Mutex::new(DispatchReceiver {
            messages: Vec::new(),
        }));
        let crecv = recv.clone();
        let sub = coreps::Subscription::new(
            socketuris.clone(),
            vec!["pstesttopic_1".to_string()],
            move |sub: &coreps::Subscription, msg: coreps::PubSubMessage| {
                crecv.lock().callback(&sub, msg)
            },
        )
        .unwrap();
        psmgr.lock().subscribe(&sub).unwrap();
        // Give the sockets some time to do their magic
        delay_for(Duration::from_millis(100)).await;

        let msg = coredm::PubSubDataMessage::new("pstesttopic_1".to_string()).unwrap();
        assert_eq!(recv.lock().messages.len(), 0);
        // One message is published directly, a second one via the sender task
        psmgr.lock().publish(&msg).unwrap();
        let task = tokio::spawn(send_task(msg));

        // Poll until both messages have arrived; the timeout uses a monotonic
        // clock so wall-clock adjustments cannot shorten or stretch it
        let started = Instant::now();
        loop {
            delay_for(Duration::from_millis(50)).await;
            if recv.lock().messages.len() >= 2 {
                break;
            }
            if started.elapsed() > Duration::from_millis(1500) {
                panic!("Timed out waiting for messages")
            }
        }
        assert_eq!(recv.lock().messages.len(), 2);
        if task.await.is_err() {
            panic!("sendertask failed")
        }
    }

    /// Dispatching messages directly (no sockets, no reader task) invokes the callback
    #[test]
    fn test_dispatch() {
        let recv = Arc::new(Mutex::new(DispatchReceiver {
            messages: Vec::new(),
        }));
        let crecv = recv.clone();
        let sub = coreps::Subscription::new(
            vec!["inproc://foobar".to_string()],
            vec!["test_topic_1".to_string()],
            move |sub: &coreps::Subscription, msg: coreps::PubSubMessage| {
                crecv.lock().callback(&sub, msg)
            },
        )
        .unwrap();
        // Fake subscribing since we don't want to start the task
        let psmgr = TokioPubSubManager::instance();
        psmgr.lock().subscriptions.push(sub.clone());

        // Instantiate some messages
        let msg1 = coreps::PubSubMessage::new("test".to_string()).unwrap();
        let msg2 = coreps::PubSubMessage::new("test_topic_1".to_string()).unwrap();
        let msg3 = coreps::PubSubMessage::new("test_topic_nomatch".to_string()).unwrap();

        psmgr.lock().dispatch(msg1);
        psmgr.lock().dispatch(msg2);
        psmgr.lock().dispatch(msg3);
        assert_eq!(recv.lock().messages.len(), 2);
    }
}