godwit_daemon/runner/
mod.rs

1//! Runner
2//!
3//! Defines a thread-runnable class. Contains list of runnable operations.
4use crate::config::*;
5use crate::core::Ops;
6use crate::dispatcher::{self, DispatchMsg, ResponseMsg};
7use crate::errors::{NetworkError, RunError};
8use crate::prochandler::{self, HandleOps};
9use log::debug;
10use serde::{Deserialize, Serialize};
11use std::sync::mpsc::channel;
12use threadpool::ThreadPool;
13use zmq::{self, Context, Message};
14
15#[derive(Debug, Serialize, Deserialize)]
16pub enum Regress {
17	Once,
18	Infinite,
19}
20
21pub fn init_daemon() -> Result<(), NetworkError> {
22	let pool = ThreadPool::new(get_config()?.max_threads);
23
24	let context = Context::new();
25	let responder = context.socket(zmq::REP).unwrap();
26
27	responder.bind(get_config()?.daemon_url.as_str())?;
28
29	debug!("Started daemon listening on: {}", get_config()?.daemon_url);
30
31	let (tx, rx) = channel();
32
33	loop {
34		let tx = tx.clone();
35
36		let mut msg = Message::new();
37
38		responder.recv(&mut msg, 0)?;
39
40		debug!("Captured request {:?}", msg.as_str().unwrap_or_default());
41
42		let DispatchMsg {
43			proctype,
44			func,
45			application,
46			refresh,
47			regress_counter,
48		} = serde_json::from_str(msg.as_str().unwrap_or_default())?;
49
50		match regress_counter.unwrap_or(Regress::Once) {
51			Regress::Once => {
52				pool.execute(move || {
53					let retmsg = match prochandler::handle(proctype, func, application, refresh) {
54						Ok(_) => Ok(ResponseMsg {
55							code: String::from("S000"),
56							message: String::from("Process successfully cleared."),
57						}),
58						Err(e) => Err(ResponseMsg {
59							code: String::from("E002"),
60							message: e.to_string(),
61						}),
62					};
63
64					tx.send(retmsg.unwrap())
65						.expect("Channel blocked for the pool");
66				});
67
68				let retmsg = rx.recv().unwrap_or_default();
69				println!("{:?}", retmsg);
70
71				responder
72					.send(
73						&serde_json::to_string(&retmsg).expect(
74							format!("Tried to serialize invalid message string: {:?}", retmsg)
75								.as_str(),
76						),
77						0,
78					)
79					.expect("Error occured while sending return message.");
80			}
81			Regress::Infinite => {
82				let ack_msg = ResponseMsg {
83					code: String::from("S001"),
84					message: String::from("Regression started on thread."),
85				};
86
87				responder
88					.send(
89						&serde_json::to_string(&ack_msg).expect(
90							format!("Tried to serialize invalid message string: {:?}", ack_msg)
91								.as_str(),
92						),
93						0,
94					)
95					.expect("Error occured while sending return message.");
96				// loop {
97				// 	let tx = tx.clone();
98				//
99				// 	pool.execute(move || {
100				// 		let retmsg = prochandler::handle(proctype, func, application, refresh);
101				//
102				// 		thread::sleep(Duration::from_millis(1000)); // Emulate processing
103				//
104				// 		// TODO: Write concise retmsg stream to file.
105				// 	});
106				// }
107			}
108		}
109	}
110}
111
112pub fn daemon_running() -> bool {
113	dispatcher::heartbeat().is_ok()
114}
115
116pub fn run(
117	func: Ops,
118	application: &str,
119	refresh: bool,
120	regress_counter: Regress,
121) -> Result<(), RunError> {
122	if !daemon_running() {
123		init_daemon().map_err(Into::into)
124	} else {
125		dispatcher::send(DispatchMsg {
126			proctype: HandleOps::Run,
127			func: Some(func),
128			application: Some(application.to_string().to_lowercase()),
129			refresh: refresh,
130			regress_counter: Some(regress_counter),
131		})?;
132		Ok(())
133	}
134}
135
136pub fn kill(func: Ops, application: &str) -> Result<(), RunError> {
137	dispatcher::send(DispatchMsg {
138		proctype: HandleOps::Kill,
139		func: Some(func),
140		application: Some(application.to_string()),
141		..Default::default()
142	})?;
143	Ok(())
144}