datastreamservicelib 1.0.0

Rust version of https://gitlab.com/advian-oss/python-datastreamservicelib
Documentation
/// Simple test message publisher
use bytes::BytesMut;
use clap::{crate_authors, crate_version, App, Arg};
use datastreamcorelib::datamessage::PubSubDataMessage;
use datastreamcorelib::imagemessage::PubSubImageMessage;
use datastreamcorelib::logging::init_logging;
use datastreamservicelib::heartbeat::heartbeat_task;
use datastreamservicelib::utils::{setup_signals_termination, wait_for_tasks};
use datastreamservicelib::zmqwrappers::TokioPubSubManager;
use datastreamservicelib::TerminationFlag;
// we need to explicitly import this trait for it to work
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use futures::stream::FuturesUnordered;
use log;
use serde_json;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::runtime;
use tokio::time::delay_for;

/// Main entrypoint, handler CLI parsing and runtime setup
fn main() -> Fallible<()> {
    init_logging(log::LevelFilter::Trace)?;
    let app = App::new("testpublisher")
        .about("PUBlish test messages ")
        .version(crate_version!())
        .author(crate_authors!())
        .arg(
            Arg::with_name("socket_uri")
                .short("s")
                .long("socket_uri")
                .takes_value(true)
                .empty_values(false)
                .required(true)
                .help("For example ipc:///tmp/publisher.sock"),
        )
        .arg(
            Arg::with_name("topic")
                .short("t")
                .long("topic")
                .takes_value(true)
                .empty_values(false)
                .required(true)
                .help("The topic to use for sending"),
        )
        .arg(
            Arg::with_name("images")
                .short("i")
                .long("images")
                .help("Send images (one red pixel)"),
        )
        .arg(
            Arg::with_name("count")
                .short("c")
                .long("count")
                .takes_value(true)
                .empty_values(false)
                .help("Number of messages to send"),
        );
    let app_args = app.get_matches();
    let socket_uri = app_args.value_of("socket_uri").unwrap().to_string();
    let topic = app_args.value_of("topic").unwrap().to_string();
    let count_to: i64 = match app_args.value_of("count") {
        None => -1,
        Some(val) => val.parse()?,
    };
    let images = app_args.is_present("images");

    // Use the non-threaded scheduler explicitly
    let mut rt = runtime::Builder::new()
        .basic_scheduler()
        .enable_time()
        .enable_io()
        .build()?;
    rt.block_on(mainloop(socket_uri, topic, count_to, images))?;
    Ok(())
}

/// Mainloop task, runtime will block on this one.
async fn mainloop(socket_uri: String, topic: String, count_to: i64, images: bool) -> Fallible<()> {
    let term = setup_signals_termination()?;
    let psmgr = TokioPubSubManager::instance();
    // Scope limit the lock to this critical part
    {
        let mut psmgr_locked = psmgr.lock();
        psmgr_locked.term_flag = term.clone();
        psmgr_locked.set_default_pub_uris(&vec![socket_uri])?;
    }
    // Give the socket a moment to stabilize
    delay_for(Duration::from_millis(150)).await;

    // Track tasks
    let tasks = FuturesUnordered::new();
    tasks.push(tokio::spawn(heartbeat_task(term.clone())));
    if images {
        tasks.push(tokio::spawn(img_publisher_task(
            topic,
            count_to,
            term.clone(),
        )));
    } else {
        tasks.push(tokio::spawn(publisher_task(topic, count_to, term.clone())));
    }
    wait_for_tasks(tasks).await?;

    Ok(())
}

/// Publisher task, will publish messages with counter in the given topic
async fn publisher_task(topic: String, count_to: i64, term: TerminationFlag) -> Fallible<()> {
    let psmgr = TokioPubSubManager::instance();
    let mut msgno: i64 = 0;
    loop {
        msgno = msgno + 1;
        let mut msg = PubSubDataMessage::new(topic.clone())?;
        msg.data["msgno"] = serde_json::to_value(msgno)?;
        psmgr.lock().publish(&msg)?;
        if count_to > 0 && msgno >= count_to {
            log::info!("Sent {} messages, exiting now", msgno);
            term.store(true, Ordering::Relaxed);
            return Ok(());
        }
        if term.load(Ordering::Relaxed) {
            log::trace!("Got term flag, exiting publisher task");
            return Ok(());
        }
        delay_for(Duration::from_millis(100)).await;
    }
}

async fn img_publisher_task(topic: String, count_to: i64, term: TerminationFlag) -> Fallible<()> {
    let psmgr = TokioPubSubManager::instance();
    let mut msgno: i64 = 0;
    loop {
        msgno = msgno + 1;
        let mut msg = PubSubImageMessage::new(topic.clone())?;
        msg.data["msgno"] = serde_json::to_value(msgno)?;
        msg.imginfo["w"] = serde_json::json!(1);
        msg.imginfo["h"] = serde_json::json!(1);
        let imgdata: [u8; 3] = [0, 0, 255];
        msg.imgdata = BytesMut::from(&imgdata[..]);
        psmgr.lock().publish(&msg)?;
        if count_to > 0 && msgno >= count_to {
            log::info!("Sent {} messages, exiting now", msgno);
            term.store(true, Ordering::Relaxed);
            return Ok(());
        }
        if term.load(Ordering::Relaxed) {
            log::trace!("Got term flag, exiting publisher task");
            return Ok(());
        }
        delay_for(Duration::from_millis(100)).await;
    }
}