datastreamservicelib 1.0.0

Rust version of https://gitlab.com/advian-oss/python-datastreamservicelib
Documentation
/// Generic helpers
use crate::zmqwrappers::TokioPubSubManager;
use crate::TerminationFlag;
use datastreamcorelib::markers::PubSubMessageMarker;
// We need to import this trait to use methods in it
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use futures::stream::{FuturesUnordered, StreamExt};
use log;
use std::fmt;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::task::JoinHandle;
use toml;

/// Create the shared termination flag and hook it up to the process signals.
///
/// A fresh `AtomicBool` (initially `false`) wrapped in an `Arc` is registered
/// with `signal_hook::flag::register` for SIGTERM, SIGINT and SIGQUIT, in that
/// order. The same flag is then returned so it can be passed around.
///
/// # Errors
///
/// Returns the first error reported by `signal_hook::flag::register`; signals
/// later in the list are not registered in that case.
pub fn setup_signals_termination() -> Fallible<TerminationFlag> {
    let flag = Arc::new(AtomicBool::new(false));
    // Each entry pairs the trace message with the signal it announces.
    let hooks = [
        ("Registering SIGTERM", signal_hook::SIGTERM),
        ("Registering SIGINT", signal_hook::SIGINT),
        ("Registering SIGQUIT", signal_hook::SIGQUIT),
    ];
    for (announcement, signum) in hooks.iter() {
        log::trace!("{}", announcement);
        signal_hook::flag::register(*signum, Arc::clone(&flag))?;
    }
    log::trace!("Signals registering done");
    Ok(flag)
}

/// Read the file at `configpath` and parse its contents as TOML.
///
/// # Errors
///
/// * `"configfile does not exist"` when `configpath.exists()` is false.
/// * The I/O error from `std::fs::read_to_string` if the file cannot be read.
/// * The parse error from `toml::from_str` if the contents are not valid TOML.
pub fn parse_config_file(configpath: &Path) -> Fallible<toml::Value> {
    if configpath.exists() {
        let contents = std::fs::read_to_string(configpath)?;
        let document: toml::Value = toml::from_str(&contents)?;
        Ok(document)
    } else {
        Err(failure::err_msg("configfile does not exist"))
    }
}

/// Convert config array to vector of strings (for list of uris for example...)
pub fn configval_as_string_vec(val: &toml::Value) -> Fallible<Vec<String>> {
    match val.as_array() {
        None => return Err(failure::err_msg("Can't use value as array")),
        Some(vals) => {
            let mut ret: Vec<String> = Vec::with_capacity(vals.len());
            for val in vals {
                match val.as_str() {
                    None => log::error!("Can't handle {:?} as str", val),
                    Some(strval) => ret.push(strval.to_string()),
                }
            }
            return Ok(ret);
        }
    }
}

/// Check all tasks, if one fails abort everything
pub async fn wait_for_tasks<T: fmt::Debug>(
    mut tasks: FuturesUnordered<JoinHandle<Fallible<T>>>,
) -> Fallible<()> {
    loop {
        match tasks.next().await {
            Some(result) => {
                log::trace!("finished future [{:?}]", result);
                match result {
                    Ok(inner) => match inner {
                        Ok(_) => {}
                        Err(e) => {
                            log::error!("Task failed with {:?}", e);
                            return Err(failure::err_msg("Task failed"));
                        }
                    },
                    Err(e) => {
                        log::error!("Task join failed with {:?}", e);
                        return Err(failure::err_msg("Task join failed"));
                    }
                }
            }
            None => {
                log::trace!("All tasks done");
                break;
            }
        }
    }
    Ok(())
}

/// Publish `new_msg` through the shared `TokioPubSubManager` instance.
///
/// Meant for fire-and-forget sending via the event loop (to avoid deadlocks).
/// The manager is locked only for the duration of the single `publish` call.
///
/// # Errors
///
/// Propagates whatever error `publish` returns.
pub async fn send_task(new_msg: impl PubSubMessageMarker) -> Fallible<()> {
    // The lock guard is a temporary and is released at the end of this statement.
    TokioPubSubManager::instance().lock().publish(&new_msg)?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;
    use std::path::PathBuf;

    /// Write a minimal TOML fixture into the system temp directory and return its path.
    fn configfile() -> PathBuf {
        let fixture = temp_dir().join("9ed06261-9eca-45b0-b26c-32145761d071.toml");
        let contents = r###"
[zmq]
pub_sockets = ["ipc:///tmp/configtest_pub.sock"]
"###;
        std::fs::write(&fixture, contents).unwrap();
        fixture
    }

    /// Round-trip: the fixture parses and its socket list converts to strings.
    #[test]
    fn parse_config() {
        let parsed = parse_config_file(&configfile()).unwrap();
        let socketpaths = configval_as_string_vec(&parsed["zmq"]["pub_sockets"]).unwrap();
        assert_eq!(
            socketpaths,
            vec!["ipc:///tmp/configtest_pub.sock".to_string()]
        );
    }
}