datastreamservicelib 1.0.0

Rust version of https://gitlab.com/advian-oss/python-datastreamservicelib
Documentation
/// Simple test message receiver
use clap::{crate_authors, crate_version, App, Arg};
use datastreamcorelib::datamessage::PubSubDataMessage;
use datastreamcorelib::imagemessage::PubSubImageMessage;
use datastreamcorelib::logging::init_logging;
use datastreamcorelib::pubsub as coreps;
use datastreamservicelib::utils::{setup_signals_termination, wait_for_tasks};
use datastreamservicelib::zmqwrappers::TokioPubSubManager;
use datastreamservicelib::TerminationFlag;
// we need to explicitly import this trait for it to work
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use futures::stream::FuturesUnordered;
use log;
use std::convert::TryFrom;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime;
use tokio::time::delay_for;

/// Main entrypoint, handler CLI parsing and runtime setup
fn main() -> Fallible<()> {
    init_logging(log::LevelFilter::Debug)?;
    let app = App::new("testpublisher")
        .about("PUBlish test messages ")
        .version(crate_version!())
        .author(crate_authors!())
        .arg(
            Arg::with_name("socket_uri")
                .short("s")
                .long("socket_uri")
                .takes_value(true)
                .empty_values(false)
                .required(true)
                .help("For example ipc:///tmp/publisher.sock"),
        )
        .arg(
            Arg::with_name("topic")
                .short("t")
                .long("topic")
                .takes_value(true)
                .empty_values(false)
                .required(true)
                .help("The topic to use for receiving"),
        )
        .arg(
            Arg::with_name("images")
                .short("i")
                .long("images")
                .help("Use image decoder"),
        )
        .arg(
            Arg::with_name("count")
                .short("c")
                .long("count")
                .takes_value(true)
                .empty_values(false)
                .help("Number of messages to receive"),
        );
    let app_args = app.get_matches();
    let socket_uri = app_args.value_of("socket_uri").unwrap().to_string();
    let topic = app_args.value_of("topic").unwrap().to_string();
    let count: Option<i64> = match app_args.value_of("count") {
        None => None,
        Some(val) => Some(val.parse()?),
    };
    let images = app_args.is_present("images");

    // Use the non-threaded scheduler explicitly
    let mut rt = runtime::Builder::new()
        .basic_scheduler()
        .enable_time()
        .enable_io()
        .build()?;
    rt.block_on(mainloop(socket_uri, topic, count, images))?;
    Ok(())
}

/// Receives messages and displays them via log
struct MsgReceiver {
    imagedecode: bool,
    termflag: TerminationFlag,
    count_to: i64,
    counter: i64,
}

impl MsgReceiver {
    /// Message receive callback, will decode according to settings and display
    pub fn callback(&mut self, _sub: &coreps::Subscription, msg: coreps::PubSubMessage) {
        if self.imagedecode {
            let msg = match PubSubImageMessage::try_from(msg) {
                Err(e) => {
                    log::error!("Decode error {:?}", e);
                    return;
                }
                Ok(msg) => msg,
            };
            log::debug!("Got image {:?} on sub {:?}", &msg, &_sub);
        } else {
            let msg = match PubSubDataMessage::try_from(msg) {
                Err(e) => {
                    log::error!("Decode error {:?}", e);
                    return;
                }
                Ok(msg) => msg,
            };
            log::debug!("Got message {:?} on sub {:?}", &msg, &_sub);
        }
        self.counter = self.counter + 1;
        if self.count_to > 0 && self.counter >= self.count_to {
            log::info!("Got {} messages, exiting now", self.counter);
            self.termflag.store(true, Ordering::Relaxed);
        }
    }
}

/// Mainloop task, runtime will block on this one.
async fn mainloop(
    socket_uri: String,
    topic: String,
    count: Option<i64>,
    imagedecode: bool,
) -> Fallible<()> {
    let term = setup_signals_termination()?;
    let socketuris = vec![socket_uri.clone()];
    let psmgr = TokioPubSubManager::instance();
    let count_to = match count {
        Some(cnt) => cnt,
        None => -1,
    };
    let recv = Arc::new(Mutex::new(MsgReceiver {
        imagedecode: imagedecode,
        termflag: term.clone(),
        count_to: count_to,
        counter: 0,
    }));
    let crecv = recv.clone();
    let sub = coreps::Subscription::new(
        socketuris.clone(),
        vec![topic.clone()],
        move |sub: &coreps::Subscription, msg: coreps::PubSubMessage| {
            crecv.lock().unwrap().callback(&sub, msg)
        },
    )?;
    // Scope limit the lock to this critical part
    {
        let mut psmgr_locked = psmgr.lock();
        psmgr_locked.term_flag = term.clone();
        psmgr_locked.subscribe(&sub)?;
    }
    log::info!("Subscription done");

    // Give the socket a moment to stabilize
    delay_for(Duration::from_millis(150)).await;

    // Track tasks
    let tasks = FuturesUnordered::new();
    tasks.push(tokio::spawn(idletask(term.clone())));
    wait_for_tasks(tasks).await?;

    Ok(())
}

/// Idle, so we have something to wait for
async fn idletask(term: TerminationFlag) -> Fallible<()> {
    loop {
        if term.load(Ordering::Relaxed) {
            log::trace!("Got term flag, exiting idle task");
            return Ok(());
        }
        delay_for(Duration::from_millis(10)).await;
    }
}