use crate::abstracts::{RawMessage, ZMQCodec, ZMQSocketArc};
use crate::markers;
use bytes::BytesMut;
use failure::Fallible;
use parking_lot::Mutex;
use std::convert::TryFrom;
use std::fmt;
use std::sync::Arc;
use uuid::Uuid;
use zmq;
#[derive(Debug, Default, Clone)]
pub struct PubSubMessage {
pub topic: String,
pub dataparts: Vec<BytesMut>,
}
impl PubSubMessage {
pub fn new(topic: String) -> Fallible<PubSubMessage> {
Ok(PubSubMessage {
topic: topic,
..Default::default()
})
}
}
impl ZMQCodec for PubSubMessage {
fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
let mut ret: Vec<zmq::Message> = Vec::with_capacity(self.dataparts.len() + 1);
ret.push(zmq::Message::from(&self.topic.as_bytes()));
for part in self.dataparts.iter() {
ret.push(zmq::Message::from(&part[..]));
}
Ok(ret)
}
fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
if from.len() < 2 {
return Err(failure::err_msg("Too few parts"));
}
let topic = from[0]
.as_str()
.ok_or(failure::err_msg("Topic part is not string"))?
.to_string();
let mut dataparts: Vec<BytesMut> = Vec::with_capacity(from.len() - 1);
for (idx, msg) in from.iter().enumerate() {
if idx == 0 {
continue;
}
dataparts.push(BytesMut::from(msg as &[u8]));
}
let msgbox = Box::new(PubSubMessage { topic, dataparts });
Ok(msgbox)
}
}
impl markers::ZMQMessageMarker for PubSubMessage {}
impl markers::PubSubMessageMarker for PubSubMessage {}
crate::naive_tryfrom!(PubSubMessage, [RawMessage]);
crate::naive_tryfrom!(RawMessage, [PubSubMessage]);
pub trait PubSubManager {
fn subscribe(&mut self, sub: &Subscription) -> Fallible<()>;
fn get_subscriptions(&self) -> &Vec<Subscription>;
fn dispatch(&self, psmsg: PubSubMessage) {
for sub in self.get_subscriptions().iter() {
for topic in sub.topics.iter() {
if !topic.contains(&psmsg.topic) {
continue;
}
let cb = &mut *sub.callback.lock();
(*cb)(&sub, psmsg.clone());
}
}
}
fn get_default_pub_socket(&self) -> &Option<ZMQSocketArc>;
fn publish_to_socket<T: markers::PubSubMessageMarker + fmt::Debug>(
&self,
msg: &T,
socket_mutex: &ZMQSocketArc,
) -> Fallible<()> {
let msgparts = msg.zmq_encode()?;
log::trace!("Sending {:?}", &msg);
socket_mutex.lock().send_multipart(msgparts, 0)?;
Ok(())
}
fn publish<T: markers::PubSubMessageMarker + fmt::Debug>(&self, msg: &T) -> Fallible<()> {
let socket = match self.get_default_pub_socket() {
Some(socket) => socket,
None => return Err(failure::err_msg("No default socket")),
};
self.publish_to_socket(msg, &socket)
}
}
pub type PubSubCallbackType = dyn FnMut(&Subscription, PubSubMessage) + Send;
pub type PubSubCBFailType = dyn FnMut(&Subscription, Vec<zmq::Message>) + Send;
#[derive(Clone)]
pub struct Subscription {
pub socketuris: Vec<String>,
pub topics: Vec<String>,
pub metadata: serde_json::Value,
pub trackingid: uuid::Uuid,
pub callback: Arc<Mutex<Box<PubSubCallbackType>>>,
}
impl fmt::Debug for Subscription {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Subscription")
.field("socketuris", &self.socketuris)
.field("topics", &self.topics)
.field("metadata", &self.metadata)
.field("trackingid", &self.trackingid)
.field("callback", &"<hidden>".to_string())
.finish()
}
}
impl Subscription {
pub fn new(
socketuris: Vec<String>,
topics: Vec<String>,
callback: impl FnMut(&Subscription, PubSubMessage) + Send + 'static,
) -> Fallible<Subscription> {
let ret = Subscription {
socketuris,
topics,
metadata: serde_json::json!({}),
callback: Arc::new(Mutex::new(Box::new(callback))),
trackingid: Uuid::new_v4(),
};
Ok(ret)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::datamessage::PubSubDataMessage;
#[test]
fn test_new_psmsg() {
let psmsg = PubSubMessage::new("foobar".to_string()).unwrap();
assert_eq!(psmsg.topic, "foobar".to_string());
log::debug!("Message is {:?}", psmsg);
}
#[test]
fn test_datamessage_decodes_as_pubsubmessage() {
let dmsg = PubSubDataMessage::new("testtopic".to_string()).unwrap();
let msgparts = dmsg.zmq_encode().unwrap();
assert!(msgparts.len() == 3);
let psmsg = *PubSubMessage::zmq_decode(msgparts).unwrap();
assert_eq!(dmsg.topic, psmsg.topic);
assert!(psmsg.dataparts.len() == 2);
}
#[test]
fn test_datamessage_tryinto_pubsubmessage() {
let dmsg = PubSubDataMessage::new("testtopic".to_string()).unwrap();
let psmsg = PubSubMessage::try_from(&dmsg).unwrap();
assert_eq!(dmsg.topic, psmsg.topic);
assert!(psmsg.dataparts.len() == 2);
}
#[test]
fn test_datamessage_tryinto_pubsubmessage_consume() {
let dmsg = PubSubDataMessage::new("testtopic".to_string()).unwrap();
let psmsg = PubSubMessage::try_from(dmsg).unwrap();
assert_eq!(psmsg.topic, "testtopic".to_string());
assert!(psmsg.dataparts.len() == 2);
}
#[test]
fn test_rawmessage_tryinto_roundtrip() {
let mut dmsg_orig = PubSubDataMessage::new("testtopic".to_string()).unwrap();
dmsg_orig.data["testkey"] = serde_json::to_value(String::from("ÄäkkösTesti")).unwrap();
let rawmsg = RawMessage::try_from(dmsg_orig).unwrap();
let psmsg = PubSubMessage::try_from(rawmsg).unwrap();
assert_eq!(psmsg.topic, "testtopic".to_string());
assert!(psmsg.dataparts.len() == 2);
}
struct DispatchReceiver {
messages: Vec<PubSubMessage>,
}
impl DispatchReceiver {
pub fn callback(&mut self, _sub: &Subscription, msg: PubSubMessage) {
log::debug!("DPR callback");
self.messages.push(msg);
}
}
#[test]
fn test_subscription_callback() {
let recv = Arc::new(Mutex::new(DispatchReceiver {
messages: Vec::new(),
}));
let crecv = recv.clone();
let sub = Subscription::new(
vec!["inproc://foobar".to_string()],
vec!["test_topic_1".to_string()],
move |sub: &Subscription, msg: PubSubMessage| crecv.lock().callback(&sub, msg),
)
.unwrap();
log::debug!("Sub is {:?}", sub);
let psmsg = PubSubMessage::new("foobar".to_string()).unwrap();
assert_eq!(recv.lock().messages.len(), 0);
let cb = &mut *sub.callback.lock();
(*cb)(&sub, psmsg);
assert_eq!(recv.lock().messages.len(), 1);
}
#[derive(Default)]
struct DispatchTestPSMgr {
pub subscriptions: Vec<Subscription>,
}
impl PubSubManager for DispatchTestPSMgr {
fn get_subscriptions(&self) -> &Vec<Subscription> {
&self.subscriptions
}
fn subscribe(&mut self, sub: &Subscription) -> Fallible<()> {
self.subscriptions.push(sub.clone());
Ok(())
}
fn get_default_pub_socket(&self) -> &Option<ZMQSocketArc> {
&None
}
}
#[test]
fn test_dispatch() {
let recv = Arc::new(Mutex::new(DispatchReceiver {
messages: Vec::new(),
}));
let mut psmgr = DispatchTestPSMgr {
..Default::default()
};
let crecv = recv.clone();
let sub = Subscription::new(
vec!["inproc://foobar".to_string()],
vec!["test_topic_1".to_string()],
move |sub: &Subscription, msg: PubSubMessage| crecv.lock().callback(&sub, msg),
)
.unwrap();
psmgr.subscribe(&sub).unwrap();
let msg1 = PubSubMessage::new("test".to_string()).unwrap();
let msg2 = PubSubMessage::new("test_topic_1".to_string()).unwrap();
let msg3 = PubSubMessage::new("test_topic_nomatch".to_string()).unwrap();
assert_eq!(recv.lock().messages.len(), 0);
psmgr.dispatch(msg1);
psmgr.dispatch(msg2);
psmgr.dispatch(msg3);
assert_eq!(recv.lock().messages.len(), 2);
}
}