use bytes::BytesMut;
use clap::{crate_authors, crate_version, App, Arg};
use datastreamcorelib::datamessage::PubSubDataMessage;
use datastreamcorelib::imagemessage::PubSubImageMessage;
use datastreamcorelib::logging::init_logging;
use datastreamservicelib::heartbeat::heartbeat_task;
use datastreamservicelib::utils::{setup_signals_termination, wait_for_tasks};
use datastreamservicelib::zmqwrappers::TokioPubSubManager;
use datastreamservicelib::TerminationFlag;
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use futures::stream::FuturesUnordered;
use log;
use serde_json;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::runtime;
use tokio::time::delay_for;
/// Command-line entry point for the test publisher.
///
/// Initialises logging at `Trace` level, parses the command line, builds a
/// basic-scheduler Tokio runtime with the time and IO drivers enabled and
/// drives [`mainloop`] to completion on it.
///
/// # Errors
///
/// Fails if logging cannot be initialised, if the `--count` value does not
/// parse as an `i64`, if the runtime cannot be built, or if [`mainloop`]
/// itself returns an error.
fn main() -> Fallible<()> {
    init_logging(log::LevelFilter::Trace)?;

    // Describe each argument on its own, then assemble the application.
    let socket_uri_arg = Arg::with_name("socket_uri")
        .short("s")
        .long("socket_uri")
        .takes_value(true)
        .empty_values(false)
        .required(true)
        .help("For example ipc:///tmp/publisher.sock");
    let topic_arg = Arg::with_name("topic")
        .short("t")
        .long("topic")
        .takes_value(true)
        .empty_values(false)
        .required(true)
        .help("The topic to use for sending");
    let images_arg = Arg::with_name("images")
        .short("i")
        .long("images")
        .help("Send images (one red pixel)");
    let count_arg = Arg::with_name("count")
        .short("c")
        .long("count")
        .takes_value(true)
        .empty_values(false)
        .help("Number of messages to send");

    let matches = App::new("testpublisher")
        .about("PUBlish test messages ")
        .version(crate_version!())
        .author(crate_authors!())
        .arg(socket_uri_arg)
        .arg(topic_arg)
        .arg(images_arg)
        .arg(count_arg)
        .get_matches();

    // Both of these are declared `required(true)` above, so a missing value
    // here would be an internal invariant violation rather than user error.
    let socket_uri = matches.value_of("socket_uri").unwrap().to_string();
    let topic = matches.value_of("topic").unwrap().to_string();

    // Without `--count` the limit is -1; the publisher tasks only enforce the
    // limit when it is greater than zero, so -1 means "keep sending".
    let count_to: i64 = matches
        .value_of("count")
        .map(|raw| raw.parse::<i64>())
        .transpose()?
        .unwrap_or(-1);
    let images = matches.is_present("images");

    let mut builder = runtime::Builder::new();
    builder.basic_scheduler().enable_time().enable_io();
    let mut rt = builder.build()?;

    rt.block_on(mainloop(socket_uri, topic, count_to, images))
}
/// Configures the shared pub/sub manager, spawns the worker tasks and waits
/// for them to finish.
///
/// * `socket_uri` - handed to the manager as its only default PUB URI.
/// * `topic` - topic passed on to the spawned publisher task.
/// * `count_to` - message limit passed on to the spawned publisher task.
/// * `images` - when `true`, [`img_publisher_task`] is spawned; otherwise
///   [`publisher_task`] is spawned.
///
/// A heartbeat task is always spawned alongside the publisher.
///
/// # Errors
///
/// Propagates failures from installing the signal handlers, from setting the
/// default PUB URIs, and from `wait_for_tasks`.
async fn mainloop(socket_uri: String, topic: String, count_to: i64, images: bool) -> Fallible<()> {
    let term = setup_signals_termination()?;

    let manager = TokioPubSubManager::instance();
    let pub_uris = vec![socket_uri];
    {
        // Keep the manager locked only while it is being configured.
        let mut guard = manager.lock();
        guard.term_flag = term.clone();
        guard.set_default_pub_uris(&pub_uris)?;
    }

    // NOTE(review): presumably this pause lets the socket settle before the
    // first publish -- confirm the intent of the 150 ms.
    let settle_time = Duration::from_millis(150);
    delay_for(settle_time).await;

    let tasks = FuturesUnordered::new();
    tasks.push(tokio::spawn(heartbeat_task(term.clone())));

    let publisher_handle = if images {
        tokio::spawn(img_publisher_task(topic, count_to, term.clone()))
    } else {
        tokio::spawn(publisher_task(topic, count_to, term.clone()))
    };
    tasks.push(publisher_handle);

    wait_for_tasks(tasks).await?;
    Ok(())
}
/// Publishes numbered data messages on `topic` roughly every 100 ms.
///
/// Each message carries a `msgno` field, starting at 1. After every publish
/// the task checks, in this order:
///
/// 1. if `count_to` is greater than zero and that many messages have been
///    sent, it sets the termination flag and stops;
/// 2. if the termination flag is already set, it stops.
///
/// A `count_to` of zero or less therefore never triggers the first check.
///
/// # Errors
///
/// Propagates failures from constructing the message, serialising the
/// counter, and publishing.
async fn publisher_task(topic: String, count_to: i64, term: TerminationFlag) -> Fallible<()> {
    let manager = TokioPubSubManager::instance();
    let send_interval = Duration::from_millis(100);
    let mut sent: i64 = 0;

    loop {
        sent += 1;

        let mut message = PubSubDataMessage::new(topic.clone())?;
        message.data["msgno"] = serde_json::to_value(sent)?;
        // The lock guard is a temporary, released at the end of this statement.
        manager.lock().publish(&message)?;

        let limit_reached = count_to > 0 && sent >= count_to;
        if limit_reached {
            log::info!("Sent {} messages, exiting now", sent);
            // Raise the shared flag so the other tasks can wind down too.
            term.store(true, Ordering::Relaxed);
            break;
        }

        if term.load(Ordering::Relaxed) {
            log::trace!("Got term flag, exiting publisher task");
            break;
        }

        delay_for(send_interval).await;
    }

    Ok(())
}
/// Publishes numbered 1x1 image messages on `topic` roughly every 100 ms.
///
/// Each message carries a `msgno` field (starting at 1), an `imginfo` with
/// `w` and `h` both set to 1, and three bytes of image data `[0, 0, 255]`.
/// After every publish the task checks, in this order:
///
/// 1. if `count_to` is greater than zero and that many messages have been
///    sent, it sets the termination flag and stops;
/// 2. if the termination flag is already set, it stops.
///
/// A `count_to` of zero or less therefore never triggers the first check.
///
/// # Errors
///
/// Propagates failures from constructing the message, serialising the
/// counter, and publishing.
async fn img_publisher_task(topic: String, count_to: i64, term: TerminationFlag) -> Fallible<()> {
    // NOTE(review): the CLI help calls this "one red pixel", which would
    // imply BGR channel order for these bytes -- confirm against consumers.
    const PIXEL_BYTES: [u8; 3] = [0, 0, 255];

    let manager = TokioPubSubManager::instance();
    let send_interval = Duration::from_millis(100);
    let mut sent: i64 = 0;

    loop {
        sent += 1;

        let mut message = PubSubImageMessage::new(topic.clone())?;
        message.data["msgno"] = serde_json::to_value(sent)?;
        message.imginfo["w"] = serde_json::json!(1);
        message.imginfo["h"] = serde_json::json!(1);
        message.imgdata = BytesMut::from(&PIXEL_BYTES[..]);
        // The lock guard is a temporary, released at the end of this statement.
        manager.lock().publish(&message)?;

        let limit_reached = count_to > 0 && sent >= count_to;
        if limit_reached {
            log::info!("Sent {} messages, exiting now", sent);
            // Raise the shared flag so the other tasks can wind down too.
            term.store(true, Ordering::Relaxed);
            break;
        }

        if term.load(Ordering::Relaxed) {
            log::trace!("Got term flag, exiting publisher task");
            break;
        }

        delay_for(send_interval).await;
    }

    Ok(())
}