use crate::zmqwrappers::TokioPubSubManager;
use crate::TerminationFlag;
use datastreamcorelib::markers::PubSubMessageMarker;
use datastreamcorelib::pubsub::PubSubManager;
use failure::Fallible;
use futures::stream::{FuturesUnordered, StreamExt};
use log;
use std::fmt;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::task::JoinHandle;
use toml;
/// Creates the shared termination flag and hooks it up to the process signals.
///
/// A fresh `AtomicBool` (initially `false`) is wrapped in an `Arc` and a clone
/// of it is handed to `signal_hook::flag::register` for SIGTERM, SIGINT and
/// SIGQUIT, in that order. See the signal-hook documentation for how the flag
/// is updated when one of those signals is delivered.
///
/// # Errors
///
/// Returns the first registration error reported by `signal_hook`; signals
/// later in the list are not registered in that case.
pub fn setup_signals_termination() -> Fallible<TerminationFlag> {
    let flag = Arc::new(AtomicBool::new(false));
    // Each signal is paired with the trace line announcing its registration.
    let hooks = [
        (signal_hook::SIGTERM, "Registering SIGTERM"),
        (signal_hook::SIGINT, "Registering SIGINT"),
        (signal_hook::SIGQUIT, "Registering SIGQUIT"),
    ];
    for &(signal, announcement) in hooks.iter() {
        log::trace!("{}", announcement);
        signal_hook::flag::register(signal, Arc::clone(&flag))?;
    }
    log::trace!("Signals registering done");
    Ok(flag)
}
/// Reads the file at `configpath` and parses its contents as TOML.
///
/// # Errors
///
/// * `"configfile does not exist"` when `configpath.exists()` is false.
/// * The underlying I/O error when the file cannot be read as a string.
/// * The TOML deserialisation error when the contents do not parse.
pub fn parse_config_file(configpath: &Path) -> Fallible<toml::Value> {
    if configpath.exists() {
        let contents = std::fs::read_to_string(configpath)?;
        let config: toml::Value = toml::from_str(&contents)?;
        Ok(config)
    } else {
        Err(failure::err_msg("configfile does not exist"))
    }
}
pub fn configval_as_string_vec(val: &toml::Value) -> Fallible<Vec<String>> {
match val.as_array() {
None => return Err(failure::err_msg("Can't use value as array")),
Some(vals) => {
let mut ret: Vec<String> = Vec::with_capacity(vals.len());
for val in vals {
match val.as_str() {
None => log::error!("Can't handle {:?} as str", val),
Some(strval) => ret.push(strval.to_string()),
}
}
return Ok(ret);
}
}
}
pub async fn wait_for_tasks<T: fmt::Debug>(
mut tasks: FuturesUnordered<JoinHandle<Fallible<T>>>,
) -> Fallible<()> {
loop {
match tasks.next().await {
Some(result) => {
log::trace!("finished future [{:?}]", result);
match result {
Ok(inner) => match inner {
Ok(_) => {}
Err(e) => {
log::error!("Task failed with {:?}", e);
return Err(failure::err_msg("Task failed"));
}
},
Err(e) => {
log::error!("Task join failed with {:?}", e);
return Err(failure::err_msg("Task join failed"));
}
}
}
None => {
log::trace!("All tasks done");
break;
}
}
}
Ok(())
}
/// Publishes `new_msg` through the shared `TokioPubSubManager` instance.
///
/// The manager is locked only for the duration of the `publish` call. The
/// function is `async` but contains no await point of its own.
///
/// # Errors
///
/// Propagates whatever error `publish` returns.
pub async fn send_task(new_msg: impl PubSubMessageMarker) -> Fallible<()> {
    TokioPubSubManager::instance().lock().publish(&new_msg)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;
    use std::path::PathBuf;

    /// Writes a minimal TOML fixture into the system temp directory and
    /// returns its path. The file name is fixed, so repeated runs overwrite
    /// the same file.
    fn configfile() -> PathBuf {
        let fixture_path = temp_dir().join("9ed06261-9eca-45b0-b26c-32145761d071.toml");
        let fixture_body = r###"
[zmq]
pub_sockets = ["ipc:///tmp/configtest_pub.sock"]
"###;
        std::fs::write(&fixture_path, fixture_body).unwrap();
        fixture_path
    }

    /// Round-trips the fixture through `parse_config_file` and
    /// `configval_as_string_vec` and checks the single socket path.
    #[test]
    fn parse_config() {
        let fixture_path = configfile();
        let config = parse_config_file(&fixture_path).unwrap();
        let pub_sockets = &config["zmq"]["pub_sockets"];
        let sockets = configval_as_string_vec(pub_sockets).unwrap();
        assert_eq!(sockets.len(), 1);
        assert_eq!(sockets[0], String::from("ipc:///tmp/configtest_pub.sock"));
    }
}