use crate::TerminationFlag;
use datastreamcorelib;
use datastreamcorelib::abstracts as coreabs;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use datastreamcorelib::abstracts::SocketHandler;
use datastreamcorelib::abstracts::{ZMQCodec, ZMQSocketArc};
use datastreamcorelib::pubsub as coreps;
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use lazy_static::lazy_static;
use log;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use tokio;
/// Pub/sub manager that stores subscriptions and tracks the tokio reader
/// tasks spawned for its sockets.
#[derive(Default)]
pub struct TokioPubSubManager {
/// Subscriptions registered on this manager; `subscribe` appends to it.
pub subscriptions: Vec<coreps::Subscription>,
/// Socket handed out by `get_default_pub_socket`; `None` until one is
/// assigned (e.g. via `set_default_pub_uris`).
pub default_pub_socket: Option<ZMQSocketArc>,
/// Flag cloned into every spawned reader task; a reader returns once it
/// observes the flag set.
pub term_flag: TerminationFlag,
// Join handles of the spawned reader tasks, keyed by socket type.
_socket_readers: HashMap<coreabs::ZMQSocketType, tokio::task::JoinHandle<Fallible<()>>>,
}
impl fmt::Debug for TokioPubSubManager {
    /// Writes a struct-style debug representation containing the
    /// subscriptions and the termination flag. The reader task handles are
    /// replaced by a fixed placeholder string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let readers_placeholder = String::from("<hidden>");
        let mut out = f.debug_struct("TokioPubSubManager");
        out.field("subscriptions", &self.subscriptions);
        out.field("term_flag", &self.term_flag);
        out.field("_socket_readers", &readers_placeholder);
        out.finish()
    }
}
/// Shared, mutex-guarded handle to a `TokioPubSubManager`.
pub type PSManagerArc = Arc<Mutex<TokioPubSubManager>>;
// Process-wide manager instance, built with `TokioPubSubManager::new()` on
// first access; `TokioPubSubManager::instance()` returns clones of this Arc.
lazy_static! {
static ref PUBSUBMANAGER_SINGLETON: PSManagerArc =
Arc::new(Mutex::new(TokioPubSubManager::new()));
}
impl coreps::PubSubManager for TokioPubSubManager {
    /// Returns the subscriptions registered on this manager.
    fn get_subscriptions(&self) -> &Vec<coreps::Subscription> {
        &self.subscriptions
    }

    /// Returns the default publish socket, if one has been configured.
    fn get_default_pub_socket(&self) -> &Option<coreabs::ZMQSocketArc> {
        &self.default_pub_socket
    }

    /// Registers `sub` and makes sure a reader task exists for SUB sockets.
    ///
    /// Steps, in order:
    /// 1. obtain the SUB socket for the subscription's URIs from the socket
    ///    handler,
    /// 2. set a subscribe filter on that socket for every topic in `sub`,
    /// 3. store a clone of `sub` in `self.subscriptions`,
    /// 4. spawn a reader task for the socket unless one is already tracked
    ///    under the SUB socket type.
    ///
    /// # Errors
    ///
    /// Returns the error from the socket lookup or from setting a topic
    /// filter. In that case the subscription is NOT stored, so a caller may
    /// retry without ending up with duplicate entries.
    fn subscribe(&mut self, sub: &coreps::Subscription) -> Fallible<()> {
        let sdesc = coreabs::ZMQSocketDescription {
            socketuris: sub.socketuris.clone(),
            sockettype: coreabs::ZMQSocketType::SUB,
        };
        let smgr_mutex = coreabs::BaseSocketHandler::instance();
        let socket_mutex = smgr_mutex.lock().get_socket(&sdesc)?;
        {
            // Scoped so the socket lock is released before the reader task
            // (which locks the same socket) may be spawned.
            let socket = socket_mutex.lock();
            for topic in sub.topics.iter() {
                socket.set_subscribe(topic.as_bytes())?;
            }
        }

        // Only record the subscription once the socket side is fully set up;
        // every fallible step is behind us at this point.
        self.subscriptions.push(sub.clone());

        // One reader is tracked per socket type; the entry API does the
        // lookup and the insertion in a single step.
        match self._socket_readers.entry(coreabs::ZMQSocketType::SUB) {
            std::collections::hash_map::Entry::Occupied(_) => {
                log::debug!("SUB socket reader found (we hope it's not failed)");
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(tokio::spawn(reader_task(
                    socket_mutex,
                    self.term_flag.clone(),
                )));
            }
        }
        Ok(())
    }
}
impl TokioPubSubManager {
    /// Returns a new handle to the shared manager stored in
    /// `PUBSUBMANAGER_SINGLETON`.
    pub fn instance() -> PSManagerArc {
        let shared: &PSManagerArc = &*PUBSUBMANAGER_SINGLETON;
        Arc::clone(shared)
    }

