use crate::abstracts::{RawMessage, ZMQCodec};
use crate::binpackers::vec_to_uuid;
use crate::datamessage::{DataMessage, PubSubDataMessage};
use crate::markers;
use crate::pubsub::PubSubMessage;
use bytes::BytesMut;
use chrono;
use failure::Fallible;
use log;
use math::round;
use rmp_serde;
use serde_json;
use std::convert::TryFrom;
use std::fmt;
use uuid::Uuid;
use zmq;
pub trait ImgdataSanityCheck {
fn get_imginfo(&self) -> &serde_json::Value;
fn get_imgdata(&self) -> &bytes::BytesMut;
fn sanity_check(&self) -> bool {
let imginfo = match self.get_imginfo().as_object() {
Some(inner) => inner.clone(),
None => {
return false;
}
};
if imginfo.contains_key("compressed") && imginfo["compressed"].is_boolean() {
if imginfo["compressed"].as_bool().unwrap() {
log::warn!("Cannot sanity-check compressed data");
return true;
}
}
for keynamestr in ["bpp", "ch", "w", "h"].iter() {
let keyname = keynamestr.to_string();
if !imginfo.contains_key(&keyname) {
log::error!("imginfo is missing key {}", &keyname);
return false;
}
if !imginfo[&keyname].is_u64() {
log::error!("imginfo[{}] is not (positive) integer", &keyname);
return false;
}
if imginfo[&keyname].as_u64().unwrap() == 0 {
log::error!("imginfo[{}] is not nonzero", &keyname);
return false;
}
}
let imgdata = self.get_imgdata();
let bytes_pp = round::ceil(imginfo["bpp"].as_f64().unwrap() / 8.0, 0) as u64;
let expected_len = bytes_pp
* imginfo["ch"].as_u64().unwrap()
* imginfo["w"].as_u64().unwrap()
* imginfo["h"].as_u64().unwrap();
if imgdata.len() != expected_len as usize {
log::error!(
"imgdata size ({}) does not match expected ({})",
imgdata.len(),
&expected_len
);
return false;
}
true
}
}
#[derive(Default, Clone)]
pub struct ImageMessage {
pub msgid: uuid::Uuid,
pub data: serde_json::Value,
pub imginfo: serde_json::Value,
pub imgdata: bytes::BytesMut,
pub extra_parts: Vec<BytesMut>,
}
impl fmt::Debug for ImageMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ImageMessage")
.field("msgid", &self.msgid)
.field("data", &self.data)
.field("imginfo", &self.imginfo)
.field("imgdata", &"<hidden>".to_string())
.field("extra_parts", &self.extra_parts)
.finish()
}
}
impl ImageMessage {
pub fn new() -> Fallible<ImageMessage> {
let mut ret = ImageMessage {
msgid: Uuid::new_v4(),
..Default::default()
};
ret.data["systemtime"] = serde_json::to_value(
chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
)?;
ret.imginfo = serde_json::json!({
"format": "bgr8",
"bpp": 8,
"ch": 3,
"w": 0,
"h": 0
});
Ok(ret)
}
}
impl ImgdataSanityCheck for ImageMessage {
fn get_imginfo(&self) -> &serde_json::Value {
&self.imginfo
}
fn get_imgdata(&self) -> &bytes::BytesMut {
&self.imgdata
}
}
impl ZMQCodec for ImageMessage {
fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
if from.len() < 3 {
return Err(failure::err_msg("Too few parts"));
}
let msgid = vec_to_uuid(from[0].to_vec())?;
let mut rawdata: serde_json::Value = rmp_serde::from_slice(&from[1][..])?;
if !rawdata.is_object() {
return Err(failure::err_msg("Data is not map/object"));
}
let data = rawdata.as_object_mut().unwrap();
if !data.contains_key("imginfo") {
return Err(failure::err_msg("no 'imginfo' in data"));
}
let imginfo: serde_json::Value = data["imginfo"].clone();
data.remove("imginfo").unwrap();
let imgdata = BytesMut::from(&from[2][..]);
let mut extra_parts: Vec<BytesMut> = Vec::with_capacity(from.len() - 3);
for (idx, zmqpart) in from.iter().enumerate() {
if idx < 3 {
continue;
}
extra_parts.push(BytesMut::from(zmqpart as &[u8]));
}
let msg = ImageMessage {
msgid,
data: serde_json::to_value(data)?,
imginfo,
imgdata,
extra_parts,
};
if !msg.sanity_check() {
return Err(failure::err_msg("Image data is not sane"));
}
let msgbox = Box::new(msg);
Ok(msgbox)
}
fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
let mut ret: Vec<zmq::Message> = Vec::with_capacity(3 + self.extra_parts.len());
let mut data = self.data.clone();
data["imginfo"] = self.imginfo.clone();
ret.push(zmq::Message::from(&self.msgid.as_bytes()[..]));
ret.push(zmq::Message::from(rmp_serde::to_vec(&data)?));
ret.push(zmq::Message::from(&self.imgdata[..]));
for extrapart in self.extra_parts.iter() {
ret.push(zmq::Message::from(&extrapart[..]))
}
Ok(ret)
}
}
impl markers::ZMQMessageMarker for ImageMessage {}
impl markers::DataMessageMarker for ImageMessage {}
impl markers::ImageMessageMarker for ImageMessage {}
crate::naive_tryfrom!(ImageMessage, [RawMessage, DataMessage]);
crate::naive_tryfrom!(DataMessage, [ImageMessage]);
crate::naive_tryfrom!(RawMessage, [ImageMessage]);
#[derive(Default, Clone)]
pub struct PubSubImageMessage {
pub topic: String,
pub msgid: uuid::Uuid,
pub data: serde_json::Value,
pub imginfo: serde_json::Value,
pub imgdata: bytes::BytesMut,
pub extra_parts: Vec<BytesMut>,
}
impl fmt::Debug for PubSubImageMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ImageMessage")
.field("topic", &self.topic)
.field("msgid", &self.msgid)
.field("data", &self.data)
.field("imginfo", &self.imginfo)
.field("imgdata", &"<hidden>".to_string())
.field("extra_parts", &self.extra_parts)
.finish()
}
}
impl PubSubImageMessage {
pub fn new(topic: String) -> Fallible<PubSubImageMessage> {
let mut ret = PubSubImageMessage {
topic: topic,
msgid: Uuid::new_v4(),
..Default::default()
};
ret.data["systemtime"] = serde_json::to_value(
chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
)?;
ret.imginfo = serde_json::json!({
"format": "bgr8",
"bpp": 8,
"ch": 3,
"w": 0,
"h": 0
});
Ok(ret)
}
}
impl ImgdataSanityCheck for PubSubImageMessage {
fn get_imginfo(&self) -> &serde_json::Value {
&self.imginfo
}
fn get_imgdata(&self) -> &bytes::BytesMut {
&self.imgdata
}
}
impl ZMQCodec for PubSubImageMessage {
fn zmq_decode(from: Vec<zmq::Message>) -> Fallible<Box<Self>> {
if from.len() < 4 {
return Err(failure::err_msg("Too few parts"));
}
let topic = from[0]
.as_str()
.ok_or(failure::err_msg("Topic part is not string"))?
.to_string();
let msgid = vec_to_uuid(from[1].to_vec())?;
let mut rawdata: serde_json::Value = rmp_serde::from_slice(&from[2][..])?;
if !rawdata.is_object() {
return Err(failure::err_msg("Data is not map/object"));
}
let data = rawdata.as_object_mut().unwrap();
if !data.contains_key("imginfo") {
return Err(failure::err_msg("no 'imginfo' in data"));
}
let imginfo: serde_json::Value = data["imginfo"].clone();
data.remove("imginfo").unwrap();
let imgdata = BytesMut::from(&from[3][..]);
let mut extra_parts: Vec<BytesMut> = Vec::with_capacity(from.len() - 4);
for (idx, zmqpart) in from.iter().enumerate() {
if idx < 4 {
continue;
}
extra_parts.push(BytesMut::from(zmqpart as &[u8]));
}
let msg = PubSubImageMessage {
topic,
msgid,
data: serde_json::to_value(data)?,
imginfo,
imgdata,
extra_parts,
};
if !msg.sanity_check() {
return Err(failure::err_msg("Image data is not sane"));
}
let msgbox = Box::new(msg);
Ok(msgbox)
}
fn zmq_encode(&self) -> Fallible<Vec<zmq::Message>> {
let mut ret: Vec<zmq::Message> = Vec::with_capacity(4 + self.extra_parts.len());
let mut data = self.data.clone();
data["imginfo"] = self.imginfo.clone();
ret.push(zmq::Message::from(&self.topic.as_bytes()));
ret.push(zmq::Message::from(&self.msgid.as_bytes()[..]));
ret.push(zmq::Message::from(rmp_serde::to_vec(&data)?));
ret.push(zmq::Message::from(&self.imgdata[..]));
for extrapart in self.extra_parts.iter() {
ret.push(zmq::Message::from(&extrapart[..]))
}
Ok(ret)
}
}
impl markers::ZMQMessageMarker for PubSubImageMessage {}
impl markers::DataMessageMarker for PubSubImageMessage {}
impl markers::ImageMessageMarker for PubSubImageMessage {}
impl markers::PubSubMessageMarker for PubSubImageMessage {}
impl markers::PubSubDataMessageMarker for PubSubImageMessage {}
crate::naive_tryfrom!(
PubSubImageMessage,
[RawMessage, PubSubMessage, PubSubDataMessage]
);
crate::naive_tryfrom!(PubSubDataMessage, [PubSubImageMessage]);
crate::naive_tryfrom!(PubSubMessage, [PubSubImageMessage]);
crate::naive_tryfrom!(RawMessage, [PubSubImageMessage]);
impl From<ImageMessage> for PubSubImageMessage {
fn from(dmsg: ImageMessage) -> Self {
let psmsg = PubSubImageMessage {
topic: "UNDEFINED".to_string(),
msgid: dmsg.msgid,
data: dmsg.data,
imginfo: dmsg.imginfo,
imgdata: dmsg.imgdata,
extra_parts: dmsg.extra_parts,
};
psmsg
}
}
impl From<&ImageMessage> for PubSubImageMessage {
fn from(dmsg: &ImageMessage) -> Self {
let psmsg = PubSubImageMessage {
topic: "UNDEFINED".to_string(),
msgid: dmsg.msgid.clone(),
data: dmsg.data.clone(),
imginfo: dmsg.imginfo.clone(),
imgdata: dmsg.imgdata.clone(),
extra_parts: dmsg.extra_parts.clone(),
};
psmsg
}
}
#[cfg(test)]
mod tests {
use super::*;
fn red_pixel() -> ImageMessage {
let mut msg = ImageMessage::new().unwrap();
msg.imginfo["w"] = serde_json::json!(1);
msg.imginfo["h"] = serde_json::json!(1);
let imgdata: [u8; 3] = [0, 0, 255];
msg.imgdata = BytesMut::from(&imgdata[..]);
msg.extra_parts.push(BytesMut::from("extra part 1"));
msg.extra_parts.push(BytesMut::from("extra part 2"));
return msg;
}
fn red_pubsub_pixel() -> PubSubImageMessage {
let mut msg = PubSubImageMessage::new("red".to_string()).unwrap();
msg.imginfo["w"] = serde_json::json!(1);
msg.imginfo["h"] = serde_json::json!(1);
let imgdata: [u8; 3] = [0, 0, 255];
msg.imgdata = BytesMut::from(&imgdata[..]);
msg.extra_parts.push(BytesMut::from("extra part 1"));
msg.extra_parts.push(BytesMut::from("extra part 2"));
return msg;
}
#[test]
fn test_new() {
let msg = ImageMessage::new().unwrap();
log::debug!("msg is {:?}", msg);
assert_eq!(msg.imginfo["format"], "bgr8".to_string());
assert_eq!(msg.imginfo["ch"], 3);
assert_ne!(msg.sanity_check(), true);
}
#[test]
fn test_new_pubsub() {
let msg = PubSubImageMessage::new("test".to_string()).unwrap();
log::debug!("msg is {:?}", msg);
assert_eq!(msg.imginfo["format"], "bgr8".to_string());
assert_eq!(msg.imginfo["ch"], 3);
assert_ne!(msg.sanity_check(), true);
}
#[test]
fn test_self_roundtrip() {
let mut red = red_pixel();
red.data["foobar"] = serde_json::json!("barfoo");
let msgparts = red.zmq_encode().unwrap();
let dec = *ImageMessage::zmq_decode(msgparts).unwrap();
assert_eq!(red.imgdata, dec.imgdata);
assert_eq!(red.imginfo, dec.imginfo);
assert_eq!(red.extra_parts, dec.extra_parts);
assert_eq!(red.data, dec.data);
assert_eq!(dec.data["foobar"], "barfoo".to_string());
}
#[test]
fn test_tryinto_roundtrip() {
let red = red_pixel();
let dmsg = DataMessage::try_from(&red).unwrap();
let rmsg = RawMessage::try_from(&dmsg).unwrap();
let dec = ImageMessage::try_from(&rmsg).unwrap();
assert_eq!(red.imgdata, dec.imgdata);
assert_eq!(red.imginfo, dec.imginfo);
assert_eq!(red.extra_parts, dec.extra_parts);
assert_eq!(red.data, dec.data);
let dec = ImageMessage::try_from(&dmsg).unwrap();
assert_eq!(red.imgdata, dec.imgdata);
assert_eq!(red.imginfo, dec.imginfo);
assert_eq!(red.extra_parts, dec.extra_parts);
assert_eq!(red.data, dec.data);
}
#[test]
fn test_pubsub_tryinto_roundtrip() {
let red = red_pubsub_pixel();
let dmsg = PubSubDataMessage::try_from(&red).unwrap();
let psmsg = PubSubMessage::try_from(&dmsg).unwrap();
let dec = PubSubImageMessage::try_from(&psmsg).unwrap();
assert_eq!(red.imgdata, dec.imgdata);
assert_eq!(red.imginfo, dec.imginfo);
assert_eq!(red.extra_parts, dec.extra_parts);
assert_eq!(red.data, dec.data);
let dec = PubSubImageMessage::try_from(&dmsg).unwrap();
assert_eq!(red.imgdata, dec.imgdata);
assert_eq!(red.imginfo, dec.imginfo);
assert_eq!(red.extra_parts, dec.extra_parts);
assert_eq!(red.data, dec.data);
}
#[test]
fn test_pubsub_self_roundtrip() {
let mut red = red_pubsub_pixel();
red.data["foobar"] = serde_json::json!("barfoo");
let msgparts = red.zmq_encode().unwrap();
let dec = *PubSubImageMessage::zmq_decode(msgparts).unwrap();
assert_eq!(red.imgdata, dec.imgdata);
assert_eq!(red.imginfo, dec.imginfo);
assert_eq!(red.extra_parts, dec.extra_parts);
assert_eq!(red.data, dec.data);
assert_eq!(dec.data["foobar"], "barfoo".to_string());
}
#[test]
fn test_pubsub_self_multibyte_roundtrip() {
let mut red = red_pubsub_pixel();
red.data["foobar"] = serde_json::json!("barfoo");
red.imginfo["bpp"] = serde_json::json!(12);
let imgdata: [u8; 6] = [0, 0, 0, 0, 15, 255];
red.imgdata = BytesMut::from(&imgdata[..]);
let msgparts = red.zmq_encode().unwrap();
let dec = *PubSubImageMessage::zmq_decode(msgparts).unwrap();
assert_eq!(red.imgdata, dec.imgdata);
assert_eq!(red.imginfo, dec.imginfo);
assert_eq!(red.extra_parts, dec.extra_parts);
assert_eq!(red.data, dec.data);
assert_eq!(dec.data["foobar"], "barfoo".to_string());
}
}