godwit_daemon/runner/
mod.rs1use crate::config::*;
5use crate::core::Ops;
6use crate::dispatcher::{self, DispatchMsg, ResponseMsg};
7use crate::errors::{NetworkError, RunError};
8use crate::prochandler::{self, HandleOps};
9use log::debug;
10use serde::{Deserialize, Serialize};
11use std::sync::mpsc::channel;
12use threadpool::ThreadPool;
13use zmq::{self, Context, Message};
14
15#[derive(Debug, Serialize, Deserialize)]
16pub enum Regress {
17 Once,
18 Infinite,
19}
20
21pub fn init_daemon() -> Result<(), NetworkError> {
22 let pool = ThreadPool::new(get_config()?.max_threads);
23
24 let context = Context::new();
25 let responder = context.socket(zmq::REP).unwrap();
26
27 responder.bind(get_config()?.daemon_url.as_str())?;
28
29 debug!("Started daemon listening on: {}", get_config()?.daemon_url);
30
31 let (tx, rx) = channel();
32
33 loop {
34 let tx = tx.clone();
35
36 let mut msg = Message::new();
37
38 responder.recv(&mut msg, 0)?;
39
40 debug!("Captured request {:?}", msg.as_str().unwrap_or_default());
41
42 let DispatchMsg {
43 proctype,
44 func,
45 application,
46 refresh,
47 regress_counter,
48 } = serde_json::from_str(msg.as_str().unwrap_or_default())?;
49
50 match regress_counter.unwrap_or(Regress::Once) {
51 Regress::Once => {
52 pool.execute(move || {
53 let retmsg = match prochandler::handle(proctype, func, application, refresh) {
54 Ok(_) => Ok(ResponseMsg {
55 code: String::from("S000"),
56 message: String::from("Process successfully cleared."),
57 }),
58 Err(e) => Err(ResponseMsg {
59 code: String::from("E002"),
60 message: e.to_string(),
61 }),
62 };
63
64 tx.send(retmsg.unwrap())
65 .expect("Channel blocked for the pool");
66 });
67
68 let retmsg = rx.recv().unwrap_or_default();
69 println!("{:?}", retmsg);
70
71 responder
72 .send(
73 &serde_json::to_string(&retmsg).expect(
74 format!("Tried to serialize invalid message string: {:?}", retmsg)
75 .as_str(),
76 ),
77 0,
78 )
79 .expect("Error occured while sending return message.");
80 }
81 Regress::Infinite => {
82 let ack_msg = ResponseMsg {
83 code: String::from("S001"),
84 message: String::from("Regression started on thread."),
85 };
86
87 responder
88 .send(
89 &serde_json::to_string(&ack_msg).expect(
90 format!("Tried to serialize invalid message string: {:?}", ack_msg)
91 .as_str(),
92 ),
93 0,
94 )
95 .expect("Error occured while sending return message.");
96 }
108 }
109 }
110}
111
112pub fn daemon_running() -> bool {
113 dispatcher::heartbeat().is_ok()
114}
115
116pub fn run(
117 func: Ops,
118 application: &str,
119 refresh: bool,
120 regress_counter: Regress,
121) -> Result<(), RunError> {
122 if !daemon_running() {
123 init_daemon().map_err(Into::into)
124 } else {
125 dispatcher::send(DispatchMsg {
126 proctype: HandleOps::Run,
127 func: Some(func),
128 application: Some(application.to_string().to_lowercase()),
129 refresh: refresh,
130 regress_counter: Some(regress_counter),
131 })?;
132 Ok(())
133 }
134}
135
136pub fn kill(func: Ops, application: &str) -> Result<(), RunError> {
137 dispatcher::send(DispatchMsg {
138 proctype: HandleOps::Kill,
139 func: Some(func),
140 application: Some(application.to_string()),
141 ..Default::default()
142 })?;
143 Ok(())
144}