use {
crate::*,
log::*,
redis::{self, Commands, Connection},
serde::Deserialize,
std::{
time::SystemTime,
},
};
/// Deserializable configuration of a single watcher: the queue it listens
/// to, an optional name for its "taken" queue, and the rules it applies.
#[derive(Debug, Deserialize)]
pub struct WatcherConf {
/// Name of the redis list the watcher pops incoming events from.
pub input_queue: String,
/// Name of the redis list holding events while they are being handled.
/// When absent, `Watcher::new` falls back to `"<input_queue>/taken"`.
pub taken_queue: Option<String>,
/// Rules copied into the watcher's ruleset when it is built.
pub rules: Vec<Rule>,
}
/// A watcher bound to one input queue.
///
/// It owns its own redis connection, moves incoming events to a "taken"
/// queue while they are processed, and applies its ruleset to each of them.
pub struct Watcher {
    /// Redis connection used for every command issued by this watcher.
    con: Connection,
    /// Pub/sub channel on which `TRIGGER` and `DONE` notifications are published.
    listener_channel: String,
    /// Redis list from which events are popped.
    input_queue: String,
    /// Redis list where an event sits while it is being handled.
    taken_queue: String,
    /// Rules used to turn an incoming event into tasks.
    ruleset: Ruleset,
}
impl Watcher {
pub fn new(
watcher_conf: &WatcherConf,
global_conf: &Conf,
) -> Result<Self, RescError> {
let listener_channel = global_conf.listener_channel.clone();
let input_queue = watcher_conf.input_queue.clone();
let taken_queue = match watcher_conf.taken_queue.as_ref() {
Some(queue) => queue.clone(),
None => format!("{}/taken", &input_queue),
};
let ruleset = Ruleset {
rules: watcher_conf.rules.clone(),
};
let client = redis::Client::open(&*global_conf.redis.url)?;
let con = client.get_connection()?;
debug!("got redis connection");
Ok(Self {
con,
listener_channel,
input_queue,
taken_queue,
ruleset,
})
}
pub fn run(&mut self) -> Result<(), RescError> {
self.empty_taken_queue();
self.watch_input_queue()
}
fn empty_taken_queue(&mut self) {
debug!("watcher cleans its taken queue");
let mut n = 0;
while let Ok(taken) = self.con.rpoplpush::<_, String>(&self.taken_queue, &self.input_queue) {
debug!(
" moving {:?} from {:?} to {:?}",
&taken, &self.taken_queue, &self.input_queue
);
n += 1;
}
if n > 0 {
warn!(
"moved {} tasks from {:?} to {:?}",
n, &self.taken_queue, &self.input_queue
);
}
}
fn handle_input_event(&mut self, event: String) -> Result<(), RescError> {
let now = now_secs();
info!(
"<- got {:?} in queue {:?} @ {}",
&event, &self.input_queue, now
);
let mut results = Vec::new();
for rule in self.ruleset.matching_rules(&event) {
debug!(" applying rule {:?}", rule.name);
match rule.results(&event) {
Ok(mut rule_results) => {
results.append(&mut rule_results);
}
Err(e) => {
error!(" Rule execution failed: {:?}", e);
}
}
}
debug!(" {} result(s)", results.len());
for r in results {
let in_set_time: Option<i32> = r.set.as_ref()
.and_then(|s| self.con.zscore(s, &r.task).ok());
if let Some(time) = in_set_time {
info!(" task {:?} already queued @ {}", &r.task, time);
continue;
}
info!(" -> {:?} pushed to queue {:?}", &r.task, &r.queue);
if let Some(task_set) = r.set.as_ref() {
self.con.zadd(task_set, &r.task, now)?;
debug!(
" {:?} pushed to task_set {:?} @ {}",
&r.task, task_set, now
);
}
self.con.lpush(&r.queue, &r.task)?;
self.con.publish(
&self.listener_channel,
format!("{} TRIGGER {} -> {}", &self.taken_queue, &event, &r.task),
)?;
}
self.con.lrem(&self.taken_queue, 1, &event)?;
self.con.publish(
&self.listener_channel,
format!("{} DONE {}", &self.taken_queue, &event),
)?;
debug!(" done with task {:?}", &event);
Ok(())
}
fn watch_input_queue(&mut self) -> Result<(), RescError> {
info!("watcher launched on queue {:?}...", &self.input_queue);
loop {
match self.con.brpoplpush(&self.input_queue, &self.taken_queue, 0) {
Ok(event) => {
self.handle_input_event(event)?
}
Err(e) => {
error!("BRPOPLPUSH on {:?} failed : {}", &self.input_queue, e);
}
}
}
}
}
/// Returns the current time as a whole number of seconds elapsed since the
/// Unix epoch, converted to `f64`.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn now_secs() -> f64 {
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().unwrap();
    let whole_secs = since_epoch.as_secs();
    whole_secs as f64
}