use crate::config::*;
use crate::core::Ops;
use crate::dispatcher::{self, DispatchMsg, ResponseMsg};
use crate::errors::{NetworkError, RunError};
use crate::prochandler::{self, HandleOps};
use log::debug;
use serde::{Deserialize, Serialize};
use std::sync::mpsc::channel;
use threadpool::ThreadPool;
use zmq::{self, Context, Message};
#[derive(Debug, Serialize, Deserialize)]
pub enum Regress {
Once,
Infinite,
}
pub fn init_daemon() -> Result<(), NetworkError> {
let pool = ThreadPool::new(get_config()?.max_threads);
let context = Context::new();
let responder = context.socket(zmq::REP).unwrap();
responder.bind(get_config()?.daemon_url.as_str())?;
debug!("Started daemon listening on: {}", get_config()?.daemon_url);
let (tx, rx) = channel();
loop {
let tx = tx.clone();
let mut msg = Message::new();
responder.recv(&mut msg, 0)?;
debug!("Captured request {:?}", msg.as_str().unwrap_or_default());
let DispatchMsg {
proctype,
func,
application,
refresh,
regress_counter,
} = serde_json::from_str(msg.as_str().unwrap_or_default())?;
match regress_counter.unwrap_or(Regress::Once) {
Regress::Once => {
pool.execute(move || {
let retmsg = match prochandler::handle(proctype, func, application, refresh) {
Ok(_) => Ok(ResponseMsg {
code: String::from("S000"),
message: String::from("Process successfully cleared."),
}),
Err(e) => Err(ResponseMsg {
code: String::from("E002"),
message: e.to_string(),
}),
};
tx.send(retmsg.unwrap())
.expect("Channel blocked for the pool");
});
let retmsg = rx.recv().unwrap_or_default();
println!("{:?}", retmsg);
responder
.send(
&serde_json::to_string(&retmsg).expect(
format!("Tried to serialize invalid message string: {:?}", retmsg)
.as_str(),
),
0,
)
.expect("Error occured while sending return message.");
}
Regress::Infinite => {
let ack_msg = ResponseMsg {
code: String::from("S001"),
message: String::from("Regression started on thread."),
};
responder
.send(
&serde_json::to_string(&ack_msg).expect(
format!("Tried to serialize invalid message string: {:?}", ack_msg)
.as_str(),
),
0,
)
.expect("Error occured while sending return message.");
}
}
}
}
pub fn daemon_running() -> bool {
dispatcher::heartbeat().is_ok()
}
pub fn run(
func: Ops,
application: &str,
refresh: bool,
regress_counter: Regress,
) -> Result<(), RunError> {
if !daemon_running() {
init_daemon().map_err(Into::into)
} else {
dispatcher::send(DispatchMsg {
proctype: HandleOps::Run,
func: Some(func),
application: Some(application.to_string().to_lowercase()),
refresh: refresh,
regress_counter: Some(regress_counter),
})?;
Ok(())
}
}
pub fn kill(func: Ops, application: &str) -> Result<(), RunError> {
dispatcher::send(DispatchMsg {
proctype: HandleOps::Kill,
func: Some(func),
application: Some(application.to_string()),
..Default::default()
})?;
Ok(())
}