use clap::{crate_authors, crate_version, App, Arg};
use datastreamcorelib::datamessage::PubSubDataMessage;
use datastreamcorelib::imagemessage::PubSubImageMessage;
use datastreamcorelib::logging::init_logging;
use datastreamcorelib::pubsub as coreps;
use datastreamservicelib::utils::{setup_signals_termination, wait_for_tasks};
use datastreamservicelib::zmqwrappers::TokioPubSubManager;
use datastreamservicelib::TerminationFlag;
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use futures::stream::FuturesUnordered;
use log;
use std::convert::TryFrom;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime;
use tokio::time::delay_for;
fn main() -> Fallible<()> {
init_logging(log::LevelFilter::Debug)?;
let app = App::new("testpublisher")
.about("PUBlish test messages ")
.version(crate_version!())
.author(crate_authors!())
.arg(
Arg::with_name("socket_uri")
.short("s")
.long("socket_uri")
.takes_value(true)
.empty_values(false)
.required(true)
.help("For example ipc:///tmp/publisher.sock"),
)
.arg(
Arg::with_name("topic")
.short("t")
.long("topic")
.takes_value(true)
.empty_values(false)
.required(true)
.help("The topic to use for receiving"),
)
.arg(
Arg::with_name("images")
.short("i")
.long("images")
.help("Use image decoder"),
)
.arg(
Arg::with_name("count")
.short("c")
.long("count")
.takes_value(true)
.empty_values(false)
.help("Number of messages to receive"),
);
let app_args = app.get_matches();
let socket_uri = app_args.value_of("socket_uri").unwrap().to_string();
let topic = app_args.value_of("topic").unwrap().to_string();
let count: Option<i64> = match app_args.value_of("count") {
None => None,
Some(val) => Some(val.parse()?),
};
let images = app_args.is_present("images");
let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_time()
.enable_io()
.build()?;
rt.block_on(mainloop(socket_uri, topic, count, images))?;
Ok(())
}
struct MsgReceiver {
imagedecode: bool,
termflag: TerminationFlag,
count_to: i64,
counter: i64,
}
impl MsgReceiver {
pub fn callback(&mut self, _sub: &coreps::Subscription, msg: coreps::PubSubMessage) {
if self.imagedecode {
let msg = match PubSubImageMessage::try_from(msg) {
Err(e) => {
log::error!("Decode error {:?}", e);
return;
}
Ok(msg) => msg,
};
log::debug!("Got image {:?} on sub {:?}", &msg, &_sub);
} else {
let msg = match PubSubDataMessage::try_from(msg) {
Err(e) => {
log::error!("Decode error {:?}", e);
return;
}
Ok(msg) => msg,
};
log::debug!("Got message {:?} on sub {:?}", &msg, &_sub);
}
self.counter = self.counter + 1;
if self.count_to > 0 && self.counter >= self.count_to {
log::info!("Got {} messages, exiting now", self.counter);
self.termflag.store(true, Ordering::Relaxed);
}
}
}
async fn mainloop(
socket_uri: String,
topic: String,
count: Option<i64>,
imagedecode: bool,
) -> Fallible<()> {
let term = setup_signals_termination()?;
let socketuris = vec![socket_uri.clone()];
let psmgr = TokioPubSubManager::instance();
let count_to = match count {
Some(cnt) => cnt,
None => -1,
};
let recv = Arc::new(Mutex::new(MsgReceiver {
imagedecode: imagedecode,
termflag: term.clone(),
count_to: count_to,
counter: 0,
}));
let crecv = recv.clone();
let sub = coreps::Subscription::new(
socketuris.clone(),
vec![topic.clone()],
move |sub: &coreps::Subscription, msg: coreps::PubSubMessage| {
crecv.lock().unwrap().callback(&sub, msg)
},
)?;
{
let mut psmgr_locked = psmgr.lock();
psmgr_locked.term_flag = term.clone();
psmgr_locked.subscribe(&sub)?;
}
log::info!("Subscription done");
delay_for(Duration::from_millis(150)).await;
let tasks = FuturesUnordered::new();
tasks.push(tokio::spawn(idletask(term.clone())));
wait_for_tasks(tasks).await?;
Ok(())
}
async fn idletask(term: TerminationFlag) -> Fallible<()> {
loop {
if term.load(Ordering::Relaxed) {
log::trace!("Got term flag, exiting idle task");
return Ok(());
}
delay_for(Duration::from_millis(10)).await;
}
}