    /// Builds a manager with a fresh, unset termination flag and every other
    /// field at its default value.
    pub fn new() -> TokioPubSubManager {
        let term_flag = Arc::new(AtomicBool::new(false));
        Self {
            term_flag,
            ..Self::default()
        }
    }

    /// Looks up the PUB socket for `uris` through the socket handler and
    /// stores it as the default publish socket.
    ///
    /// # Errors
    ///
    /// Propagates the error from the socket lookup; the previously stored
    /// default socket is left untouched in that case.
    pub fn set_default_pub_uris(&mut self, uris: &Vec<String>) -> Fallible<()> {
        let handler = coreabs::BaseSocketHandler::instance();
        let description = coreabs::ZMQSocketDescription {
            socketuris: uris.clone(),
            sockettype: coreabs::ZMQSocketType::PUB,
        };
        let pub_socket = handler.lock().get_socket(&description)?;
        self.default_pub_socket = Some(pub_socket);
        Ok(())
    }
}
/// Polls `socket_mutex` for multipart messages and dispatches every message
/// that decodes successfully through the shared `TokioPubSubManager`.
///
/// The loop yields to the runtime once per iteration and uses a
/// non-blocking receive, so it never blocks the executor thread.
///
/// * `socket_mutex` - socket to read from.
/// * `termflag` - when set, the task logs and returns `Ok(())`.
///
/// Receive errors (which include "nothing to read right now") are ignored
/// and the socket is simply polled again. Messages that fail to decode are
/// logged and dropped.
async fn reader_task(
    socket_mutex: coreabs::ZMQSocketArc,
    termflag: TerminationFlag,
) -> Fallible<()> {
    let psmgr_mutex = TokioPubSubManager::instance();
    loop {
        tokio::task::yield_now().await;

        // Checked on every pass, before any early `continue`, so an idle
        // socket cannot keep the task alive after termination is requested.
        if termflag.load(Ordering::Relaxed) {
            log::trace!("Got term flag, exiting reader task");
            return Ok(());
        }

        // The guard is a temporary of this statement: the socket lock is
        // released before the manager lock is taken for dispatch below.
        // `subscribe` locks the socket while its caller may hold the manager
        // lock, so holding both here in the opposite order could deadlock.
        let received = socket_mutex.lock().recv_multipart(zmq::DONTWAIT);
        let parts = match received {
            Ok(parts) => parts,
            // Non-blocking poll: on any receive error just try again.
            Err(_e) => continue,
        };

        log::trace!("Got message parts {:?}", &parts);
        let msgparts: Vec<zmq::Message> = parts.into_iter().map(zmq::Message::from).collect();
        let psmsg = match coreps::PubSubMessage::zmq_decode(msgparts) {
            Ok(msg) => *msg,
            Err(e) => {
                log::error!("Could not decode message: {}", e);
                continue;
            }
        };
        psmgr_mutex.lock().dispatch(psmsg);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::send_task;
    use datastreamcorelib::datamessage as coredm;
    use datastreamcorelib::logging::init_logging;
    use std::env::temp_dir;
    use std::time::{Duration, Instant};
    use tokio::time::delay_for;

    /// The singleton can be obtained, debug-printed and locked.
    #[test]
    fn new_instace() {
        init_logging(log::LevelFilter::Trace).unwrap();
        let psmgr = TokioPubSubManager::instance();
        log::debug!("psmgr is {:?}", psmgr);
        let _ = psmgr.lock();
    }

    /// Collects every message handed to its callback.
    struct DispatchReceiver {
        messages: Vec<coreps::PubSubMessage>,
    }

    impl DispatchReceiver {
        /// Subscription callback: logs the message and stores it.
        pub fn callback(&mut self, _sub: &coreps::Subscription, msg: coreps::PubSubMessage) {
            log::debug!("Got message {:?} on sub {:?}", &msg, &_sub);
            self.messages.push(msg);
        }
    }

    /// Publishing through a default PUB socket succeeds.
    #[test]
    fn test_publish() {
        let smgr = coreabs::BaseSocketHandler::instance();
        let psmgr = TokioPubSubManager::instance();
        let socketuris = vec!["inproc://pubtest".to_string()];
        let sdesc = coreabs::ZMQSocketDescription {
            socketuris: socketuris.clone(),
            sockettype: coreabs::ZMQSocketType::PUB,
        };
        psmgr.lock().default_pub_socket = Some(smgr.lock().get_socket(&sdesc).unwrap());
        let msg = coredm::PubSubDataMessage::new("footopic".to_string()).unwrap();
        psmgr.lock().publish(&msg).unwrap();
    }

    /// Messages published on an IPC socket reach a subscriber callback.
    #[tokio::test]
    async fn test_pubsub_roundtrip() {
        let smgr = coreabs::BaseSocketHandler::instance();
        let psmgr = TokioPubSubManager::instance();
        let mut tmppath1 = temp_dir();
        tmppath1.push("7a2d57bd-eed6-4c63-81a5-b08f62df0945_pub.sock");
        let sockpath1 = "ipc://".to_string() + &tmppath1.to_string_lossy();
        let socketuris = vec![sockpath1];
        let sdesc = coreabs::ZMQSocketDescription {
            socketuris: socketuris.clone(),
            sockettype: coreabs::ZMQSocketType::PUB,
        };
        psmgr.lock().default_pub_socket = Some(smgr.lock().get_socket(&sdesc).unwrap());
        let recv = Arc::new(Mutex::new(DispatchReceiver {
            messages: Vec::new(),
        }));
        let crecv = recv.clone();
        let sub = coreps::Subscription::new(
            socketuris.clone(),
            vec!["pstesttopic_1".to_string()],
            move |sub: &coreps::Subscription, msg: coreps::PubSubMessage| {
                crecv.lock().callback(sub, msg)
            },
        )
        .unwrap();
        psmgr.lock().subscribe(&sub).unwrap();
        // Give the freshly spawned reader a moment before publishing.
        delay_for(Duration::from_millis(100)).await;
        let msg = coredm::PubSubDataMessage::new("pstesttopic_1".to_string()).unwrap();
        assert_eq!(recv.lock().messages.len(), 0);
        psmgr.lock().publish(&msg).unwrap();
        let task = tokio::spawn(send_task(msg));
        // Monotonic clock: wall-clock adjustments must not affect the timeout.
        let started = Instant::now();
        loop {
            delay_for(Duration::from_millis(50)).await;
            if recv.lock().messages.len() >= 2 {
                break;
            }
            if started.elapsed() > Duration::from_millis(1500) {
                panic!("Timed out waiting for messages")
            }
        }
        assert_eq!(recv.lock().messages.len(), 2);
        match task.await {
            Err(_) => panic!("sendertask failed"),
            Ok(_) => {}
        }
    }

    /// Dispatching directly on the manager invokes the subscription callback.
    #[test]
    fn test_dispatch() {
        let recv = Arc::new(Mutex::new(DispatchReceiver {
            messages: Vec::new(),
        }));
        let crecv = recv.clone();
        let sub = coreps::Subscription::new(
            vec!["inproc://foobar".to_string()],
            vec!["test_topic_1".to_string()],
            move |sub: &coreps::Subscription, msg: coreps::PubSubMessage| {
                crecv.lock().callback(sub, msg)
            },
        )
        .unwrap();
        let psmgr = TokioPubSubManager::instance();
        psmgr.lock().subscriptions.push(sub.clone());
        let msg1 = coreps::PubSubMessage::new("test".to_string()).unwrap();
        let msg2 = coreps::PubSubMessage::new("test_topic_1".to_string()).unwrap();
        let msg3 = coreps::PubSubMessage::new("test_topic_nomatch".to_string()).unwrap();
        psmgr.lock().dispatch(msg1);
        psmgr.lock().dispatch(msg2);
        psmgr.lock().dispatch(msg3);
        assert_eq!(recv.lock().messages.len(), 2);
    }
}